
Python: how to use multiprocessing to finish work faster

By wrighter. Originally published at wrighters.io.

It’s very common in data science work to have some function or a set of functions that you run in a loop to process or analyze data. When you see a loop that’s performing an expensive operation, you should immediately think of (at least) two ways to speed things up. The first is vectorization, which I won’t cover here, and the second is multiple threads (or processes) to allow the work to be done concurrently and fully utilize your hardware.

Let’s look at a simple example of how the multiprocessing module in Python can be used to solve this problem. With multiprocessing, a function is executed in multiple Python processes instead of multiple threads, which bypasses the Global Interpreter Lock (GIL) that can significantly slow down threaded Python programs. The goal is to take work that can be subdivided, perform the pieces in different processes using the full resources of the computer, then return the results of those calculations to the main process for further work with the combined data.

For this example, I’m going to create some dummy time series data. This could be any sort of time series, such as daily attendance, temperature, stock prices, or store sales. I’m using the itertools module to generate some sequential strings for labels. I’m also using pandas to make a business date range going back 10+ years. The idea here is to generate enough files, each large enough, that processing them takes an amount of time we can measure.

Note that all the examples below were written and executed in Python 3.6.10 using an ipython shell.

import os
import multiprocessing
import functools 
import itertools 
import string 
import pandas as pd 
import numpy as np 

dates = pd.bdate_range("20100101", pd.Timestamp.today()) 
labels = ["".join(l) for l in itertools.combinations(string.ascii_letters, 2)] 
os.makedirs("data", exist_ok=True)
for label in labels: 
    df = pd.Series(np.random.random(len(dates)), index=dates).to_frame("data") 
    df.to_csv(os.path.join("data", f"{label}.csv"))
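For a sense of scale, the number of labels (and therefore files) that this loop produces can be worked out without touching the disk. This is only a quick check that reuses the same `itertools.combinations` call as the snippet above; the `expected` variable is my own addition.

```python
import itertools
import string

# Same label construction as in the data-generation snippet.
labels = ["".join(pair) for pair in itertools.combinations(string.ascii_letters, 2)]

# 52 letters taken 2 at a time: 52 * 51 / 2
n_letters = len(string.ascii_letters)
expected = n_letters * (n_letters - 1) // 2

print(len(labels), expected)
```

Both numbers should come out as 1326, one CSV file per label. On a case-insensitive file system, labels that differ only by case (for example `ab` and `AB`) would likely map to the same file, so the count on disk may be somewhat lower.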

OK, now we’ve got a chunk of data files. Let’s just write a function that will read in one of those files, perform some simple calculations with the data, then return the result. (Don’t forget to delete these files when you’re done if you’re following along at home).

def process_file(label): 
    path = os.path.join("data", f"{label}.csv") 
    df = pd.read_csv(path) 
    return df.describe()

We’ll use the %timeit magic in our Jupyter notebook or IPython session to see what the cost is to process a single file. The -o option will return a result that we can use. Processing these files in a single loop scales linearly with the number of files, so getting our expected run time (in seconds) is pretty straightforward: multiply the average time per file by the number of files.

In [6]: r = %timeit -o process_file('az')
5.85 ms ± 255 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
In [7]: r.average * len(labels)
Out[7]: 7.755164415974036

And since computers are fast, we’ll go ahead and actually run this to see what the actual time is. Note that %timeit runs the code multiple times, so this can still take a while. You should maybe run this yourself just so you can experience the frustration of waiting for the work to complete.

In [8]: %timeit for l in labels: process_file(l)
8.27 s ± 354 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

This is not bad: on my computer I can process more than 1,000 data files in only about 8 seconds. But if I have bigger files and need to do more complex calculations, I’ll want to use more of my processing power. In my case, I have a quad-core Intel i7 with 8 threads; your machine may be different.
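To see what your own machine reports, the standard library can tell you the number of logical CPUs. The sketch below also wraps the "leave a couple of CPUs free" idea in a small helper; `worker_count` and its `reserve` parameter are hypothetical names of mine, not part of the multiprocessing API. The lower bound of 1 is there because a plain `cpu_count() - 2` would come out as zero or negative on a machine with one or two CPUs, and a Pool needs at least one process.

```python
import os

def worker_count(reserve=2):
    """Hypothetical helper: logical CPUs minus `reserve`, but never below 1."""
    logical = os.cpu_count() or 1  # os.cpu_count() can return None
    return max(1, logical - reserve)

print("logical CPUs:", os.cpu_count())
print("workers to use:", worker_count())
```

On a machine like the one described above (8 logical CPUs), this would suggest 6 workers.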

Before solving this problem using the multiprocessing module, let’s look at a trivial example to see a few basic guidelines. First, every program that uses multiprocessing needs a guard that checks whether the process is the main process or a child process. This guard ensures that the subprocesses can import the main code without side effects, such as trying to launch more processes in an endless loop. Second, you should avoid shared state between the processes and try to isolate the work in the function that you are executing. Finally, the arguments to the function, and the values it returns, need to be picklable, since pickling is how the module moves data between processes. Since our function loads a file from disk and returns a small amount of data, this problem is a good candidate.

In [9]: # our function that we will execute in another process
...: def say_hi():
...:     print("Child process:", multiprocessing.current_process())
...:     print('Hi')
...:
...:
...: if __name__ == '__main__':
...:     p = multiprocessing.Process(target=say_hi)
...:     print("Main:", multiprocessing.current_process())
...:     p.start()
Main: <_MainProcess(MainProcess, started)>
Child process: <Process(Process-1, started)>
Hi
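The pickling guideline mentioned above can be checked directly with the pickle module before handing anything to another process. This is a minimal sketch; `is_picklable` is a hypothetical helper name, and the three objects tested are just illustrative.

```python
import math
import pickle

def is_picklable(obj):
    """Return True if pickle.dumps accepts `obj`, False otherwise."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError, TypeError):
        return False

print(is_picklable(("az", 0.3)))       # plain data, such as a label and a threshold
print(is_picklable(math.sqrt))         # a function that can be imported by name
print(is_picklable(lambda x: x * 2))   # an anonymous function
```

The first two checks should report True and the lambda False, which is why functions passed to other processes are normally defined with `def` at module level.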

But we don’t want to run just one child process; we want as many as we can effectively use on our computer. One good way to do this is to use a Pool. A Pool has multiple methods that can be used to execute functions. A simple and common one is map, which is a parallel version of the built-in map function. It calls the function given as the first argument with each item in the iterable given as the second argument. You can tell the Pool how many processes to use, or let it default to the number of CPUs reported on your machine.
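As a small self-contained sketch of that equivalence, the script below runs the same toy function through the built-in `map` and through `Pool.map` and compares the results. The `square` function and the choice of two processes are assumptions made for illustration; the code is meant to be saved and run as a script.

```python
import multiprocessing

def square(x):
    """Toy stand-in for an expensive function; defined at module level."""
    return x * x

def main():
    numbers = list(range(10))
    serial = list(map(square, numbers))
    with multiprocessing.Pool(processes=2) as pool:
        parallel = pool.map(square, numbers)
    # pool.map returns a list whose order matches the input order
    print(serial == parallel)

if __name__ == "__main__":
    main()
```

Unlike the built-in `map`, which returns a lazy iterator, `pool.map` blocks until all the work is done and returns a list.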

In [10]: %%timeit
...: if __name__ == '__main__':
...:     with multiprocessing.Pool(processes=multiprocessing.cpu_count() - 2) as pool:
...:         results = pool.map(process_file, labels)
...:
2.28 s ± 131 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

On my machine, the run time drops from about 8 seconds to a little over 2, about a 3.6x improvement. As the work per task grows and a smaller share of the time is spent passing data back and forth between processes, this improvement will get closer to the number of processes available.
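The arithmetic behind that comparison is short enough to spell out. The two timings are the %timeit results quoted above; the worker count is an assumption on my part, namely that `cpu_count()` reported 8 logical CPUs on the quad-core machine described earlier, so that `cpu_count() - 2` gave 6 processes.

```python
serial_seconds = 8.27     # %timeit result for the plain loop
parallel_seconds = 2.28   # %timeit result for pool.map
workers = 8 - 2           # assumption: cpu_count() returned 8

speedup = serial_seconds / parallel_seconds
efficiency = speedup / workers

print(f"speedup: {speedup:.2f}x, efficiency per worker: {efficiency:.0%}")
```

Under that assumption, the speedup is roughly 3.6x and each worker contributes about 60% of an ideal share. With only four physical cores, the practical ceiling for CPU-bound work may well be closer to 4x than to 6x.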

One last thing worth looking at is a few examples of more complicated function invocations. What if, instead of a simple list of single arguments, the function takes multiple arguments? If the extra arguments are the same for every call, functools.partial is a good solution: give the partial the extra arguments and map over the remaining one.

def process_file2(label, threshold):
    path = os.path.join("data", f"{label}.csv")
    df = pd.read_csv(path)
    return df['data'].mean() > threshold

if __name__ == '__main__':
    with multiprocessing.Pool(processes=multiprocessing.cpu_count() - 2) as pool:
        results = pool.map(functools.partial(process_file2, threshold=.2), labels)
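To see what the partial is doing without any processes or files involved, here is a minimal sketch. The `exceeds` function is a hypothetical stand-in for `process_file2`, taking a number directly instead of reading a file.

```python
import functools

def exceeds(value, threshold):
    """Hypothetical two-argument function, standing in for process_file2."""
    return value > threshold

# Fix threshold once; the resulting callable takes only the remaining argument.
exceeds_02 = functools.partial(exceeds, threshold=0.2)

values = [0.1, 0.25, 0.5]
print([exceeds_02(v) for v in values])
print(list(map(exceeds_02, values)))
```

Because the partial now accepts a single argument, it fits the one-item-per-call shape that `map` and `pool.map` expect.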

You may also have cases where the arguments are a little more complicated and not fixed for all invocations. In this case, you can use starmap with a list of argument tuples; each tuple is unpacked into the positional arguments of one call.

# here's just a simple example of data with different arguments for some of the labels 
def make_thresh(label): 
    if 'a' in label or 'z' in label: 
        return .3 
    else: 
        return .4 

args = [(l, make_thresh(l)) for l in labels] 

if __name__ == '__main__': 
    with multiprocessing.Pool(processes=multiprocessing.cpu_count() - 2) as pool: 
        results = pool.starmap(process_file2, args)
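The unpacking that starmap performs can be illustrated serially with `itertools.starmap`, which treats each tuple the same way. This is only a sketch of the calling convention, not of the parallel execution; `exceeds` and the sample tuples are hypothetical.

```python
import itertools

def exceeds(value, threshold):
    """Hypothetical two-argument function, standing in for process_file2."""
    return value > threshold

# One tuple per call: (value, threshold)
args = [(0.35, 0.3), (0.35, 0.4), (0.5, 0.4)]

unpacked = [exceeds(*pair) for pair in args]
starmapped = list(itertools.starmap(exceeds, args))
print(unpacked == starmapped)
```

Each tuple in `args` becomes one call of the form `exceeds(value, threshold)`, which is the same shape as the `(label, threshold)` tuples built above.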

I hope this brief intro to the multiprocessing module has shown you some easy ways to speed up your Python code and make full use of your environment to finish work more quickly.
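As mentioned earlier, the generated files should be deleted once you are done following along. One cautious way to do that is sketched below; `remove_generated_files` is a hypothetical helper of mine that removes only the CSV files in the given directory, and then the directory itself if nothing else is left in it.

```python
import glob
import os

def remove_generated_files(directory="data"):
    """Delete the *.csv files in `directory`, then the directory if it is empty.

    Returns the number of files removed. This helper is illustrative and
    not part of the original example code.
    """
    removed = 0
    for path in glob.glob(os.path.join(directory, "*.csv")):
        os.remove(path)
        removed += 1
    if os.path.isdir(directory) and not os.listdir(directory):
        os.rmdir(directory)
    return removed
```

Calling `remove_generated_files("data")` from the directory where you ran the examples should clean up everything the data-generation snippet created.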
