DEV Community

Lucas Leal da Costa
Lucas Leal da Costa

Posted on • Updated on

Solving Race Conditions in Python: A Mutex Approach for Efficiency

Introduction

When working with concurrent programs, it is crucial to prevent race conditions. In this article, we will demonstrate the implementation of a scalable and efficient mutex to facilitate the management of multiple threads.

As will be presented, it was able to reduce the time up to 90.02% when implementing the mutex instead of simple lock solution.

Source code can be found at: GitHub

PS.: Python version used: 3.10
PS.: Notebook configs used in these tests: Intel i7-1165G7 2.80 GHz | 16 Gb RAM


1. Study Case 📝

By exploring a race condition problem, we will develop a mutex to address the problem efficiently.

1.1. Race Condition Example

We have multiple objects to count, a function to increase the counter and multiple threads.

1.1.1. Counters

from dataclasses import dataclass


@dataclass
class Counter:
    value: int
    name: str


first_counter = Counter(value=0, name="first_counter")
second_counter = Counter(value=0, name="second_counter")
third_counter = Counter(value=0, name="third_counter")
fourth_counter = Counter(value=0, name="fourth_counter")
fifth_counter = Counter(value=0, name="fifth_counter")
sixth_counter = Counter(value=0, name="sixth_counter")
Enter fullscreen mode Exit fullscreen mode

1.1.2. Increase Counter Function

from time import sleep

def increase(by: int, counter: Counter):

    local_counter = counter.value
    local_counter += by

    sleep(0.1) # processing time simulation

    counter.value = local_counter
Enter fullscreen mode Exit fullscreen mode

1.1.3. Execution

import threading
from time import time

if __name__ == "__main__":

    threads = [
        threading.Thread(target=increase, args=(1,first_counter)),
        threading.Thread(target=increase, args=(1,fourth_counter)),
        threading.Thread(target=increase, args=(1,sixth_counter)),
        threading.Thread(target=increase, args=(1,third_counter)),
        threading.Thread(target=increase, args=(1,first_counter)),
        threading.Thread(target=increase, args=(1,fifth_counter)),
        threading.Thread(target=increase, args=(1,first_counter)),
        threading.Thread(target=increase, args=(1,second_counter)),
        threading.Thread(target=increase, args=(1,fourth_counter)),
        threading.Thread(target=increase, args=(1,first_counter)),
        threading.Thread(target=increase, args=(1,second_counter)),
        threading.Thread(target=increase, args=(1,first_counter)),
        threading.Thread(target=increase, args=(1,fourth_counter)),
        threading.Thread(target=increase, args=(1,first_counter)),
        threading.Thread(target=increase, args=(1,sixth_counter)),
        threading.Thread(target=increase, args=(1,third_counter)),
        threading.Thread(target=increase, args=(1,third_counter)),
        threading.Thread(target=increase, args=(1,fifth_counter)),
        threading.Thread(target=increase, args=(1,third_counter)),
        threading.Thread(target=increase, args=(1,sixth_counter)),
        threading.Thread(target=increase, args=(1,third_counter)),
    ]

    initial_time = time()

    for t in threads:
        t.start()

    for t in threads:
        t.join()

    final_time = time()

    print(f"\nFinal value {first_counter.name}:", first_counter.value)
    print(f"Final value {second_counter.name}:", second_counter.value)
    print(f"Final value {third_counter.name}:", third_counter.value)
    print(f"Final value {fourth_counter.name}:", fourth_counter.value)
    print(f"Final value {fifth_counter.name}:", fifth_counter.value)
    print(f"Final value {sixth_counter.name}:", sixth_counter.value)

    print(f"\nExecution time: {round(final_time - initial_time, 3)} seconds")
Enter fullscreen mode Exit fullscreen mode

1.1.4. Execution Results

Final value first_counter: 1
Final value second_counter: 1
Final value third_counter: 1
Final value fourth_counter: 1
Final value fifth_counter: 1
Final value sixth_counter: 1

Execution time: 0.102 seconds
Enter fullscreen mode Exit fullscreen mode

It wasn't what we expected. Although multiple threads were modifying the counters, they all have the same value: 1. This is an example of race conditions. But how can we solve this?

1.2. Solving the Problem

What if we could lock the counter so each thread increase the counter at a time? So let's insert a lock in this function so that any thread has to wait for its turn to increase the counter.

We can do that by implementing a Lock in the increase function:

import threading
from time import sleep

lock = threading.Lock()

def increase(by: int, counter: Counter):

    with lock:

        local_counter = counter.value
        local_counter += by

        sleep(0.1) # processing time simulation

        counter.value = local_counter
Enter fullscreen mode Exit fullscreen mode

Whenever a thread enters the context manager it locks for the rest. The lock is only released when thread exits the context manager.

1.2.1. Results After Simple Lock

Final value first_counter: 6
Final value second_counter: 2
Final value third_counter: 5
Final value fourth_counter: 3
Final value fifth_counter: 2
Final value sixth_counter: 3

Execution time: 2.108 seconds
Enter fullscreen mode Exit fullscreen mode

As we can see, it has taken a lot longer to finish execution after inserting the lock. How can we improve it? That is what our mutex is going to do.

2. Our Goal

The mutex has to check some requirements:

  1. Be efficient
  2. Be easy to maintain
  3. Be scalable - see Open–closed principle

For the sake of efficiency, our mutex will have multiple locks each associated with a specific key; Once the thread provides the key, the mutex will create a lock associated with the key or enqueues the predefined key if the lock already exists. Since our mutex has to be available to the entire system, the mutex needs to be a thread safe Singleton.


3. Mutex 🚦

Now let's implement what we have defined.

3.1. Thread Safe Singleton

import threading


class SingletonThreadSafeMeta(type):
    """Thread safe Singleton"""

    __instances = {}
    __lock = threading.Lock()

    def __call__(cls, *args, **kwargs):
        with cls.__lock:
            if cls not in cls.__instances:
                instance = super().__call__(*args, **kwargs)
                cls.__instances[cls] = instance
        return cls.__instances[cls]
Enter fullscreen mode Exit fullscreen mode

3.2. Key Based Mutex

import threading

from singleton_thread_safe import SingletonThreadSafeMeta


class CustomMutex(metaclass=SingletonThreadSafeMeta):

    def __init__(self):
        self.__lockers: dict[str, threading.Lock] = {}
        self.__lockers_counter: dict[str, int] = {}

    def __decrease_counter(self, key: str) -> None:
        self.__lockers_counter[key] -= 1

        if self.__lockers_counter == 0:
            del self.__lockers[key]
            del self.__lockers_counter[key]

    def __get_thread_locker(self, key: str) -> threading.Lock:
        if key in self.__lockers:
            self.__lockers_counter[key] += 1
            return self.__lockers[key]

        self.__lockers_counter[key] = 1
        thread_locker = threading.Lock()
        self.__lockers[key] = thread_locker

        return thread_locker

    def lock_with_key(self, key: str) -> None:
        """Locks the thread associated with the key"""

        thread_locker = self.__get_thread_locker(key)
        thread_locker.acquire()

    def unlock_with_key(self, key: str) -> None:
        """Unlocks the thread associated with the key"""

        if key in self.__lockers:

            thread_locker = self.__lockers[key]
            self.__decrease_counter(key)
            thread_locker.release()
Enter fullscreen mode Exit fullscreen mode

The Mutex class will be responsible for managing the relation between the locks and the keys; this relation is defined in the self.__lockers attribute.

One might be asking where is the queue so we can handle the threads waiting for the lock to be released. The queue is implemented under the hood. As a thread reaches thread_lock.acquire() the threading module will implement a FIFO queue. Therefore, there is no need for us to develop a queue.


4. Workers 🦺

Since we have the mutex it is important to create the workers so they can lock threads based on keys. Each thread will have an instance of a worker, every worker will report to the mutex in order to lock and unlock threads.

Mutex workers interaction

Each worker is responsible to manage timeouts in case a thread takes too long to unlock the queue in the mutex.

4.1. Abstract Worker

Since we can have multiple workers it is important to define an interface so they can all work in the same way.

from abc import ABC, abstractmethod

from mutex import CustomMutex


class MutexWorkerAbstract(ABC):
    @abstractmethod
    def __enter__(self) -> "MutexWorkerAbstract":
        """implement context manager"""

    @abstractmethod
    def __exit__(self, exc_type, exc_value, exc_tb) -> None:
        """implement context manager"""

    @property
    @abstractmethod
    def custom_mutex(self) -> CustomMutex:
        """Enforcing composition with CustomMutex"""
Enter fullscreen mode Exit fullscreen mode

As we dive into the worker's structure the context manager implementation will get clearer.

4.2. Worker Counter

import threading

from mutex import CustomMutex
from worker_abstract import MutexWorkerAbstract


class CounterMutexWorker(MutexWorkerAbstract):

    def __init__(self):
        self.__general_mutex = CustomMutex()
        self.__counter_key_lock: int | None = None
        self.__timeout = 5 # seconds
        self.__timeout_timer = threading.Timer(
            self.__timeout, 
            function=self.__timeout_callback
        )

    @property
    def custom_mutex(self) -> CustomMutex:
        return self.__general_mutex

    def __enter__(self) -> "CounterMutexWorker":
        return self

    def __exit__(self, exc_type, exc_value, exc_tb) -> None:
        self.unlock_by_counter()

    def __format_key_lock_thread(self, counter_name: int) -> str:
        key = f"COUNTER_WORKER_KEY_{counter_name}"
        return key

    def __timeout_callback(self) -> None:
        print(f"Lock timeout! worker=CounterMutexWorker | number={self.__counter_key_lock}")
        print(f"Releasing Lock... worker=CounterMutexWorker number={self.__counter_key_lock}")

        self.unlock_by_counter()

    def lock_by_counter(self, counter_name: str) -> None:
        """Locks thread by counter name"""

        key = self.__format_key_lock_thread(counter_name)
        self.custom_mutex.lock_with_key(key=key)
        self.__timeout_timer.start()
        self.__counter_key_lock = counter_name

    def unlock_by_counter(self) -> None:
        """Unlocks thread by counter name"""

        if self.__timeout_timer.is_alive():
            self.__timeout_timer.cancel()

        if self.__counter_key_lock:
            key = self.__format_key_lock_thread(self.__counter_key_lock)
            self.custom_mutex.unlock_with_key(key=key)
            self.__counter_key_lock = None
Enter fullscreen mode Exit fullscreen mode

PS.: Please note that two different workers cannot share the same key formatter

Now we can understand the importance of the context manager: if there is an exception before unlocking the thread, the queue is freed previous to the timeout expires.

Each worker has their own custom key. As soon as a thread locks the mutex, the worker initiates another thread to monitor the timeout, if the timeout exceeds the queue is released.


5. Testing

It's time to implement in the previous problem what we have built. Then let's check the results.

5.1. Implementing Worker

Applying the worker to the increase function.

from worker import CounterMutexWorker

def increase(by: int, counter: Counter):

    worker = CounterMutexWorker()

    with worker:
        worker.lock_by_counter(counter.name)

        local_counter = counter.value
        local_counter += by

        sleep(0.1) # simulating process time

        counter.value = local_counter

        worker.unlock_by_counter()
Enter fullscreen mode Exit fullscreen mode

5.1.1. Results After Mutex

Final value first_counter: 6
Final value second_counter: 2
Final value third_counter: 5
Final value fourth_counter: 3
Final value fifth_counter: 2
Final value sixth_counter: 3

Execution time: 0.608 seconds
Enter fullscreen mode Exit fullscreen mode

5.2. Testing timeout

As timeout was implemented, let's check how it performs. We can test it by decreasing self.__timeout from 5 to 0.02 seconds, since our sleep time is 0.1 seconds.

class CounterMutexWorker(MutexWorkerAbstract):

    def __init__(self):
        self.__general_mutex = CustomMutex()
        self.__counter_key_lock: int | None = None
        self.__timeout = 0.02 # seconds
        self.__timeout_timer = threading.Timer(
            self.__timeout, 
            function=self.__timeout_callback
        )
.
.
.
Enter fullscreen mode Exit fullscreen mode

5.2.1. Timeout Results

Lock timeout! worker=CounterMutexWorker | key=first_counter
Releasing Lock... worker=CounterMutexWorker key=first_counter
Lock timeout! worker=CounterMutexWorker | key=fourth_counter
Releasing Lock... worker=CounterMutexWorker key=fourth_counter
Lock timeout! worker=CounterMutexWorker | key=third_counter
Releasing Lock... worker=CounterMutexWorker key=third_counter
Lock timeout! worker=CounterMutexWorker | key=sixth_counter
Lock timeout! worker=CounterMutexWorker | key=fifth_counter
Lock timeout! worker=CounterMutexWorker | key=second_counter
Releasing Lock... worker=CounterMutexWorker key=sixth_counter
Releasing Lock... worker=CounterMutexWorker key=fifth_counter
Releasing Lock... worker=CounterMutexWorker key=second_counter
Lock timeout! worker=CounterMutexWorker | key=first_counter
Releasing Lock... worker=CounterMutexWorker key=first_counter
Lock timeout! worker=CounterMutexWorker | key=third_counter
Releasing Lock... worker=CounterMutexWorker key=third_counter
Lock timeout! worker=CounterMutexWorker | key=fourth_counter
Releasing Lock... worker=CounterMutexWorker key=fourth_counter
Lock timeout! worker=CounterMutexWorker | key=sixth_counter
Releasing Lock... worker=CounterMutexWorker key=sixth_counter
Lock timeout! worker=CounterMutexWorker | key=fifth_counter
Releasing Lock... worker=CounterMutexWorker key=fifth_counter
Lock timeout! worker=CounterMutexWorker | key=second_counter
Releasing Lock... worker=CounterMutexWorker key=second_counter
Lock timeout! worker=CounterMutexWorker | key=first_counter
Releasing Lock... worker=CounterMutexWorker key=first_counter
Lock timeout! worker=CounterMutexWorker | key=third_counter
Releasing Lock... worker=CounterMutexWorker key=third_counter
Lock timeout! worker=CounterMutexWorker | key=fourth_counter
Releasing Lock... worker=CounterMutexWorker key=fourth_counter
Lock timeout! worker=CounterMutexWorker | key=sixth_counter
Releasing Lock... worker=CounterMutexWorker key=sixth_counter
Lock timeout! worker=CounterMutexWorker | key=first_counter
Releasing Lock... worker=CounterMutexWorker key=first_counter
Lock timeout! worker=CounterMutexWorker | key=third_counter
Releasing Lock... worker=CounterMutexWorker key=third_counter
Lock timeout! worker=CounterMutexWorker | key=first_counter
Releasing Lock... worker=CounterMutexWorker key=first_counter
Lock timeout! worker=CounterMutexWorker | key=third_counter
Releasing Lock... worker=CounterMutexWorker key=third_counter
Lock timeout! worker=CounterMutexWorker | key=first_counter
Releasing Lock... worker=CounterMutexWorker key=first_counter

Final value first_counter: 2
Final value second_counter: 1
Final value third_counter: 1
Final value fourth_counter: 1
Final value fifth_counter: 1
Final value sixth_counter: 1

Execution time: 0.203 seconds
Enter fullscreen mode Exit fullscreen mode

It took only 0.2 seconds because as timeouts were exceeding, the locker's queue was being released.

5.3. Load Test

What if we increase the number of threads and counters? For the purpose of testing the performance of the mutex, let's set 1000 threads and 12 counters.

5.3.1. Execution

import threading
from random import randint
from time import time

if __name__ == "__main__":
    counters = [
        Counter(value=0, name="first_counter"),
        Counter(value=0, name="second_counter"),
        Counter(value=0, name="third_counter"),
        Counter(value=0, name="fourth_counter"),
        Counter(value=0, name="fifth_counter"),
        Counter(value=0, name="sixth_counter"),
        Counter(value=0, name="seventh_counter"),
        Counter(value=0, name="eighth_counter"),
        Counter(value=0, name="ninth_counter"),
        Counter(value=0, name="tenth_counter"),
        Counter(value=0, name="eleventh_counter"),
        Counter(value=0, name="twelfth_counter"),
    ]

    amount_of_threads = 1000

    threads = [
        threading.Thread(
            target=increase, 
            args=(
                1, 
                counters[randint(0, len(counters) -1)]
            )
        )
        for _ in range(amount_of_threads)
    ]

    initial_time = time()

    for t in threads:
        t.start()

    for t in threads:
        t.join()

    final_time = time()

    print("\n=======================================\n")

    for c in counters:
        print(f"Final value {c.name}:", c.value)    

    print("\n=======================================\n")

    print(f"Execution time: {round(final_time - initial_time, 3)} seconds")
Enter fullscreen mode Exit fullscreen mode

5.3.2. Results Using Simple Lock

=======================================

Final value first_counter: 74
Final value second_counter: 85
Final value third_counter: 85
Final value fourth_counter: 90
Final value fifth_counter: 92
Final value sixth_counter: 87
Final value seventh_counter: 85
Final value eighth_counter: 78
Final value ninth_counter: 85
Final value tenth_counter: 85
Final value eleventh_counter: 82
Final value twelfth_counter: 72

=======================================

Execution time: 100.403 seconds
Enter fullscreen mode Exit fullscreen mode

5.3.3. Results Using Mutex

=======================================

Final value first_counter: 74
Final value second_counter: 85
Final value third_counter: 85
Final value fourth_counter: 90
Final value fifth_counter: 92
Final value sixth_counter: 87
Final value seventh_counter: 85
Final value eighth_counter: 78
Final value ninth_counter: 85
Final value tenth_counter: 85
Final value eleventh_counter: 82
Final value twelfth_counter: 72

=======================================

Execution time: 9.815 seconds
Enter fullscreen mode Exit fullscreen mode

6. Conclusion

After implementing the mutex we managed to reduced the time considerably.

  1. 21 threads and 6 counters -> from 2.108 seconds to 0.608 seconds. Time reduction of 71.1%.

  2. 1000 threads and 12 counters -> from 100.403 seconds to 9.815 seconds. Time reduction of 90.02%.

As more threads and more counters were introduced these values got even better. So we can state that this is an excellent implementation for large systems.

Furthermore, the combination mutex + worker is scalable: If there is a demand for another lock in the same fashion, simply implement a new worker conforming to the class MutexWorkerAbstract.

Top comments (0)