Lucas Leal da Costa

# Solving Race Conditions in Python: A Mutex Approach for Efficiency

## Introduction

When working with concurrent programs, it is crucial to prevent race conditions. In this article, we will demonstrate the implementation of a scalable and efficient mutex to facilitate the management of multiple threads.

As the tests below show, replacing a single global lock with this mutex reduced execution time by roughly `90%` in the largest scenario.

Source code can be found at: GitHub

PS.: Python version used: 3.10

PS.: Notebook configuration used in these tests: Intel i7-1165G7 2.80 GHz | 16 GB RAM

## 1. Case Study 📝

By exploring a race condition problem, we will develop a mutex to address the problem efficiently.

### 1.1. Race Condition Example

We have multiple objects to count, a function to increase the counter and multiple threads.

#### 1.1.1. Counters

``````
from dataclasses import dataclass


@dataclass
class Counter:
    value: int
    name: str

first_counter = Counter(value=0, name="first_counter")
second_counter = Counter(value=0, name="second_counter")
third_counter = Counter(value=0, name="third_counter")
fourth_counter = Counter(value=0, name="fourth_counter")
fifth_counter = Counter(value=0, name="fifth_counter")
sixth_counter = Counter(value=0, name="sixth_counter")
``````

#### 1.1.2. Increase Counter Function

``````
from time import sleep


def increase(by: int, counter: Counter):
    # Read, modify, and write back without any synchronization
    local_counter = counter.value
    local_counter += by

    sleep(0.1)  # processing time simulation

    counter.value = local_counter
``````

#### 1.1.3. Execution

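``````
import threading
from time import time

if __name__ == "__main__":
    # The original thread list did not survive in this listing. The one below
    # is reconstructed from the reported results: 21 threads, each adding 1.
    threads_per_counter = [
        (first_counter, 6), (second_counter, 2), (third_counter, 5),
        (fourth_counter, 3), (fifth_counter, 2), (sixth_counter, 3),
    ]
    threads = [
        threading.Thread(target=increase, args=(1, counter))
        for counter, amount in threads_per_counter
        for _ in range(amount)
    ]

    initial_time = time()

    for t in threads:
        t.start()

    for t in threads:
        t.join()

    final_time = time()

    print(f"\nFinal value {first_counter.name}:", first_counter.value)
    print(f"Final value {second_counter.name}:", second_counter.value)
    print(f"Final value {third_counter.name}:", third_counter.value)
    print(f"Final value {fourth_counter.name}:", fourth_counter.value)
    print(f"Final value {fifth_counter.name}:", fifth_counter.value)
    print(f"Final value {sixth_counter.name}:", sixth_counter.value)

    print(f"\nExecution time: {round(final_time - initial_time, 3)} seconds")
``````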

#### 1.1.4. Execution Results

``````
Final value first_counter: 1
Final value second_counter: 1
Final value third_counter: 1
Final value fourth_counter: 1
Final value fifth_counter: 1
Final value sixth_counter: 1

Execution time: 0.102 seconds
``````

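This is not what we expected. Although multiple threads modified the counters, every counter ends up with the same value: 1. Each thread read the counter while it was still 0, slept, and then wrote back 0 + 1, overwriting the updates of the other threads. This is an example of a race condition. But how can we solve it?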
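The lost update can be reproduced deterministically in a few lines. The sketch below is not part of the original code: it replaces `sleep` with a `threading.Barrier` so that both threads are guaranteed to read before either one writes, and the names `lost_update_demo` and `add_one` are made up for this illustration.

```python
import threading


def lost_update_demo() -> int:
    """Run two unsynchronized increments and return the final shared value."""
    shared = {"value": 0}
    both_have_read = threading.Barrier(2)

    def add_one() -> None:
        local = shared["value"]        # 1. read the current value
        both_have_read.wait()          # 2. wait until the other thread has read too
        shared["value"] = local + 1    # 3. write back a result based on a stale read

    threads = [threading.Thread(target=add_one) for _ in range(2)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return shared["value"]


print(lost_update_demo())  # prints 1: one of the two increments was lost
```

Both threads compute `0 + 1`, so the second write simply repeats the first one.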

### 1.2. Solving the Problem

What if we could lock the counter so that only one thread increases it at a time? Let's insert a lock in this function so that every thread has to wait for its turn to increase the counter.

We can do that by implementing a Lock in the `increase` function:

``````
import threading
from time import sleep

lock = threading.Lock()  # a single lock shared by every thread


def increase(by: int, counter: Counter):

    with lock:

        local_counter = counter.value
        local_counter += by

        sleep(0.1)  # processing time simulation

        counter.value = local_counter
``````

Whenever a thread enters the context manager, it acquires the lock and every other thread has to wait. The lock is only released when the thread exits the context manager.

#### 1.2.1. Results After Simple Lock

``````
Final value first_counter: 6
Final value second_counter: 2
Final value third_counter: 5
Final value fourth_counter: 3
Final value fifth_counter: 2
Final value sixth_counter: 3

Execution time: 2.108 seconds
``````

As we can see, execution took much longer after inserting the lock: the single lock makes every thread wait for all the others, even those working on a different counter. How can we improve it? That is what our mutex is going to do.
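A rough estimate shows where the time goes. It assumes each call holds its lock for the full 0.1 seconds and ignores thread overhead; the helper name `estimated_seconds` is made up for this sketch. With one shared lock, all 21 calls queue behind each other. If only calls on the same counter had to queue, the longest queue would be the six calls on `first_counter`.

```python
def estimated_seconds(threads_per_counter: dict[str, int], work: float) -> tuple[float, float]:
    """Return (single-lock estimate, per-key-lock estimate) in seconds."""
    single_lock = sum(threads_per_counter.values()) * work  # everyone queues together
    per_key = max(threads_per_counter.values()) * work      # only same-key threads queue
    return round(single_lock, 3), round(per_key, 3)


# Thread counts per counter, read off the final values reported above.
counts = {"first": 6, "second": 2, "third": 5, "fourth": 3, "fifth": 2, "sixth": 3}
print(estimated_seconds(counts, work=0.1))  # (2.1, 0.6)
```

The first figure is close to the measured 2.108 seconds, which suggests the lock, not the work itself, dominates the run time.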

## 2. Our Goal

The mutex has to meet some requirements:

1. Be efficient
2. Be easy to maintain
3. Be scalable - see Open–closed principle

For the sake of efficiency, our mutex will hold multiple locks, each associated with a specific key. When a thread provides a key, the mutex creates a lock for that key or, if a lock for the key already exists, makes the thread wait on it. Since the mutex has to be available to the entire system, it needs to be a thread-safe Singleton.

## 3. Mutex 🚦

Now let's implement what we have defined.

### 3.1. Thread Safe Singleton

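``````
import threading


# The class line is missing from the original listing; the name is assumed.
class SingletonMeta(type):
    __instances = {}
    __lock = threading.Lock()  # guards instance creation across threads

    def __call__(cls, *args, **kwargs):
        with cls.__lock:
            if cls not in cls.__instances:
                instance = super().__call__(*args, **kwargs)
                cls.__instances[cls] = instance
        return cls.__instances[cls]
``````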
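As a quick sanity check, a metaclass like the one above can be exercised from several threads at once. This is only a sketch: `SingletonMeta`, `Registry`, and `collect_instance_ids` are hypothetical names chosen for the example, not taken from the original source.

```python
import threading


class SingletonMeta(type):
    """Hypothetical stand-in for a thread-safe singleton metaclass."""
    _instances: dict = {}
    _lock = threading.Lock()

    def __call__(cls, *args, **kwargs):
        with cls._lock:
            if cls not in cls._instances:
                cls._instances[cls] = super().__call__(*args, **kwargs)
        return cls._instances[cls]


class Registry(metaclass=SingletonMeta):
    """Hypothetical class used only for this check."""
    init_calls = 0

    def __init__(self) -> None:
        Registry.init_calls += 1


def collect_instance_ids(n_threads: int = 8) -> set[int]:
    """Instantiate Registry from several threads and collect the object ids."""
    ids: list[int] = []
    threads = [
        threading.Thread(target=lambda: ids.append(id(Registry())))
        for _ in range(n_threads)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return set(ids)


print(len(collect_instance_ids()))  # 1: every thread received the same object
print(Registry.init_calls)          # 1: __init__ ran only once
```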

### 3.2. Key Based Mutex

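``````
import threading


# The class line is missing from the original listing. The class name comes from
# the worker imports; the metaclass name is assumed from section 3.1.
class CustomMutex(metaclass=SingletonMeta):
    def __init__(self):
        self.__lockers: dict[str, threading.Lock] = {}
        self.__lockers_counter: dict[str, int] = {}

    def __decrease_counter(self, key: str) -> None:
        self.__lockers_counter[key] -= 1

        # Drop the lock once no thread holds or waits for it
        if self.__lockers_counter[key] == 0:
            del self.__lockers[key]
            del self.__lockers_counter[key]

    # The method signature is missing from the original; this name is assumed.
    def __get_lock(self, key: str) -> threading.Lock:
        if key in self.__lockers:
            self.__lockers_counter[key] += 1
            return self.__lockers[key]

        self.__lockers_counter[key] = 1
        self.__lockers[key] = threading.Lock()
        return self.__lockers[key]

    def lock_with_key(self, key: str) -> None:
        """Locks the thread associated with the key"""
        # Body reconstructed; the text below refers to thread_lock.acquire()
        thread_lock = self.__get_lock(key)
        thread_lock.acquire()

    def unlock_with_key(self, key: str) -> None:
        """Unlocks the thread associated with the key"""

        if key in self.__lockers:
            thread_lock = self.__lockers[key]  # reconstructed lookup and release
            thread_lock.release()

            self.__decrease_counter(key)
``````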

The `Mutex` class will be responsible for managing the relation between the locks and the keys; this relation is defined in the `self.__lockers` attribute.
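One detail worth considering: the dictionaries that hold the locks are themselves shared by all threads, so creating, counting, and deleting entries is a critical section too. The following is a minimal sketch of a key-based lock with guarded bookkeeping. It is not the article's `CustomMutex`; the names `KeyedLocks`, `_guard`, and `active_keys` are assumptions made for this illustration.

```python
import threading


class KeyedLocks:
    """Hypothetical sketch: one lock per key, with guarded bookkeeping."""

    def __init__(self) -> None:
        self._guard = threading.Lock()               # protects the two dicts below
        self._locks: dict[str, threading.Lock] = {}
        self._users: dict[str, int] = {}             # holders plus waiters per key

    def acquire(self, key: str) -> None:
        with self._guard:
            lock = self._locks.setdefault(key, threading.Lock())
            self._users[key] = self._users.get(key, 0) + 1
        lock.acquire()                               # block outside the guard

    def release(self, key: str) -> None:
        with self._guard:
            lock = self._locks[key]
            self._users[key] -= 1
            if self._users[key] == 0:                # nobody holds or waits: clean up
                del self._locks[key]
                del self._users[key]
        lock.release()

    def active_keys(self) -> list[str]:
        with self._guard:
            return sorted(self._locks)


locks = KeyedLocks()
locks.acquire("first_counter")
locks.acquire("second_counter")   # different key: does not block
print(locks.active_keys())        # ['first_counter', 'second_counter']
locks.release("first_counter")
locks.release("second_counter")
print(locks.active_keys())        # []
```

The guard is held only while the dictionaries are touched, never while waiting for a key's lock, so threads using different keys still do not block each other.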

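You may wonder where the queue that holds the threads waiting for a lock is. It comes for free: a thread that reaches `thread_lock.acquire()` on a held lock blocks until the lock is released, so the lock itself keeps track of the waiting threads. Note that `threading.Lock` does not guarantee the order in which waiting threads are woken up, so it should not be relied on as a strict FIFO queue. Either way, there is no need for us to develop a queue.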

## 4. Workers 🦺

Now that we have the mutex, we need workers that lock threads based on keys. Each thread gets its own worker instance, and every worker reports to the mutex in order to lock and unlock threads.

Each worker is also responsible for managing timeouts, in case a thread takes too long to release its lock in the mutex.
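One way to build such a timeout with the standard library is `threading.Timer`, which runs a callback on a separate thread after a delay unless it is cancelled first. The sketch below only illustrates that mechanism; the function name `timed_out` and the use of an `Event` as the callback are choices made for this example.

```python
import threading
import time


def timed_out(work_seconds: float, timeout: float) -> bool:
    """Return True if the timer fired before the simulated work finished."""
    fired = threading.Event()
    timer = threading.Timer(timeout, fired.set)  # calls fired.set() after `timeout` seconds
    timer.start()
    time.sleep(work_seconds)                     # stand-in for the protected work
    timer.cancel()                               # has no effect if the timer already fired
    return fired.is_set()


print(timed_out(work_seconds=0.01, timeout=1.0))  # False: the work finished first
print(timed_out(work_seconds=0.3, timeout=0.02))  # True: the timeout fired during the work
```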

### 4.1. Abstract Worker

Since we can have multiple workers it is important to define an interface so they can all work in the same way.

``````
from abc import ABC, abstractmethod

from mutex import CustomMutex


class MutexWorkerAbstract(ABC):
    @abstractmethod
    def __enter__(self) -> "MutexWorkerAbstract":
        """implement context manager"""

    @abstractmethod
    def __exit__(self, exc_type, exc_value, exc_tb) -> None:
        """implement context manager"""

    @property
    @abstractmethod
    def custom_mutex(self) -> CustomMutex:
        """Enforcing composition with CustomMutex"""
``````

As we dive into the worker's structure the context manager implementation will get clearer.
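The practical effect of an abstract base class is that an incomplete worker cannot be instantiated at all. A small self-contained sketch, using hypothetical classes (`WorkerInterface`, `IncompleteWorker`, `CompleteWorker`) rather than the article's own:

```python
from abc import ABC, abstractmethod


class WorkerInterface(ABC):
    """Hypothetical, reduced version of a worker interface."""

    @abstractmethod
    def __enter__(self):
        """implement context manager"""

    @abstractmethod
    def __exit__(self, exc_type, exc_value, exc_tb):
        """implement context manager"""


class IncompleteWorker(WorkerInterface):
    def __enter__(self):
        return self
    # __exit__ is deliberately missing


class CompleteWorker(WorkerInterface):
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        return None


def can_instantiate(cls) -> bool:
    """Return True if cls() succeeds, False if the ABC machinery rejects it."""
    try:
        cls()
    except TypeError:
        return False
    return True


print(can_instantiate(IncompleteWorker))  # False
print(can_instantiate(CompleteWorker))    # True
```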

### 4.2. Worker Counter

``````
import threading

from mutex import CustomMutex
from worker_abstract import MutexWorkerAbstract


class CounterMutexWorker(MutexWorkerAbstract):

    def __init__(self):
        self.__general_mutex = CustomMutex()
        self.__counter_key_lock: str | None = None
        self.__timeout = 5  # seconds
        self.__timeout_timer = threading.Timer(
            self.__timeout,
            function=self.__timeout_callback
        )

    @property
    def custom_mutex(self) -> CustomMutex:
        return self.__general_mutex

    def __enter__(self) -> "CounterMutexWorker":
        return self

    def __exit__(self, exc_type, exc_value, exc_tb) -> None:
        # Always release, even if the body of the `with` block raised
        self.unlock_by_counter()

    def __format_key_lock_thread(self, counter_name: str) -> str:
        key = f"COUNTER_WORKER_KEY_{counter_name}"
        return key

    def __timeout_callback(self) -> None:
        print(f"Lock timeout! worker=CounterMutexWorker | key={self.__counter_key_lock}")
        print(f"Releasing Lock... worker=CounterMutexWorker key={self.__counter_key_lock}")

        self.unlock_by_counter()

    def lock_by_counter(self, counter_name: str) -> None:
        """Locks thread by counter name"""
        key = self.__format_key_lock_thread(counter_name)

        self.custom_mutex.lock_with_key(key=key)
        self.__timeout_timer.start()
        self.__counter_key_lock = counter_name

    def unlock_by_counter(self) -> None:
        """Unlocks thread by counter name"""

        if self.__timeout_timer.is_alive():
            self.__timeout_timer.cancel()

        if self.__counter_key_lock:
            key = self.__format_key_lock_thread(self.__counter_key_lock)
            self.custom_mutex.unlock_with_key(key=key)
            self.__counter_key_lock = None
``````

PS.: Please note that two different workers must not share the same key format; otherwise their keys would map to the same locks in the mutex.

Now we can understand the importance of the context manager: if an exception is raised before the thread unlocks, `__exit__` still runs and the lock is released right away, without waiting for the timeout to expire.
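This guarantee can be checked in isolation. The sketch below uses a hypothetical helper, `ReleasingGuard`, around a plain `threading.Lock`; it is a reduced illustration of the idea, not the worker itself.

```python
import threading


class ReleasingGuard:
    """Hypothetical helper: acquires a lock on entry, always releases on exit."""

    def __init__(self, lock: threading.Lock) -> None:
        self._lock = lock

    def __enter__(self) -> "ReleasingGuard":
        self._lock.acquire()
        return self

    def __exit__(self, exc_type, exc_value, exc_tb) -> None:
        # Runs on normal exit and on exceptions alike.
        # Returning None lets any exception propagate to the caller.
        self._lock.release()


lock = threading.Lock()
try:
    with ReleasingGuard(lock):
        raise ValueError("failure inside the critical section")
except ValueError:
    pass

print(lock.locked())  # False: the lock was released despite the exception
```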

Each worker has its own custom key. As soon as a thread locks the mutex, the worker starts a timer thread to monitor the timeout; if the timeout is exceeded, the lock is released.

## 5. Testing

It's time to apply what we have built to the previous problem and check the results.

### 5.1. Implementing Worker

Applying the worker to the `increase` function.

``````
from time import sleep

from worker import CounterMutexWorker


def increase(by: int, counter: Counter):

    worker = CounterMutexWorker()

    with worker:
        worker.lock_by_counter(counter.name)

        local_counter = counter.value
        local_counter += by

        sleep(0.1)  # simulating process time

        counter.value = local_counter

        worker.unlock_by_counter()  # explicit release; __exit__ would also do it
``````

#### 5.1.1. Results After Mutex

``````
Final value first_counter: 6
Final value second_counter: 2
Final value third_counter: 5
Final value fourth_counter: 3
Final value fifth_counter: 2
Final value sixth_counter: 3

Execution time: 0.608 seconds
``````

### 5.2. Testing timeout

Since a timeout was implemented, let's check how it behaves. We can test it by decreasing `self.__timeout` from 5 to 0.02 seconds, which is shorter than our `sleep` time of 0.1 seconds.

``````
class CounterMutexWorker(MutexWorkerAbstract):

    def __init__(self):
        self.__general_mutex = CustomMutex()
        self.__counter_key_lock: str | None = None
        self.__timeout = 0.02  # seconds
        self.__timeout_timer = threading.Timer(
            self.__timeout,
            function=self.__timeout_callback
        )
    .
    .
    .
``````

#### 5.2.1. Timeout Results

``````
Lock timeout! worker=CounterMutexWorker | key=first_counter
Releasing Lock... worker=CounterMutexWorker key=first_counter
Lock timeout! worker=CounterMutexWorker | key=fourth_counter
Releasing Lock... worker=CounterMutexWorker key=fourth_counter
Lock timeout! worker=CounterMutexWorker | key=third_counter
Releasing Lock... worker=CounterMutexWorker key=third_counter
Lock timeout! worker=CounterMutexWorker | key=sixth_counter
Lock timeout! worker=CounterMutexWorker | key=fifth_counter
Lock timeout! worker=CounterMutexWorker | key=second_counter
Releasing Lock... worker=CounterMutexWorker key=sixth_counter
Releasing Lock... worker=CounterMutexWorker key=fifth_counter
Releasing Lock... worker=CounterMutexWorker key=second_counter
Lock timeout! worker=CounterMutexWorker | key=first_counter
Releasing Lock... worker=CounterMutexWorker key=first_counter
Lock timeout! worker=CounterMutexWorker | key=third_counter
Releasing Lock... worker=CounterMutexWorker key=third_counter
Lock timeout! worker=CounterMutexWorker | key=fourth_counter
Releasing Lock... worker=CounterMutexWorker key=fourth_counter
Lock timeout! worker=CounterMutexWorker | key=sixth_counter
Releasing Lock... worker=CounterMutexWorker key=sixth_counter
Lock timeout! worker=CounterMutexWorker | key=fifth_counter
Releasing Lock... worker=CounterMutexWorker key=fifth_counter
Lock timeout! worker=CounterMutexWorker | key=second_counter
Releasing Lock... worker=CounterMutexWorker key=second_counter
Lock timeout! worker=CounterMutexWorker | key=first_counter
Releasing Lock... worker=CounterMutexWorker key=first_counter
Lock timeout! worker=CounterMutexWorker | key=third_counter
Releasing Lock... worker=CounterMutexWorker key=third_counter
Lock timeout! worker=CounterMutexWorker | key=fourth_counter
Releasing Lock... worker=CounterMutexWorker key=fourth_counter
Lock timeout! worker=CounterMutexWorker | key=sixth_counter
Releasing Lock... worker=CounterMutexWorker key=sixth_counter
Lock timeout! worker=CounterMutexWorker | key=first_counter
Releasing Lock... worker=CounterMutexWorker key=first_counter
Lock timeout! worker=CounterMutexWorker | key=third_counter
Releasing Lock... worker=CounterMutexWorker key=third_counter
Lock timeout! worker=CounterMutexWorker | key=first_counter
Releasing Lock... worker=CounterMutexWorker key=first_counter
Lock timeout! worker=CounterMutexWorker | key=third_counter
Releasing Lock... worker=CounterMutexWorker key=third_counter
Lock timeout! worker=CounterMutexWorker | key=first_counter
Releasing Lock... worker=CounterMutexWorker key=first_counter

Final value first_counter: 2
Final value second_counter: 1
Final value third_counter: 1
Final value fourth_counter: 1
Final value fifth_counter: 1
Final value sixth_counter: 1

Execution time: 0.203 seconds
``````

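It took only 0.2 seconds because each lock was released as soon as its timeout was exceeded, letting the next waiting thread in. Note that the final counter values are wrong again: releasing a lock before its holder has finished reintroduces the race condition, so in practice the timeout should be comfortably longer than the expected processing time.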

### 5.3. Load Test

What if we increase the number of threads and counters? For the purpose of testing the performance of the mutex, let's set `1000 threads` and `12 counters`.

#### 5.3.1. Execution

``````
import threading
from random import randint
from time import time

if __name__ == "__main__":
    counters = [
        Counter(value=0, name="first_counter"),
        Counter(value=0, name="second_counter"),
        Counter(value=0, name="third_counter"),
        Counter(value=0, name="fourth_counter"),
        Counter(value=0, name="fifth_counter"),
        Counter(value=0, name="sixth_counter"),
        Counter(value=0, name="seventh_counter"),
        Counter(value=0, name="eighth_counter"),
        Counter(value=0, name="ninth_counter"),
        Counter(value=0, name="tenth_counter"),
        Counter(value=0, name="eleventh_counter"),
        Counter(value=0, name="twelfth_counter"),
    ]

    amount_of_threads = 1000

    # Each thread adds 1 to a randomly chosen counter
    threads = [
        threading.Thread(
            target=increase,
            args=(
                1,
                counters[randint(0, len(counters) - 1)]
            )
        )
        for _ in range(amount_of_threads)
    ]

    initial_time = time()

    for t in threads:
        t.start()

    for t in threads:
        t.join()

    final_time = time()

    print("\n=======================================\n")

    for c in counters:
        print(f"Final value {c.name}:", c.value)

    print("\n=======================================\n")

    print(f"Execution time: {round(final_time - initial_time, 3)} seconds")
``````

#### 5.3.2. Results Using Simple Lock

``````
=======================================

Final value first_counter: 74
Final value second_counter: 85
Final value third_counter: 85
Final value fourth_counter: 90
Final value fifth_counter: 92
Final value sixth_counter: 87
Final value seventh_counter: 85
Final value eighth_counter: 78
Final value ninth_counter: 85
Final value tenth_counter: 85
Final value eleventh_counter: 82
Final value twelfth_counter: 72

=======================================

Execution time: 100.403 seconds
``````

#### 5.3.3. Results Using Mutex

``````
=======================================

Final value first_counter: 74
Final value second_counter: 85
Final value third_counter: 85
Final value fourth_counter: 90
Final value fifth_counter: 92
Final value sixth_counter: 87
Final value seventh_counter: 85
Final value eighth_counter: 78
Final value ninth_counter: 85
Final value tenth_counter: 85
Final value eleventh_counter: 82
Final value twelfth_counter: 72

=======================================

Execution time: 9.815 seconds
``````

## 6. Conclusion

After implementing the mutex, we managed to reduce the execution time considerably.

1. `21 threads` and `6 counters` -> from `2.108 seconds` to `0.608 seconds`. Time reduction of `71.2%`.

2. `1000 threads` and `12 counters` -> from `100.403 seconds` to `9.815 seconds`. Time reduction of `90.22%`.
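The reductions can be recomputed from the measured times. The helper name `time_reduction` is made up for this sketch:

```python
def time_reduction(before: float, after: float) -> float:
    """Percentage of execution time saved, rounded to two decimals."""
    return round((before - after) / before * 100, 2)


print(time_reduction(2.108, 0.608))    # 71.16
print(time_reduction(100.403, 9.815))  # 90.22
```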

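The gain grew as more threads and more counters were introduced. This is expected: with key-based locks, only threads that touch the same counter wait for each other. This makes the approach a good fit for systems where many threads work on many independent resources.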

Furthermore, the mutex + worker combination is scalable: if another lock of the same kind is needed, simply implement a new worker conforming to the `MutexWorkerAbstract` class.