Chidozie C. Okafor

Originally published at doziestar.Medium

Concurrency in Python Made Simple

Concurrency can be thought of as multiple things happening at the same time.

Imagine you are hosting a dinner party and you need to cook multiple dishes at the same time to serve all your guests. One way to do this is to cook each dish one after the other, but this would take a lot of time and your guests would have to wait a long time to eat.

A better way to handle this is to use multiple burners on your stove and cook multiple dishes at the same time. Each burner represents a different task, and you can use them to cook different dishes simultaneously. This way, you can finish cooking all the dishes in a shorter amount of time, and your guests can enjoy their meal together.

In this analogy, the burners are the workers (threads or processes) and the dishes are the tasks they execute simultaneously. This allows you to complete multiple tasks at the same time, improving efficiency and reducing the overall time it takes to finish them all.

In programming, concurrency is the concept of multiple threads or processes running at the same time, independently of one another. This allows for the efficient use of resources, such as CPU and memory, and can result in a more responsive and dynamic application.

There are two main types of concurrency:

  1. Parallelism: refers to the simultaneous execution of multiple tasks using multiple processors or cores. Parallelism enables the system to complete a large task in a shorter amount of time by dividing the task into smaller, independent subtasks that can be executed simultaneously.
  2. Asynchrony: refers to the ability of a system to handle multiple tasks in a non-blocking way. This means that the system can continue to work on other tasks while it is waiting for a specific task to complete.

Concurrency can be achieved in many ways, such as using threads, processes, and event-driven programming. However, it can also introduce complexity and the potential for race conditions and other synchronization issues, if not handled properly.
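
To make this risk concrete, here is a minimal sketch of a race condition (the names are illustrative): several threads increment a shared counter with no synchronization, and because counter += 1 is a read-modify-write sequence rather than a single atomic operation, updates can be lost.

import threading

counter = 0

def unsafe_increment():
    global counter
    for _ in range(100_000):
        counter += 1  # read, add, write back: another thread can interleave

threads = [threading.Thread(target=unsafe_increment) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# frequently prints less than the expected 400000
print(counter)

How often updates are actually lost depends on the interpreter version and timing, but the lesson holds: unsynchronized access to shared state is unsafe, and the tools covered below exist to prevent it.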

The concept of concurrency has been present in Python since the early days of the language. However, the specific mechanisms for achieving concurrency have evolved over time.

In the early versions of Python, the thread module was introduced, which provided a basic threading API for creating and managing threads. However, the Global Interpreter Lock (GIL) in the CPython implementation of Python, which is the most widely used implementation, limited the performance benefits of threading by preventing multiple threads from executing Python bytecode simultaneously.
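
To see the GIL's effect for yourself, here is a small illustrative benchmark (the numbers are machine-dependent): for pure-Python CPU-bound work, two threads typically take about as long as running the same work sequentially, because only one thread can execute bytecode at a time.

import threading
import time

def cpu_bound(n):
    # a pure-Python loop: the running thread must hold the GIL throughout
    while n > 0:
        n -= 1

N = 10_000_000

start = time.perf_counter()
cpu_bound(N)
cpu_bound(N)
print(f"sequential: {time.perf_counter() - start:.2f}s")

start = time.perf_counter()
t1 = threading.Thread(target=cpu_bound, args=(N,))
t2 = threading.Thread(target=cpu_bound, args=(N,))
t1.start()
t2.start()
t1.join()
t2.join()
print(f"two threads: {time.perf_counter() - start:.2f}s")  # similar or slower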

Python 2.6 added the multiprocessing module, which offers process-based parallelism with an API similar to threading's, sidestepping the GIL at the cost of separate memory spaces.

In Python 3.2, the concurrent.futures module was introduced, providing a higher-level API for working with threads and processes. Its ProcessPoolExecutor class allows for true parallelism by using multiple processes instead of threads.

Python 3.4 introduced the asyncio library, and Python 3.5 added the async/await syntax for it. Together they provide a way to write asynchronous code that runs concurrently without the need for threads or processes.

Later releases have continued to refine these tools; for example, Python 3.7 added asyncio.run() as a simple entry point for running asyncio programs. The threading.Barrier class, which can be used to synchronize groups of threads, has been available since Python 3.2.

Overall, the history of concurrency in Python has seen a gradual evolution of the available mechanisms for achieving concurrency, with an emphasis on providing higher-level abstractions and making it easier to write concurrent code.

Threading

Threading in Python is a way to achieve concurrency by creating and managing multiple threads within a single process. Each thread has its own execution context, which includes its own call stack and program counter, but it shares the same memory space as the other threads in the process. This allows for efficient use of resources, such as CPU and memory, and can result in a more responsive and dynamic application.

The threading module in Python provides a basic threading API for creating and managing threads. The main classes in this module are:

  • Thread: This class represents a single thread of execution. You can create a new thread by instantiating this class and passing a callable object (such as a function or a method) as the target. Once the thread is started, the target function will be executed in the new thread.
  • Lock: This class provides a way to synchronize access to shared resources between threads. A lock is in one of two states, "locked" or "unlocked". Only one thread can hold a lock at a time.
  • RLock: This class provides a reentrant way to synchronize access to shared resources between threads. Like a Lock, an RLock is in one of two states, "locked" or "unlocked", but an RLock can be acquired multiple times by the same thread, and it is only unlocked once that thread has released it as many times as it acquired it.
  • Semaphore: This class provides a way to synchronize access to shared resources between threads. A semaphore is a value that is non-negative and shared between threads. The value is decremented by one each time a thread acquires the semaphore, and incremented by one each time a thread releases the semaphore.
  • Barrier: This class provides a way to synchronize the execution of threads. A barrier is a synchronization object that allows multiple threads to wait for each other to reach a certain point in the execution before continuing.

In addition to the threading module, Python also provides the concurrent.futures module, which provides a higher-level API for working with threads and processes. This module includes the following classes:

  • ThreadPoolExecutor: This class is used to execute callable objects in a thread pool. The thread pool is created with a fixed number of worker threads, and each worker thread is responsible for executing a single task at a time.
  • ProcessPoolExecutor: This class is used to execute callable objects in a process pool. The process pool is created with a fixed number of worker processes, and each worker process is responsible for executing a single task at a time.

Let’s look at an example:

import threading
import time

def worker():
    """thread worker function"""
    print('Worker')
    time.sleep(5)
    print('Worker finished')

# create a new thread
t = threading.Thread(target=worker)

# start the thread
t.start()

# main thread continues execution
print('Main thread')

# wait for the worker thread to complete
t.join()

# main thread continues execution after worker thread completes
print('Main thread finished')

In the above example, we define a worker function that simulates a long-running task by sleeping for 5 seconds. We then create a new thread and assign the worker function as its target. The start() method is then called on the thread to start its execution. The main thread continues execution and prints 'Main thread'. The join() method is called on the thread to wait for the worker thread to complete. Once the worker thread completes, the main thread continues execution and prints 'Main thread finished'.

Note that the join() method makes the main thread wait for the worker thread to complete before continuing. The time.sleep(5) call simply simulates a long-running task so that the interleaving of the two threads is easier to observe.

In addition to the Thread class, the threading module also provides other classes like Lock, RLock, Semaphore, and Barrier that can be used to synchronize the execution of threads and ensure that shared resources are accessed in a safe and controlled manner.

For example:

import threading

counter = 0
lock = threading.Lock()

def increment():
    global counter
    lock.acquire()   # block until the lock is available
    counter += 1     # only one thread can execute this at a time
    lock.release()   # allow the next waiting thread to proceed

t1 = threading.Thread(target=increment)
t2 = threading.Thread(target=increment)
t1.start()
t2.start()
t1.join()
t2.join()

print(counter)

This example uses a Lock object to synchronize access to the shared variable counter. The acquire() method acquires the lock and the release() method releases it; more idiomatically, a with lock: block does both and releases the lock even if an exception occurs. Either way, only one thread can access the shared variable at a time, preventing race conditions and other synchronization issues.

Threading can be tricky, so it is worth being familiar with the threading module and with the Global Interpreter Lock (GIL), which can limit the performance of multithreaded programs.

A more advanced example:

import threading
import queue
import time

class DataFetcherThread(threading.Thread):
    """Thread class for fetching data from a remote API"""
    def __init__(self, queue, api_url):
        super().__init__(daemon=True)  # daemon: don't block interpreter exit
        self.queue = queue
        self.api_url = api_url

    def run(self):
        while True:
            # fetch data from the API
            data = fetch_data_from_api(self.api_url)

            # put the data in the queue
            self.queue.put(data)

            # sleep for a short period of time
            time.sleep(5)

class DataProcessorThread(threading.Thread):
    """Thread class for processing data"""
    def __init__(self, queue):
        super().__init__(daemon=True)
        self.queue = queue

    def run(self):
        while True:
            # get data from the queue
            data = self.queue.get()

            # process the data
            processed_data = process_data(data)

            # do something with the processed data
            save_to_database(processed_data)

            # signal that the data has been processed
            self.queue.task_done()

def fetch_data_from_api(api_url):
    """Function to fetch data from a remote API"""
    # code to fetch data from the API
    pass

def process_data(data):
    """Function to process data"""
    # code to process data
    pass

def save_to_database(processed_data):
    """Function to save data to a database"""
    # code to save data to the database
    pass

# create a queue to hold the data
data_queue = queue.Queue()

# create the fetcher and processor threads
fetcher = DataFetcherThread(data_queue, 'https://example.com/api')
processor = DataProcessorThread(data_queue)

# start the threads
fetcher.start()
processor.start()

# wait until every item put on the queue so far has been processed
data_queue.join()

In the above example, we have two thread classes: DataFetcherThread and DataProcessorThread. The DataFetcherThread class is responsible for fetching data from a remote API and putting it in a queue, while the DataProcessorThread class is responsible for getting data from the queue, processing it, and saving it to a database.

The DataFetcherThread class continuously fetches data from the API and puts it in the queue, while the DataProcessorThread class continuously gets data from the queue, processes it, and saves it to the database.

We use a queue.Queue object to hold the data, which ensures that the data is accessed in a thread-safe manner, preventing race conditions and other synchronization issues.

fetch_data_from_api, process_data, and save_to_database are placeholder functions that stand in for the real-world work of fetching data from an API, processing it, and saving it to a database.

Let’s use RLock, Semaphore, and Barrier to achieve concurrency in Python:

import threading
import time

counter = 0

# Create a reentrant lock to protect the shared counter
lock = threading.RLock()

# Create a semaphore with a maximum capacity of 3
semaphore = threading.Semaphore(3)

# Create a barrier with a capacity of 4
barrier = threading.Barrier(4)

def increment():
    global counter
    lock.acquire()
    counter += 1
    lock.release()

def worker(n):
    """Thread worker function"""
    semaphore.acquire()   # at most 3 workers may be in this section at once
    increment()
    print(f'Worker {n}:', counter)
    time.sleep(1)
    semaphore.release()
    barrier.wait()        # wait here until all 4 workers have arrived

# create and start 4 threads running the same worker function
threads = [threading.Thread(target=worker, args=(n,)) for n in range(1, 5)]
for t in threads:
    t.start()

# wait for the threads to complete
for t in threads:
    t.join()

print('Counter:', counter)

In the example above, four threads run the same worker function, which increments a global variable counter and prints its value. Access to the global variable is protected by a reentrant lock, an instance of the RLock class.

This example illustrates how RLock, Semaphore, and Barrier can be used to achieve concurrency and synchronize the execution of threads in a more complex scenario. The RLock class is used to protect the global variable counter from race conditions and other synchronization issues, while the Semaphore class is used to control the maximum number of threads that can access the global variable at the same time. The Barrier class is used to synchronize the execution of the threads and ensure that they all reach a certain point in the execution before proceeding.

It is important to note that the use of RLock means a thread can acquire the lock multiple times, and the lock is only released once the same thread has released it as many times as it acquired it. This is useful when a function that holds the lock calls another function that also acquires it, as in the sketch below.
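
Here is a minimal sketch of that reentrancy (the bank-account names are purely illustrative): transfer() holds the lock while calling withdraw(), which acquires the same lock again. With a plain Lock the second acquire would deadlock; with an RLock it succeeds because the same thread already owns the lock.

import threading

rlock = threading.RLock()
balance = 100

def withdraw(amount):
    global balance
    with rlock:           # second acquisition by the owning thread: allowed
        balance -= amount

def transfer(amount):
    with rlock:           # first acquisition
        withdraw(amount)  # re-enters the lock held by this same thread

transfer(30)
print(balance)  # 70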

The use of Semaphore allows you to control the number of threads that can access the protected resource at the same time, making sure that the protected resource is not overloaded by too many threads.

The use of Barrier allows you to synchronize the execution of multiple threads, making sure that the threads are working together and not interfering with each other.

Overall, using RLock, Semaphore, and Barrier in combination, you can achieve a more sophisticated and fine-grained control over the concurrent execution of threads and access to shared resources in a Python program.

concurrent.futures

concurrent.futures is a module in the Python Standard Library that provides a higher-level API for working with threads and processes. It was introduced in Python 3.2 and improved in later versions. The module provides two main classes for achieving concurrency: ThreadPoolExecutor and ProcessPoolExecutor.

ThreadPoolExecutor is used to execute callable objects in a thread pool, and ProcessPoolExecutor is used to execute callable objects in a process pool. Both classes work similarly, but ProcessPoolExecutor allows for true parallelism by using multiple processes instead of threads, which can be more efficient when running CPU-bound tasks.

Here’s an example of using ThreadPoolExecutor to achieve concurrency in Python:

import concurrent.futures

def long_running_task(n):
    return n*n

with concurrent.futures.ThreadPoolExecutor() as executor:
    results = [executor.submit(long_running_task, i) for i in range(10)]

    for f in concurrent.futures.as_completed(results):
        print(f.result())

In the above example, we define a function long_running_task that simulates a long-running task by performing a simple computation (squaring a number). We then use a ThreadPoolExecutor to submit this function as a task to be executed concurrently with a range of inputs. The submit method returns a Future object that can be used to track the progress of the task and retrieve the result. The as_completed function is used to iterate over the Future objects as they complete and print the results.

It’s worth noting that ThreadPoolExecutor creates a pool of worker threads and assigns each task to an available thread from the pool. The pool size can be set by passing the maximum number of threads (max_workers) to the constructor. If the number of tasks exceeds the pool size, the remaining tasks are queued until a thread becomes available, as the sketch below shows.
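
Here is a small sketch of that queuing behaviour (the task timings are illustrative): with max_workers=2, six submitted tasks run roughly two at a time while the rest wait in the executor's internal queue. It also shows executor.map(), a convenient alternative to submit() that yields results in input order rather than completion order.

import concurrent.futures
import threading
import time

def task(n):
    print(f"task {n} running on {threading.current_thread().name}")
    time.sleep(1)
    return n * n

# only 2 worker threads, so the 6 tasks execute in batches of at most 2
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    # map() yields results in input order, unlike as_completed()
    for result in executor.map(task, range(6)):
        print(result)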

Similarly, here’s an example of using ProcessPoolExecutor to achieve concurrency in Python:

import concurrent.futures

def long_running_task(n):
    return n*n

if __name__ == "__main__":  # required where worker processes are spawned
    with concurrent.futures.ProcessPoolExecutor() as executor:
        results = [executor.submit(long_running_task, i) for i in range(10)]

        for f in concurrent.futures.as_completed(results):
            print(f.result())

Here, we use a ProcessPoolExecutor instead of a ThreadPoolExecutor to submit the long_running_task function for concurrent execution. The if __name__ == "__main__" guard is needed because on some platforms (such as Windows and macOS) worker processes are started by re-importing the main module; otherwise the code is the same. By using ProcessPoolExecutor we can leverage the true parallelism of multiple processes to run the CPU-bound task, which can be more efficient than using threads.

It’s worth noting that when using ProcessPoolExecutor, the tasks are executed in separate Python processes, which allows them to run truly in parallel and take full advantage of multiple CPU cores. However, it also means that the tasks cannot share memory and must use inter-process communication (IPC) mechanisms to exchange data, which can be more complex and less efficient than sharing memory between threads.

One other thing to note is that with ProcessPoolExecutor, the task function and the data passed to it must be picklable, meaning they can be serialized and deserialized; the sketch below shows what happens when they are not.
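
Here is a minimal sketch of that requirement: a module-level function pickles fine, while a lambda does not, so the lambda-based task fails when its result is retrieved. (The if __name__ == "__main__" guard is again needed because worker processes may re-import this module.)

import concurrent.futures

def square(n):  # defined at module level, so it is picklable
    return n * n

if __name__ == "__main__":
    with concurrent.futures.ProcessPoolExecutor() as executor:
        print(executor.submit(square, 4).result())  # works: prints 16

        # a lambda cannot be pickled, so the failure surfaces in result()
        future = executor.submit(lambda n: n * n, 4)
        try:
            print(future.result())
        except Exception as exc:
            print(f"failed: {exc}")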

Overall, concurrent.futures module provides a simple and powerful way to achieve concurrency in Python by using thread pools and process pools. It abstracts away many of the low-level details of working with threads and processes and provides a more convenient API for concurrent execution of callable objects.

Let’s look at a more advanced example.

In the example below, we have a function long_running_task that simulates a long-running task by sleeping for 1 second. We also have a callback function callback_function that is called when a task is completed.

We use a ProcessPoolExecutor to submit the long_running_task function to be executed concurrently in separate processes, and use the add_done_callback method to register the callback_function to be called when a task is completed.

Then we use a ThreadPoolExecutor in a similar manner, but this time the tasks are executed concurrently in a thread pool, and not separate processes.

In both cases, the code prints process IDs (PIDs). With ProcessPoolExecutor, each task prints the PID of the worker process it runs in, while the callback runs back in the main process; with ThreadPoolExecutor, all tasks share the main process's PID. This lets us see which process is running which task and which process handles each result.

It’s worth noting that, we can also pass the number of processes or threads to the executor constructors, for example, ProcessPoolExecutor(max_workers=4) or ThreadPoolExecutor(max_workers=4) and it will limit the pool size accordingly.

Another advantage of using the callback function is that it allows us to perform additional processing on the results, such as saving the results to a database or sending them to another service without blocking the main process.

import concurrent.futures
import os
import time

def long_running_task(n):
    print(f"Task {n} is running on process {os.getpid()}")
    time.sleep(1)
    return n * n

def callback_function(future):
    result = future.result()
    print(f"Result {result} was handled in process {os.getpid()}")

if __name__ == "__main__":  # required for ProcessPoolExecutor on spawn platforms
    with concurrent.futures.ProcessPoolExecutor() as executor:
        results = [executor.submit(long_running_task, i) for i in range(10)]

        # Add a callback to be called when a future is done
        for result in results:
            result.add_done_callback(callback_function)

    print(f"Main process {os.getpid()}")

    with concurrent.futures.ThreadPoolExecutor() as executor:
        results = [executor.submit(long_running_task, i) for i in range(10)]

        # Add a callback to be called when a future is done
        for result in results:
            result.add_done_callback(callback_function)

    print(f"Main process {os.getpid()}")

asyncio

asyncio is a module in the Python Standard Library that provides a framework for writing concurrent code using the asynchronous programming paradigm. It was introduced in Python 3.4 and improved in later versions.

asyncio uses the concept of coroutines and an event loop to achieve concurrency. Coroutines are special types of functions that can be paused and resumed, allowing multiple tasks to be executed concurrently without the need for threads or processes. The event loop is the central mechanism that schedules and runs the coroutines.

Here’s an example of using asyncio to achieve concurrency in Python:

import asyncio

async def long_running_task(n):
    print(f"Task {n} started")
    await asyncio.sleep(1)
    print(f"Task {n} finished")
    return n * n

async def main():
    # asyncio.wait expects Task objects (passing bare coroutines is an
    # error as of Python 3.11), so wrap each coroutine in a Task first
    tasks = [asyncio.create_task(long_running_task(i)) for i in range(10)]
    completed, pending = await asyncio.wait(tasks)
    for task in completed:
        print(task.result())

asyncio.run(main())

In the above example, we have an async function long_running_task that simulates a long-running task using await asyncio.sleep(1). It is defined with the async keyword, which allows it to be paused and resumed at its await points.

The main function is also an async function. It wraps each long_running_task coroutine in a Task with asyncio.create_task and passes them to asyncio.wait, which schedules them for execution. wait returns two sets of Tasks, one for the completed tasks and one for the pending ones. We then iterate over the completed tasks and print their results using the result() method of the Task object.

Finally, asyncio.run() creates the event loop, runs main() until it completes, and closes the loop. (Older code often did this manually with asyncio.get_event_loop() and loop.run_until_complete(); asyncio.run(), added in Python 3.7, is now the preferred entry point.)

It’s worth noting that asyncio lets us write concurrent code that reads much like synchronous code, using the async and await keywords. This makes it more intuitive and easier to follow than equivalent code using threads or processes.

Here’s another example, this time using asyncio.gather:

import asyncio

async def long_running_task(n):
    print(f"Task {n} started")
    await asyncio.sleep(1)
    print(f"Task {n} finished")
    return n * n

async def main():
    results = await asyncio.gather(*(long_running_task(i) for i in range(10)))
    for result in results:
        print(result)

asyncio.run(main())

In the example above, we use the asyncio.gather function to schedule and run the tasks concurrently and it returns the result as a list of completed tasks.

In both examples, asyncio provides a simple and powerful way to achieve concurrency in Python using coroutines and an event loop, and it allows for more elegant and readable code than using threads or processes.
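
One gather detail worth knowing (the flaky_task name below is illustrative): by default, the first exception raised by any task propagates out of gather; passing return_exceptions=True instead collects exceptions alongside the normal results so each one can be handled individually.

import asyncio

async def flaky_task(n):
    await asyncio.sleep(0.1)
    if n == 3:
        raise ValueError(f"task {n} failed")
    return n * n

async def main():
    # exceptions appear in the results list instead of being raised here
    results = await asyncio.gather(
        *(flaky_task(i) for i in range(5)), return_exceptions=True
    )
    for result in results:
        print(result)

asyncio.run(main())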

A more advanced example:

import asyncio

async def long_running_task(n):
    print(f"Task {n} started")
    await asyncio.sleep(1)
    print(f"Task {n} finished")
    return n * n

async def long_running_task_with_semaphore(semaphore, n):
    async with semaphore:  # wait for a free slot; released automatically
        return await long_running_task(n)

async def main():
    # at most 3 tasks may hold the semaphore (i.e. run) at the same time
    semaphore = asyncio.Semaphore(3)
    tasks = [
        asyncio.create_task(long_running_task_with_semaphore(semaphore, i))
        for i in range(10)
    ]
    completed, pending = await asyncio.wait(tasks)
    for task in completed:
        print(task.result())

asyncio.run(main())

Here, we have a semaphore semaphore that controls the maximum number of tasks that can be executed concurrently. We use the asyncio.Semaphore class to create the semaphore and pass it the maximum capacity of 3.

We then create a new async function long_running_task_with_semaphore which takes the semaphore and the task's input as parameters. This function uses an async with statement to acquire the semaphore before running the long_running_task function and it releases the semaphore automatically after the task is finished.

In the main function, we create a list of long_running_task_with_semaphore coroutines, wrap each one in a Task, and use asyncio.wait to schedule them for execution. wait returns two sets of Tasks, one for the completed tasks and one for the pending ones. We then iterate over the completed tasks and print their results using the result() method of the Task object.

As before, asyncio.run() drives the program: it creates the event loop, runs main() to completion, and then closes the loop.

This example shows how asyncio's Semaphore class can control the number of concurrent tasks: by limiting the semaphore to three slots, only three tasks are allowed to run at a time. This is a convenient way to bound concurrency and avoid overloading the system or a downstream service.

Overall, all of these methods provide different ways of achieving concurrency in Python, each with its own advantages and disadvantages.
