Ali Sherief

Python Multiprocessing: Learning Pools, Managers and Challenges at Lightspeed

Welcome to my attempt at learning Python library functions at lightspeed! I already know the Python language itself, and I write in it a lot, but there are still many modules out there that are unknown to me. I have discovered that reading the test cases for the libraries themselves, and then sharing what I learn with you all here, helps me retain it.

Today I'm going to take a jab at the multiprocessing Python module. Specifically, I'm going to cover:

  1. manager
  2. pool
  3. connection and the authentication challenges

Now let's begin.

Manager

In a nutshell, manager holds inter-process synchronization primitives. From the Python documentation:

A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies.

Manager objects give us an arsenal of synchronization primitives (which I will hereafter abbreviate as 'syncprims') to play with. We get the usual locks, semaphores and barriers, plus process-safe Queue, Value and Array data structures. These work like the ordinary, non-shared types you're used to (Value is the equivalent of a single shared variable). Manager objects can even be accessed by Python processes running on other machines across a network.
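
To make this concrete, here is a minimal sketch of my own (not from the test suite, and untested) that shares a Value and a Lock between worker processes; the worker function and the counts are made up for illustration:

from multiprocessing import Manager, Process

def add_one(counter, lock):
    # Proxy objects can be passed to child processes like ordinary arguments.
    for _ in range(1000):
        with lock:               # manager-backed Lock, safe across processes
            counter.value += 1   # manager-backed Value, a single shared variable

if __name__ == "__main__":
    with Manager() as manager:
        counter = manager.Value("i", 0)   # "i" is the signed int typecode
        lock = manager.Lock()
        workers = [Process(target=add_one, args=(counter, lock)) for _ in range(4)]
        for w in workers:
            w.start()
        for w in workers:
            w.join()
        print(counter.value)   # 4000, because every increment was lock-protected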

Here is an example of Queue usage taken directly from the multiprocessing test suite:

@classmethod
def _test_queue(cls, obj):
    assert obj.qsize() == 2
    assert obj.full()
    assert not obj.empty()
    assert obj.get() == 5
    assert not obj.empty()
    assert obj.get() == 6
    assert obj.empty()

def test_queue(self, qname="Queue"):
    o = getattr(self.manager, qname)(2)
    o.put(5)
    o.put(6)
    self.run_worker(self._test_queue, o)
    assert o.empty()
    assert not o.full()

Key takeaways from this code snippet:

  • the manager has a method called Queue() which returns a proxy pointing to a shared queue (yes, there can be several) managed by the manager object. The presence of Queue() is less obvious here because the method is looked up with getattr, which is called with the argument qname (which itself is "Queue"). It is possible to call Queue() multiple times to create multiple shared queues.
  • manager.Queue() takes an argument that is the maximum size of the queue, hence the extra (2). (Remember this snippet lives in the multiprocessing test suite, so outside of it these names would be qualified by multiprocessing.) A standalone, runnable sketch of this pattern follows the list.
  • the o.put() and o.get() calls put values on and take values off the queue. The values can have different types; I could just as well have written:
o.put("foo")
o.put(6)
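
As promised, here is a standalone, runnable sketch of the same pattern outside of the test suite (my own code): the parent fills a manager-backed queue and a child process drains it.

from multiprocessing import Manager, Process

def drain(q):
    # Runs in a child process; the proxy behaves like a regular queue.
    while not q.empty():
        print("got", q.get())

if __name__ == "__main__":
    with Manager() as manager:
        q = manager.Queue(2)   # maximum size of 2, just like the test above
        q.put(5)
        q.put("foo")           # mixed types are fine
        worker = Process(target=drain, args=(q,))
        worker.start()
        worker.join()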

Pool

Pools are used for offloading tasks to worker processes. Here is a slightly edited rudimentary example of Pool usage:

def sqr_wait(x, wait=0.0):
    time.sleep(wait)
    return x*x

def mul(x, y):
    return x*y
# ...

@classmethod
def setUpClass(cls):
    super().setUpClass()
    cls.pool = cls.Pool(4)

@classmethod
def tearDownClass(cls):
    cls.pool.terminate()
    cls.pool.join()
    cls.pool = None
    super().tearDownClass()

def test_apply(self):
    papply = self.pool.apply
    self.assertEqual(papply(sqr_wait, (5,)), sqr_wait(5))
    self.assertEqual(papply(sqr_wait, (), {'x':3}), sqr_wait(x=3))

def test_map(self):
    pmap = self.pool.map
    self.assertEqual(pmap(sqr_wait, list(range(10))), list(map(sqr_wait, list(range(10)))))
    self.assertEqual(pmap(sqr_wait, list(range(100)), chunksize=20),
                     list(map(sqr_wait, list(range(100)))))

def test_starmap(self):
    psmap = self.pool.starmap
    tuples = list(zip(range(10), range(9,-1, -1)))
    self.assertEqual(psmap(mul, tuples),
                     list(itertools.starmap(mul, tuples)))
    tuples = list(zip(range(100), range(99,-1, -1)))
    self.assertEqual(psmap(mul, tuples, chunksize=20),
                     list(itertools.starmap(mul, tuples)))


Notice the setUpClass() and tearDownClass() methods above; this is also an excerpt from a test case. To make it clear, the tuples variable in the starmap test is a list of pairs, with values [(0, 9), (1, 8), ..., (9, 0)] and [(0, 99), (1, 98), ..., (99, 0)] respectively. The zip() calls put the ranges into this format.

The main points here are:

  • Pool has three different kinds of apply methods: apply, map, and starmap. All three also have async (non-blocking) variants (apply_async, map_async and starmap_async); the ones shown above are the blocking variants.

(There is also imap, a lazy version of map that is gentler on memory for long iterables, and imap_unordered, which returns results in whatever order they finish.)

  • apply runs the function on a single worker. The second parameter is a tuple of positional arguments, and the third parameter is a dictionary of keyword arguments.

  • map splits up the list, prepares a function call for each value and distributes those calls across the workers. The called function must take exactly one argument.

  • starmap is like map except the iterable contains tuples, and each tuple is unpacked into the arguments passed to the function.

You'll probably get away with passing a list of different-sized tuples to starmap, as long as the function being called has default values for the arguments not supplied.
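
To see the three of them side by side, here is a short sketch of my own (the helper functions are made up) showing apply, map, starmap and imap on the same pool:

from multiprocessing import Pool

def sqr(x):
    return x * x

def mul(x, y):
    return x * y

if __name__ == "__main__":
    with Pool(4) as pool:
        # apply: a single call on a single worker, blocking until the result returns
        print(pool.apply(sqr, (5,)))                 # 25
        # map: one positional argument per call, spread across the workers
        print(pool.map(sqr, range(10)))              # [0, 1, 4, ..., 81]
        # starmap: each tuple is unpacked into the function's arguments
        pairs = list(zip(range(5), range(4, -1, -1)))
        print(pool.starmap(mul, pairs))              # [0, 3, 4, 3, 0]
        # imap: lazy variant that yields results as they become ready, in order
        for result in pool.imap(sqr, range(5), chunksize=2):
            print(result)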

Pool could prove useful if you have a numpy/scipy computation that needs to run in parallel across multiple processes (and therefore multiple CPU cores).
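
For instance, a rough sketch (assuming numpy is installed; the workload here is invented) might farm independent chunks of work out to a pool like this:

import numpy as np
from multiprocessing import Pool

def crunch(seed):
    # Each worker builds and reduces its own array, so no large data is pickled back.
    rng = np.random.default_rng(seed)
    data = rng.random(1_000_000)
    return float(np.sum(data ** 2))

if __name__ == "__main__":
    with Pool() as pool:               # defaults to one worker per CPU core
        results = pool.map(crunch, range(8))
    print(results)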

Connection

Last, we take a look at the networking multiprocessing can do for us.

The Python docs have this summary for Connection:

Connection objects allow the sending and receiving of picklable objects or strings. They can be thought of as message oriented connected sockets.

So basically, consider this a way to cheaply send objects between Python processes.

Normally, Pipe() is used to create a pair of Connection objects. Here, however, we will look at the more powerful connection submodule, which also supports TCP sockets, Unix domain sockets and Windows named pipes, and can therefore connect processes across a network.

You make listeners by calling connection.Listener(), and to connect some other Python process to a listener, call connection.Client().
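
Here is a minimal sketch of my own (untested) showing that handshake between two processes on the same machine; the port number and messages are arbitrary:

from multiprocessing import Process
from multiprocessing.connection import Listener, Client

def say_hello(address):
    # Runs in a child process and connects back to the parent's listener.
    with Client(address) as conn:
        conn.send(["any", "picklable", "object"])
        print("client got:", conn.recv())

if __name__ == "__main__":
    address = ("localhost", 6000)            # arbitrary port; the family is inferred
    with Listener(address) as listener:      # bound before the client starts
        client_proc = Process(target=say_hello, args=(address,))
        client_proc.start()
        with listener.accept() as conn:      # blocks until the client connects
            print("server got:", conn.recv())
            conn.send({"status": "ok"})
        client_proc.join()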

For security, especially if the objects you send across the network contain sensitive data, you can set up the listener to issue a challenge that the client must answer using a shared secret key, or the other way around (it doesn't matter which side issues the challenge).

This is done by calling connection.deliver_challenge(connection, authkey); the challenge is answered by connection.answer_challenge(connection, authkey). The authkey is supposed to be a byte string, as stated in the Python docs, but I'm not sure how to use these functions directly, so if you do, please answer the Stack Overflow question I just made here.
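
What I can say is that Listener and Client run this same challenge/response for you automatically when you hand them an authkey, so a hedged sketch of the usual pattern looks like this (the key and port are placeholders I made up):

from multiprocessing import Process
from multiprocessing.connection import Listener, Client

SECRET = b"not-a-real-key"   # placeholder; both ends must share the same byte string

def authenticated_client(address):
    # Because authkey is given, the client answers the HMAC challenge under the hood.
    with Client(address, authkey=SECRET) as conn:
        conn.send("hello from an authenticated client")

if __name__ == "__main__":
    address = ("localhost", 6001)   # arbitrary port, as before
    with Listener(address, authkey=SECRET) as listener:
        proc = Process(target=authenticated_client, args=(address,))
        proc.start()
        # accept() runs the challenge; a mismatched key raises
        # multiprocessing.AuthenticationError on both sides.
        with listener.accept() as conn:
            print(conn.recv())
        proc.join()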

Surprisingly, there are no test cases for deliver_challenge or answer_challenge. TODO: remind me to write some for them. This is the best I could find:

@classmethod
def _listener(cls, conn, families):
    for fam in families:
        l = cls.connection.Listener(family=fam)
        conn.send(l.address)    # tell the other process where to connect
        new_conn = l.accept()
        conn.send(new_conn)     # Connection objects themselves can be pickled and sent
        new_conn.close()
        l.close()


@classmethod
def _remote(cls, conn):
    # Keep reading (address, msg) pairs until a None sentinel arrives.
    for (address, msg) in iter(conn.recv, None):
        client = cls.connection.Client(address)
        client.send(msg.upper())
        client.close()

And we're done

I hope you enjoyed this 5-minute lightspeed tour of Python functions, and learned something new! I didn't test any of the code I wrote here (I learn passively, not by typing things into a REPL), so if I made a mistake, let me know in the comments.
