Run blocking processes in async IO server

Kuimono ・2 min read

In Python, we can run CPU-bound blocking tasks in a separate process, but that raises the question of how to integrate them with other async IO tasks.

Problem

At my company we build AI products, and we usually wrap our microservices in an async IO web framework (e.g. Sanic) to control the execution of AI model training and inference.

  • The IO-related tasks (e.g. database writes) are already implemented as async functions.
  • AI model training and inference tasks usually block the thread. Calling them as normal functions inside the event loop could be disastrous: every other task in the loop (i.e. other incoming requests) would be blocked until they return.
  • Running these AI-related tasks in separate processes is a way out, but the tasks still need to interact with the async IO server, e.g. to update the database with the current task's progress.
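To make the blocking problem concrete, here is a minimal sketch using only the standard library. The names heartbeat, blocking_handler and main are made up for illustration, and time.sleep() stands in for a model call. The heartbeat is supposed to tick every 10 ms, but it stalls for as long as the blocking call runs in the event loop.

```python
import asyncio
import time


async def heartbeat(ticks, count, interval=0.01):
    """Stands in for other requests: records a timestamp on every tick."""
    for _ in range(count):
        ticks.append(time.monotonic())
        await asyncio.sleep(interval)


async def blocking_handler():
    """Hypothetical handler that calls thread-blocking code directly."""
    time.sleep(0.2)  # placeholder for model inference; the loop cannot switch tasks here


async def main():
    ticks = []
    beat = asyncio.create_task(heartbeat(ticks, 6))
    await asyncio.sleep(0.02)  # let the heartbeat tick a few times first
    await blocking_handler()   # the whole event loop is stuck during this call
    await beat
    # Largest pause between two consecutive heartbeat ticks, in seconds.
    return max(b - a for a, b in zip(ticks, ticks[1:]))


if __name__ == "__main__":
    print(f"longest heartbeat gap: {asyncio.run(main()):.3f}s")
```

The longest gap comes out at roughly the duration of the blocking call rather than the 10 ms interval, which is what every other request on the server would experience.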

Solution

  • When we execute a thread-blocking task, we run it in a separate process (using the multiprocessing module) and use a pair of request/response queues (multiprocessing.Queue) for communication.
  • We want the task to be "wrapped" in an async function, so that the caller can just call it and expect it to be non-blocking.

  • The async function wrapper can call asyncio.sleep() while waiting for a message from the response queue, which lets the event loop take care of other async IO tasks (i.e. the wrapper is non-blocking).
  • When the task in the process needs to invoke an asyncio function, it can send a message through the queue asking the wrapper to do so.

Coding Example

(In the coding example, the queues are named send_queue and receive_queue, relative to how each function uses them.)
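The following is a minimal sketch of the approach described above, not necessarily the exact code used in production. The function names (blocking_task, save_progress, run_task), the message format, and the "ack" reply are assumptions made for illustration; time.sleep() stands in for the model work and save_progress() for a real async database write.

```python
import asyncio
import multiprocessing
import queue
import time


def blocking_task(send_queue, receive_queue, steps):
    """Runs in the child process; a stand-in for model training."""
    for step in range(1, steps + 1):
        time.sleep(0.05)  # placeholder for thread-blocking work
        # Ask the server to run an async function on our behalf...
        send_queue.put(("progress", step))
        # ...and block (this process only) until the server confirms.
        receive_queue.get()
    send_queue.put(("done", steps * 2))


async def save_progress(log, step):
    """Hypothetical async IO call, e.g. a database write."""
    await asyncio.sleep(0)
    log.append(step)


async def run_task(steps, log, poll_interval=0.01):
    """Async wrapper: the caller just awaits it."""
    send_queue = multiprocessing.Queue()     # server -> process
    receive_queue = multiprocessing.Queue()  # process -> server
    # The child's send queue is our receive queue, and vice versa.
    process = multiprocessing.Process(
        target=blocking_task, args=(receive_queue, send_queue, steps)
    )
    process.start()
    try:
        while True:
            alive = process.is_alive()  # read before polling to avoid a race
            try:
                kind, payload = receive_queue.get_nowait()
            except queue.Empty:
                if not alive:
                    raise RuntimeError("task process exited without a result")
                # Yield to the event loop so other requests keep running.
                await asyncio.sleep(poll_interval)
                continue
            if kind == "progress":
                await save_progress(log, payload)
                send_queue.put("ack")
            elif kind == "done":
                return payload
    except BaseException:
        process.terminate()  # do not leave the child waiting on a dead wrapper
        raise
    finally:
        process.join()


if __name__ == "__main__":
    progress_log = []
    result = asyncio.run(run_task(3, progress_log))
    print(result, progress_log)
```

Polling with get_nowait() plus asyncio.sleep() keeps the wrapper simple at the cost of up to one poll_interval of latency per message. The if __name__ == "__main__" guard matters on platforms where multiprocessing starts child processes by re-importing the main module.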
