DEV Community

Andrea Tedeschi


Reinvent the wheel to understand Python asyncio.

Over the last few years, coroutine-based concurrency and the async/await syntax have spread across a lot of languages, Python included.
I've found that people who started using it recently (and weren't around while it was being implemented) run into various kinds of problems while coding or debugging.

In this article we're gonna write our own multitasking library using neither asyncio nor the async and await keywords, after exploring the basics of concurrency.

This post assumes you're already familiar with Python, Python iterators and generators, and socket programming.

What is concurrency anyway?

According to Wikipedia:

Concurrent computing is a form of computing in which several computations are executed concurrently—during overlapping time periods

In practice it means that while our function a() is executing, other functions may execute too, and they all run to completion in an interleaved manner.
However, our program is still executing one thing at a time (which is all a single CPU core can do).

Parallelism (executing more things at once) is a special form of concurrency, but we're not talking about it today.

How do we achieve concurrency in our programs?

The easiest way to write a concurrent program is by using threads: you spawn a function in a thread, it starts running, and whenever the opportunity arises the CPU switches between threads.
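
As a point of comparison, a minimal sketch of the thread-based approach could look like the one below. The worker function and the results dict are made up for illustration; the order in which the two threads interleave is decided by the OS and the interpreter, not by our code.

```python
import threading

# Shared place where each thread stores its outcome (illustrative only).
results: dict[str, int] = {}


def worker(name: str, to: int) -> None:
    # Sum the numbers below `to`. The interpreter may switch to the
    # other thread at any point while this loop is running.
    total = 0
    for i in range(to):
        total += i
    results[name] = total


threads = [
    threading.Thread(target=worker, args=("t1", 5)),
    threading.Thread(target=worker, args=("t2", 3)),
]
for t in threads:
    t.start()
for t in threads:
    t.join()  # wait for both threads to finish

print(results["t1"], results["t2"])
```

Note that nothing in this code decides when a switch happens, which is exactly the lack of control discussed next.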

However, there are well-known problems with multi-threaded programming, like synchronization, memory usage and not having control over context switches, which combined lead to scalability limits (you can find useful resources and articles on those problems by searching online). Developers looked for something more lightweight and scalable that can be combined with multi-threading if needed, and they came up with... iterators.

Concurrency with iterators

How do we achieve concurrency with iterators? There are two core concepts to keep in mind:

  • Interleaving.
  • Execution during overlapping time periods.

If you think about how to interleave the execution of different units of code without spawning threads, you'll probably conclude that you need a way to pause and resume each unit.
Look at this very basic implementation of an iterator:

class ConcurrentUnit:
    def __init__(self, to: int):
        self._to = to
        self._i = -1

    def __iter__(self):
        return self

    def __next__(self):
        self._i += 1
        if self._i >= self._to:
            raise StopIteration
        return self._i

As you already know, a for loop just keeps calling .__next__ until StopIteration is raised. Let's abuse this to execute code concurrently.

from typing import TypeVar

T = TypeVar('T')


class ConcurrentUnit:
    def __init__(self, to: int, return_value: T):
        self._to = to
        self._i = -1
        self._return_value = return_value

    def __iter__(self):
        return self

    def __next__(self):
        self._i += 1
        if self._i >= self._to:
            raise StopIteration(self._return_value)
        return self._i


if __name__ == '__main__':
    cu1 = ConcurrentUnit(5, 'cu1')
    cu2 = ConcurrentUnit(3, 'cu2')

    tasks = [cu1, cu2]

    while tasks:
        t = tasks.pop(0)
        try:
            step = next(t)
            print(step)
        except StopIteration as e:
            print(e.value)
        else:
            tasks.append(t)

If you run that code, the output will be:

0
0
1
1
2
2
3
cu2
4
cu1

You can see that our units have executed in an interleaved manner during overlapping periods of time, so yes, even without any benefit yet, we have written concurrent code. Let's look at it in detail.

The ConcurrentUnit class should be easy to understand. From a behavioral point of view it simulates range(x) (I've omitted start to keep it simple), but it also takes a return_value parameter with a generic type annotation, which lets a unit return a value from its execution. The return_value is attached to the StopIteration raised by __next__, and we need to handle it manually by calling next in a try/except block (we can't simply use a for loop, which would handle the exception silently and discard the value).
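
To make the last point concrete, here is a small sketch comparing a for-style consumer with a hand-driven one. CountTo is just a renamed copy of the iterator above, used so the snippet is self-contained.

```python
class CountTo:
    """Iterator that yields 0..to-1 and then 'returns' a value."""

    def __init__(self, to: int, return_value: str) -> None:
        self._to = to
        self._i = -1
        self._return_value = return_value

    def __iter__(self) -> "CountTo":
        return self

    def __next__(self) -> int:
        self._i += 1
        if self._i >= self._to:
            # The value travels inside the exception.
            raise StopIteration(self._return_value)
        return self._i


# A for loop (or a comprehension) swallows StopIteration: the value is lost.
seen = [step for step in CountTo(2, "done")]

# Driving the iterator by hand lets us read the value from the exception.
it = CountTo(2, "done")
steps = []
returned = None
while True:
    try:
        steps.append(next(it))
    except StopIteration as e:
        returned = e.value
        break

print(seen, steps, returned)
```

Both consumers see the same steps, but only the second one gets hold of the returned value.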

In our main block we create two concurrent units (there could be more), store them in a list (as a scheduler would do, more on this later) and run our loop:

  • First we pop our first unit in the list.
  • We call next on our unit in a try/except block, and print the result out.
  • If it raises StopIteration we get the return_value and print it out. At this point we know that our unit is done.
  • Otherwise, we know that our unit is not done, so we append it to our list.

Python Generators

The code above is very odd, and wrapping the logic of functions in iterator classes will soon lead to spaghetti code.
Luckily for us, Python has generators, which let us declare functions that behave like iterators and, in addition, bind the value of the return statement to the StopIteration instance.

We can convert our above code into:

from collections.abc import Generator


def concurrent_unit(to: int) -> Generator[int, None, str]:
    for i in range(to):
        yield i
    return f"run for {to} times"


if __name__ == '__main__':
    cu1 = concurrent_unit(5)
    cu2 = concurrent_unit(3)

    tasks = [cu1, cu2]

    while tasks:
        t = tasks.pop(0)
        try:
            step = next(t)
            print(step)
        except StopIteration as e:
            print(e.value)
        else:
            tasks.append(t)

and the code behaves the same way (only the returned strings differ).

There's an important concept you need to understand before we move on: the difference between generator functions and generator objects.
A generator function is a function that just returns a generator object: calling it does not execute any code other than creating the generator object. You can recognize one because its body contains at least one yield.
The resulting generator object then implements the iteration protocol, with __next__ executing code up to the next yield statement.
This concept applies to coroutines too: async def functions are coroutine functions that return coroutine objects when called.
From now on, when I say generator I may refer to either the function or the object; the context will make it clear.
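
A short sketch of that distinction, using a throwaway generator function (unit and log are illustrative names): calling the function runs none of its body, and each next call runs it up to the following yield.

```python
import inspect
from collections.abc import Generator

log: list[str] = []


def unit() -> Generator[int, None, str]:
    log.append("started")
    yield 1
    log.append("resumed")
    return "finished"


gen = unit()                  # only creates the generator object
log_after_call = list(log)    # still empty: the body has not run

first = next(gen)             # runs the body up to the first yield
log_after_first = list(log)

try:
    next(gen)                 # runs the rest of the body
except StopIteration as e:
    returned = e.value

print(inspect.isgeneratorfunction(unit), inspect.isgenerator(gen))
print(log_after_call, log_after_first, log, returned)
```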

Build our own concurrency library

In this section we're gonna use generators to develop the basics of our concurrency library.
First we'll define a Task object that wraps a generator and provides a layer of abstraction over our spaghetti. Then we'll write a scheduler to handle task execution. Let's dive into it.

from collections.abc import Generator
from typing import Any, TypeVar

T = TypeVar("T")


class Task:
    def __init__(
        self,
        generator: Generator[Any, Any, T],
        *,
        name: str | None = None,
        debug: bool = False,
    ) -> None:
        self._generator = generator
        self._name = name
        self._debug = debug
        self._result: T | None = None
        self._exception: Exception | None = None
        self._done = False

    def __repr__(self) -> str:
        return f"<Task: {self._name}, done: {self._done}>"

    def _step(self) -> None:
        if self._done:
            raise RuntimeError(f"{self}: Cannot step a done task")
        try:
            step = self._generator.send(None)
            if self._debug:
                print(f"{self}: {step}")
        except StopIteration as e:
            self._done = True
            self._result = e.value
        except Exception as e:
            self._done = True
            self._exception = e
            if self._debug:
                print(f"{self}: Exception: {e}")

    def result(self) -> T:
        return self._result

    def exception(self) -> Exception:
        return self._exception

    def done(self) -> bool:
        return self._done

Our Task class stores a generator object and has 3 attributes that are worth looking at:

  • _done indicating whether the task can be considered completed or not.
  • _result holding the generator's return value, if any.
  • _exception holding any exception other than StopIteration that our generator may raise.

The _step method builds upon the execution logic used before with iterators: it represents a single "step" of our task. It calls next on self._generator (gen.send(None) is the same as next(gen)) and, if the generator finishes with either a result (wrapped in a StopIteration error) or an exception, stores it in the corresponding attribute.
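
If the send(None) part looks unfamiliar, this small sketch (echo is a made-up generator) shows that it advances a generator exactly like next, while send with a real value makes the paused yield expression evaluate to that value.

```python
from collections.abc import Generator
from typing import Any


def echo() -> Generator[str, Any, None]:
    received = yield "ready"      # `received` is whatever gets sent in
    yield f"got {received!r}"


g1 = echo()
g2 = echo()

a = next(g1)          # advance to the first yield
b = g2.send(None)     # same effect as next(g2)

c = g1.send("hello")  # the paused yield evaluates to "hello"
d = next(g2)          # the paused yield evaluates to None

print(a, b, c, d)
```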

You may be asking yourself "why is he just storing the exception instead of raising it?". I'll answer that question in the next section. For now, let's go on and build a scheduler for our tasks:

from collections.abc import Callable, Generator
from typing import Any, TypeVar

from .tasks import Task

T = TypeVar("T")

class EventLoop:
    def __init__(self, *, debug: bool = False) -> None:
        self._debug = debug
        self._tasks: list[Task] = []
        self._tasks_counter: int = 0

    def create_task(
        self, generator: Generator[Any, Any, T], *, name: str | None = None
    ) -> Task:
        task = Task(
            generator,
            name=name or f"Task-{self._tasks_counter}",
            debug=self._debug,
        )
        self._tasks.append(task)
        self._tasks_counter += 1
        return task

    def run_until_complete(
        self,
        generator: Generator[Any, Any, T],
        *,
        task_name: str | None = None,
    ) -> T:
        main_task = self.create_task(generator, name=task_name)
        while not main_task._done:
            for task in self._tasks:
                task._step()
                if task._done:
                    self._tasks.remove(task)
        if main_task._exception:
            raise main_task._exception
        return main_task._result

Looks familiar? Our event loop has one method to create new task objects and one to run them.
run_until_complete takes a generator, creates a task from it (main_task) and then runs all the scheduled tasks until main_task completes. The execution logic is no different from our first proof of concept with iterators: we iterate through self._tasks, run "one step" of each item and, whenever a task is done, remove it from the list.

def concurrent_unit(to: int) -> Generator[int, None, str]:
    for i in range(to):
        yield i
    return f"run for {to} times"


if __name__ == '__main__':
    loop = EventLoop(debug=True)

    t1 = loop.create_task(concurrent_unit(2))
    t2 = loop.create_task(concurrent_unit(3))

    loop.run_until_complete(concurrent_unit(5))

# output
# <Task: Task-0, done: False>: 0
# <Task: Task-1, done: False>: 0
# <Task: Task-2, done: False>: 0
# <Task: Task-0, done: False>: 1
# <Task: Task-1, done: False>: 1
# <Task: Task-2, done: False>: 1
# <Task: Task-2, done: False>: 2
# <Task: Task-1, done: False>: 2
# <Task: Task-2, done: False>: 3
# <Task: Task-2, done: False>: 4
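
If the order of the last lines in that output looks surprising (Task-1 misses a turn right after Task-0 finishes), that's because run_until_complete removes finished tasks from self._tasks while iterating over it, so the for loop skips the item that follows the removed one for that pass. The skipped task gets its turn again in the next pass, so nothing is lost. Here is a sketch of the effect with plain strings standing in for tasks (step_all is a hypothetical helper, not part of our library), together with the usual workaround of iterating over a copy:

```python
def step_all(
    tasks: list[str], finished: set[str], *, snapshot: bool
) -> list[str]:
    """Return the names that got a turn during one pass over `tasks`."""
    stepped = []
    # With snapshot=True we iterate over a copy, so removing items
    # from `tasks` does not disturb the iteration.
    for name in (list(tasks) if snapshot else tasks):
        stepped.append(name)
        if name in finished:
            tasks.remove(name)
    return stepped


in_place = step_all(["t0", "t1", "t2"], {"t0"}, snapshot=False)
copied = step_all(["t0", "t1", "t2"], {"t0"}, snapshot=True)

print(in_place)  # ['t0', 't2']  (t1 was skipped in this pass)
print(copied)    # ['t0', 't1', 't2']
```

Iterating over a copy would change the exact order of the outputs shown in this post, which were produced with the in-place version.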

Cool, but what about await?

As I stated before, we're not allowed to use the async and await keywords, so how are we going to achieve the same functionality here?
So far we've learned that coroutines are just iterators (or better, generators), so how do we await an iterator? If you thought about for loops, you're right. Let's look at the example below:

def concurrent_unit(to: int) -> Generator[int, None, str]:
    for i in range(to):
        yield i
    return f"run for {to} times"


if __name__ == '__main__':
    loop = EventLoop(debug=True)

    def main():
        t1 = loop.create_task(concurrent_unit(2))
        t2 = loop.create_task(concurrent_unit(3))
        yield 'a'

    loop.run_until_complete(main())

# output
# <Task: Task-0, done: False>: a
# <Task: Task-1, done: False>: 0
# <Task: Task-2, done: False>: 0
# <Task: Task-2, done: False>: 1

As you can see, t1 and t2 do not complete, and that's because main() completes before them. If you look at the run_until_complete source code, you'll see that when main_task is done we exit the while loop, even if there are still unfinished tasks. While this is the intended behaviour, we need a way to wait for specific tasks to complete before moving on, and we're gonna do that with for loops:

def concurrent_unit(to: int) -> Generator[int, None, str]:
    for i in range(to):
        yield i
    return f"run for {to} times"


if __name__ == '__main__':
    loop = EventLoop(debug=True)

    def main():
        t1 = loop.create_task(concurrent_unit(2))
        t2 = loop.create_task(concurrent_unit(3))
        for step in concurrent_unit(5):
            yield step

    loop.run_until_complete(main(), task_name='main_task')

# output
# <Task: main_task, done: False>: 0
# <Task: Task-1, done: False>: 0
# <Task: Task-2, done: False>: 0
# <Task: main_task, done: False>: 1
# <Task: Task-1, done: False>: 1
# <Task: Task-2, done: False>: 1
# <Task: main_task, done: False>: 2
# <Task: main_task, done: False>: 3
# <Task: Task-2, done: False>: 2
# <Task: main_task, done: False>: 4

This time all the tasks completed.
Before diving into what's wrong with the above code, we have to thank Python and generators once more: instead of using an ugly for loop to replace await, we can use the yield from syntax. For our purposes, yield from gen behaves like for x in gen: yield x, and it also evaluates to the generator's return value. From now on, we'll use yield from as our await (and that's essentially what Python does too under the hood).

    def main():
        t1 = loop.create_task(concurrent_unit(2))
        t2 = loop.create_task(concurrent_unit(3))
        yield from concurrent_unit(5)
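
One practical difference between the two spellings is worth a quick sketch: yield from hands back the sub-generator's return value, while the plain for loop throws it away. The names below (inner, with_for_loop, with_yield_from, drive) are illustrative only.

```python
from collections.abc import Generator
from typing import Any


def inner() -> Generator[int, None, str]:
    yield 1
    yield 2
    return "inner result"


def with_for_loop() -> Generator[int, None, Any]:
    result = None
    for x in inner():
        yield x
    return result  # still None: the for loop discarded inner's return value


def with_yield_from() -> Generator[int, None, Any]:
    result = yield from inner()  # evaluates to inner's return value
    return result


def drive(gen: Generator[int, None, Any]) -> tuple[list[int], Any]:
    """Run a generator to completion, collecting steps and return value."""
    steps = []
    while True:
        try:
            steps.append(next(gen))
        except StopIteration as e:
            return steps, e.value


print(drive(with_for_loop()))
print(drive(with_yield_from()))
```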

As I stated before, the above code has some pitfalls. Since Task objects are not iterable, we cannot yield from them, so we cannot await tasks; in fact, you may have noticed that I didn't spawn a task from concurrent_unit(5). To have some consistency we have to find a way to yield from tasks.
We could write a helper function that takes a task object and keeps calling _step until it's done, but that would conflict with the event loop calling _step too. We could make Task an iterator by defining __iter__ and __next__, and that would work (you can use yield from with iterators). However, generators are usually faster than hand-written iterator classes (I won't dive into it; if you're interested in the topic you can find useful resources by searching online), so I opted to add a new method to the task interface: a generator function that just yields back to the event loop until the task is done.

class Task:
    def __init__(
        self,
        generator: Generator[Any, Any, T],
        *,
        name: str | None = None,
        debug: bool = False,
    ) -> None:
        self._generator = generator
        self._name = name
        self._debug = debug
        self._result: T | None = None
        self._exception: Exception | None = None
        self._done = False

    def __repr__(self) -> str:
        return f"<Task: {self._name}, done: {self._done}>"

    def _step(self) -> None:
        if self._done:
            raise RuntimeError(f"{self}: Cannot step a done task")
        try:
            step = self._generator.send(None)
            if self._debug:
                print(f"{self}: {step}")
        except StopIteration as e:
            self._done = True
            self._result = e.value
        except Exception as e:
            self._done = True
            self._exception = e
            if self._debug:
                print(f"{self}: Exception: {e}")

    def result(self) -> T:
        return self._result

    def exception(self) -> Exception:
        return self._exception

    def done(self) -> bool:
        return self._done

    def wait(self) -> Generator[Any, Any, T]:
        while not self._done:
            yield
        if self._exception:
            raise self._exception
        return self._result

Now we can refactor our example with our new wait logic:

if __name__ == '__main__':
    loop = EventLoop(debug=True)

    def main():
        t1 = loop.create_task(concurrent_unit(2))
        t2 = loop.create_task(concurrent_unit(3))
        t3 = loop.create_task(concurrent_unit(5))
        # yield from task.wait() will either raise
        # task._exception (if it's not None) or
        # return task._result.
        # This means that exceptions do not propagate
        # until the task is awaited.
        result = yield from t3.wait()
        yield 'a'
        yield 'b'

    loop.run_until_complete(main(), task_name='main_task')

# output
# <Task: main_task, done: False>: None
# <Task: Task-1, done: False>: 0
# <Task: Task-2, done: False>: 0
# <Task: Task-3, done: False>: 0
# <Task: main_task, done: False>: None
# <Task: Task-1, done: False>: 1
# <Task: Task-2, done: False>: 1
# <Task: Task-3, done: False>: 1
# <Task: main_task, done: False>: None
# <Task: Task-3, done: False>: 2
# <Task: main_task, done: False>: None
# <Task: Task-2, done: False>: 2
# <Task: Task-3, done: False>: 3
# <Task: main_task, done: False>: None
# <Task: main_task, done: False>: None
# <Task: Task-3, done: False>: 4
# <Task: main_task, done: False>: None
# <Task: main_task, done: False>: a
# <Task: main_task, done: False>: b

As you can see, we are awaiting the task by yielding from its wait method. Without this method, whenever we needed to retrieve the outcome of a task (either a value or an exception), we would have had to read the related attribute ourselves.
As Python asyncio does, we propagate exceptions via await: we don't re-raise them as soon as we catch them, but when the task is awaited.
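
Here is a self-contained sketch of that behaviour. MiniTask is a hypothetical, trimmed-down stand-in for our Task class (no names, no debug output), and the while loop at the bottom plays the role of the event loop.

```python
from collections.abc import Generator
from typing import Any


class MiniTask:
    """Trimmed-down Task: stores the outcome, raises it only in wait()."""

    def __init__(self, gen: Generator[Any, Any, Any]) -> None:
        self._gen = gen
        self.done = False
        self.result: Any = None
        self.exception: Exception | None = None

    def step(self) -> None:
        try:
            next(self._gen)
        except StopIteration as e:
            self.done, self.result = True, e.value
        except Exception as e:
            # Stored, not re-raised: the scheduler keeps running.
            self.done, self.exception = True, e

    def wait(self) -> Generator[None, None, Any]:
        while not self.done:
            yield
        if self.exception:
            raise self.exception
        return self.result


def failing() -> Generator[None, None, None]:
    yield
    raise ValueError("boom")


events: list[str] = []
child = MiniTask(failing())


def main() -> Generator[None, None, None]:
    try:
        yield from child.wait()  # the stored exception surfaces here
    except ValueError as e:
        events.append(f"main caught: {e}")


parent = MiniTask(main())
tasks = [parent, child]
while not parent.done:
    for t in list(tasks):  # iterate over a copy while removing
        t.step()
        if t.done:
            tasks.remove(t)

print(events)
```

The failure of the child never interrupts the scheduling loop; it only shows up inside main, at the point where the child is awaited.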

I want to mention one last thing about await in Python. A common misconception is that when we await a coroutine or a task we are telling the event loop "don't do anything else until that coroutine either returns or raises". However, as you can see from the output of our last example, tasks scheduled before our yield from (and so await) statement still run, interleaved with the awaited task (t3).
What await really tells the event loop (and that's an approximation, since the event loop is not aware of it) is: "do not go on with the current task until the task I'm awaiting is done. In the meantime you can still run other scheduled tasks", where the current task is main and the task I'm awaiting is t3 in our context.
Again, that sentence describes the behaviour of await, but it's not literally true, since a task cannot control what the event loop does. What actually happens is that the current task itself refrains from going on before t3 completes, rather than giving instructions to the event loop.

Run blocking code

Sometimes you may need to use blocking functions (functions that can't yield back to the event loop). You can run such functions in threads to avoid blocking the event loop, and one of the most efficient ways to do it is with a thread pool.
Since we're just exploring core concepts we're not gonna implement a thread pool ourselves; we'll just add a method that spawns a callable in a new thread. You can learn more about thread pools by searching on your own or by reading through the source code of Python's concurrent.futures.thread module.
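
For reference, this is roughly what the thread pool route looks like with the standard library's ThreadPoolExecutor. It's only a sketch of the idea, not what we're going to build: blocking and wait_future are made-up names, and the inner while loop stands in for an event loop stepping a task.

```python
import time
from collections.abc import Generator
from concurrent.futures import Future, ThreadPoolExecutor


def blocking(seconds: float) -> str:
    time.sleep(seconds)  # cannot yield back to anyone while sleeping
    return f"slept {seconds}s"


def wait_future(fut: Future) -> Generator[None, None, str]:
    # Yield back to the caller until the worker thread is done.
    while not fut.done():
        yield
    return fut.result()


with ThreadPoolExecutor(max_workers=2) as pool:
    fut = pool.submit(blocking, 0.05)  # runs in a pool thread
    gen = wait_future(fut)
    while True:
        try:
            next(gen)  # a real loop would step other tasks here
        except StopIteration as e:
            outcome = e.value
            break

print(outcome)
```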

We can modify our event loop implementation a bit to handle a set of worker threads:

class EventLoop:
    def __init__(self, *, debug: bool = False) -> None:
        self._debug = debug
        self._tasks: list[Task] = []
        self._tasks_counter: int = 0
        self._workers: set[threading.Thread] = set()

    def _spawn(self, fn: Callable[..., Any]) -> None:
        # `fn` rather than `callable`, to avoid shadowing the builtin.
        thread = threading.Thread(target=fn)
        thread.start()
        self._workers.add(thread)

    ...


The above code will work, but there are two problems with it:

  • Whenever we want to pass arguments and keyword arguments we have to rely on functools.partial.
  • We have no way to retrieve the result of the execution.
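
Both problems are easy to see in a small sketch that mimics what _spawn does with a bare callable (greet and out are illustrative names):

```python
import functools
import threading

out: list[str] = []


def greet(greeting: str, *, name: str) -> str:
    message = f"{greeting}, {name}"
    out.append(message)
    return message  # the thread discards this return value


# Problem 1: arguments have to be bound up front with functools.partial.
bound = functools.partial(greet, "hello", name="world")

thread = threading.Thread(target=bound)
thread.start()
thread.join()

# Problem 2: the Thread object gives us no access to what `greet` returned;
# we only see the message because `greet` stored it in `out` as a side effect.
print(out)
```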

To solve those problems we could write a class that encapsulates all the attributes and logic we need, and update the EventLoop._spawn signature to match it:

class _Work:
    def __init__(
        self, fn: Callable[..., T], /, *args, **kwargs
    ) -> None:
        self.fn = fn
        self.args = args
        self.kwargs = kwargs
        self.result: T | None = None
        self.exception: Exception | None = None

    def run(self) -> None:
        try:
            result = self.fn(*self.args, **self.kwargs)
        except Exception as e:
            self.exception = e
        else:
            self.result = result

You may have noticed that a pattern has emerged: we have result and exception again, and we could write a wait generator method like the one on Task to interoperate with non-blocking code.
To keep things clean, let's put our common waiting logic in a base interface:

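from abc import ABC, abstractmethod
from collections.abc import Generator
from typing import Any, Generic, TypeVar

T = TypeVar('T')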


class Waitable(ABC, Generic[T]):
    @abstractmethod
    def wait(self) -> Generator[Any, Any, T]:
        ...

We could then make Task and _Work inherit from Waitable.
However, if you think about all the possible applications of that logic, you may come up with a better solution. While Task is a special case, the use case of _Work may recur in the future, so we should build a reusable interface for it:

class Waiter(Waitable[T]):
    def __init__(self) -> None:
        self._result: T | None = None
        self._exception: Exception | None = None
        self._done: bool = False

    def __repr__(self) -> str:
        return f'<Waiter: done: {self.done()}>'

    def done(self) -> bool:
        return self._done

    def result(self) -> T:
        return self._result

    def exception(self) -> Exception:
        return self._exception

    def set_result(self, result: T) -> None:
        if self._done:
            raise RuntimeError('Waiter is already done')
        self._done = True
        self._result = result

    def set_exception(self, exception: Exception) -> None:
        if self._done:
            raise RuntimeError('Waiter is already done')
        self._done = True
        self._exception = exception

    def wait(self) -> Generator[Any, Any, T]:
        while not self.done():
            yield
        if self._exception:
            raise self._exception
        return self._result

Waiter may remind you of Python's Future objects... and once more, you're right. We have defined an object that acts as a placeholder for a future result (either a value or an exception) that can be set by other functions. This is also a fundamental building block for synchronization.
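
To see the resemblance, here is a sketch that pokes at a standard library concurrent.futures.Future by hand (normally an executor creates and completes it for you; instantiating it directly is only done here for illustration). The method names line up with our Waiter almost one to one.

```python
from concurrent.futures import Future, InvalidStateError

ok: Future = Future()
was_done_before = ok.done()   # False: nobody has set an outcome yet
ok.set_result(42)
value = ok.result()           # returns the stored value

failed: Future = Future()
failed.set_exception(ValueError("boom"))
try:
    failed.result()           # re-raises the stored exception
except ValueError as e:
    caught = str(e)

# Like Waiter.set_result, setting an outcome twice is an error
# (InvalidStateError on Python 3.8+).
try:
    ok.set_result(43)
    second_set_failed = False
except InvalidStateError:
    second_set_failed = True

print(was_done_before, value, caught, second_set_failed)
```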

Let's use Waiter in the _Work class:

class _Work:
    def __init__(
        self, waiter: Waiter, fn: Callable[..., T], /, *args, **kwargs
    ) -> None:
        self.waiter = waiter
        self.fn = fn
        self.args = args
        self.kwargs = kwargs

    def run(self) -> None:
        try:
            result = self.fn(*self.args, **self.kwargs)
        except Exception as e:
            self.waiter.set_exception(e)
        else:
            self.waiter.set_result(result)

Now update the event loop implementation:

class EventLoop:
    def __init__(self, *, debug: bool = False) -> None:
        self._debug = debug
        self._tasks: list[Task] = []
        self._tasks_counter: int = 0
        self._workers: set[threading.Thread] = set()

    def _spawn(self, work: _Work):
        thread = threading.Thread(target=work.run)
        thread.start()
        self._workers.add(thread)

    def run_in_thread(self, fn: Callable[..., T], /, *args, **kwargs) -> Waiter[T]:
        waiter = Waiter()
        work = _Work(waiter, fn, *args, **kwargs)
        self._spawn(work)
        return waiter

    ...


Let's try it out:

if __name__ == '__main__':
    # set it to `True` to better understand the behavior
    loop = EventLoop(debug=False)

    def blockingf(i: int) -> str:
        time.sleep(i)  # sleep for `i` seconds, matching the message below
        return f'BLOCKING finished after {i} seconds'

    def genf(i: int) -> Generator[Any, Any, str]:
        for j in range(i):
            yield i
        return f'non-blocking finished after {i} iterations'

    def main():
        t1 = loop.create_task(genf(3))
        t2 = loop.create_task(genf(2))

        w1 = loop.run_in_thread(blockingf, 2)
        res_blocking = yield from w1.wait()

        res1 = yield from t1.wait()
        res2 = yield from t2.wait()

        print(res1, res2, res_blocking, sep='\n')

    loop.run_until_complete(main())

# output
# non-blocking finished after 3 iterations
# non-blocking finished after 2 iterations
# BLOCKING finished after 2 seconds

We have successfully mixed blocking code with "non-blocking" code.
In the next section we'll build a concurrent network service using what we have developed so far.

Socket and Selectors

At the beginning of this post I asked you to be familiar with socket programming.
If you've ever built a TCP service with Python, you have eventually run into the old problem of how to handle concurrent connections.
The Python Socket Programming HOWTO has a section about non-blocking sockets and select (I recommend reading it if you haven't).
We're now going to combine what we have learned today with it, using selectors, which is a high-level interface over select.
If you want to read more about selectors you can go through the documentation, but for the purpose of this post it's enough to understand that with selectors we can:

  • Register a socket object to wait for read or write events, associating data with it (any object, even a callable).
  • Update the event kind or the data of a registered socket.
  • Unregister a registered socket.
  • Call selector.select to get a list of ready sockets (along with other information, like the event type and our associated data). For a socket returned by selector.select we can assume that the operation it was registered for (reading or writing) is not going to block.
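
The operations above can be sketched in a few lines. This example uses socket.socketpair() to get two connected sockets without any networking setup; on_readable and received are illustrative names.

```python
import selectors
import socket

sel = selectors.DefaultSelector()
a, b = socket.socketpair()  # two connected sockets, handy for demos
a.setblocking(False)
b.setblocking(False)

received: list[bytes] = []


def on_readable(sock: socket.socket) -> None:
    # Called only after select reported the socket as readable.
    received.append(sock.recv(1024))


# Register `b` for read events, attaching our callback as the data.
sel.register(b, selectors.EVENT_READ, on_readable)

before = sel.select(timeout=0)  # nothing has been sent yet

a.sendall(b"ping")
for key, mask in sel.select(timeout=1):
    # key.fileobj is the registered socket, key.data is our callback.
    key.data(key.fileobj)

sel.unregister(b)
sel.close()
a.close()
b.close()

print(before, received)
```

Attaching a callable as the data is the same trick the event loop methods in this section rely on.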

First we need to implement methods to register/unregister sockets on the event loop:

class EventLoop:
    def __init__(self, *, debug: bool = False) -> None:
        self._debug = debug
        self._tasks: list[Task] = []
        self._tasks_counter: int = 0
        self._workers: set[threading.Thread] = set()
        self._selector = selectors.DefaultSelector()

    ...

    def _create_waiter(self) -> Waiter:
        return Waiter()

    def add_reader(self, fd: int, callback: Callable[..., None]) -> None:
        try:
            self._selector.get_key(fd)
        except KeyError:
            self._selector.register(fd, selectors.EVENT_READ, callback)

    def remove_reader(self, fd: int) -> None:
        try:
            self._selector.unregister(fd)
        except KeyError:
            pass

    def add_writer(self, fd: int, callback: Callable[..., None]) -> None:
        try:
            self._selector.get_key(fd)
        except KeyError:
            self._selector.register(fd, selectors.EVENT_WRITE, callback)

    def remove_writer(self, fd: int) -> None:
        try:
            self._selector.unregister(fd)
        except KeyError:
            pass

selectors.DefaultSelector is the most efficient selector implementation available on your platform, and _create_waiter is just a convenience method.
Then we have methods to register sockets (one for read events and one for write events) and matching methods to unregister them.

With that, we can define the non-blocking methods required to build our TCP server. For the scope of this post we only need three: socket.accept, socket.recv and socket.sendall.

class EventLoop:

    ...

    def _sock_recv(
        self, sock: socket.socket, nbytes: int, waiter: Waiter
    ) -> None:
        try:
            result = sock.recv(nbytes)
        except (BlockingIOError, InterruptedError):
            return
        except Exception as e:
            waiter.set_exception(e)
        else:
            waiter.set_result(result)

    def sock_recv(
        self, sock: socket.socket, nbytes: int
    ) -> Generator[Any, Any, bytes]:
        waiter = self._create_waiter()
        self.add_reader(
            sock.fileno(),
            functools.partial(self._sock_recv, sock, nbytes, waiter),
        )
        res = yield from waiter.wait()
        return res

    def _sock_sendall(
        self, sock: socket.socket, data: bytes, waiter: Waiter
    ) -> None:
        try:
            result = sock.sendall(data)
        except (BlockingIOError, InterruptedError):
            return
        except Exception as e:
            waiter.set_exception(e)
        else:
            waiter.set_result(result)

    def sock_sendall(
        self, sock: socket.socket, data: bytes
    ) -> Generator[Any, Any, None]:
        waiter = self._create_waiter()
        self.add_writer(
            sock.fileno(),
            functools.partial(self._sock_sendall, sock, data, waiter),
        )
        res = yield from waiter.wait()
        return res

    def _sock_accept(self, sock: socket.socket, waiter: Waiter) -> None:
        try:
            result = sock.accept()
        except (BlockingIOError, InterruptedError):
            return
        except Exception as e:
            waiter.set_exception(e)
        else:
            waiter.set_result(result)

    def sock_accept(
        self, sock: socket.socket
    ) -> Generator[Any, Any, tuple[socket.socket, Any]]:
        waiter = self._create_waiter()
        self.add_reader(
            sock.fileno(), functools.partial(self._sock_accept, sock, waiter)
        )
        res = yield from waiter.wait()
        return res

    def process_events(
        self,
        events: list[tuple[selectors.SelectorKey, int]],
    ) -> None:
        for key, mask in events:
            fileobj, callback = key.fileobj, key.data
            callback()
            if mask & selectors.EVENT_READ:
                self.remove_reader(fileobj)
            if mask & selectors.EVENT_WRITE:
                self.remove_writer(fileobj)

    def run_until_complete(
        self,
        generator: Generator[Any, Any, T],
        *,
        task_name: str | None = None,
    ) -> T:
        main_task = self.create_task(generator, name=task_name)
        while not main_task._done:
            ready = self._selector.select(0)
            if ready:
                self.process_events(ready)
            for task in self._tasks:
                task._step()
                if task._done:
                    self._tasks.remove(task)
        if main_task._exception:
            raise main_task._exception
        return main_task._result

    def close(self) -> None:
        for thread in self._workers:
            thread.join()
        self._selector.close()

To understand what's going on with that code, let's look at _sock_recv and sock_recv.
The former takes a socket and a Waiter object, tries to run socket.recv and stores either the result or any exception on the waiter.
The latter is a generator function that creates the waiter object and calls add_reader to register the socket and _sock_recv as a callback (we use functools.partial to bound arguments to keep it simple). Then it awaits the waiter (yielding from its wait method) and returns its result. We know that Waiter.wait just yields until the waiter has done (and that means the _sock_recv has called either set_result or set_exception).

As you can see in run_until_complete, at each iteration we ask the selector which of the sockets we registered with add_reader or add_writer are ready. As we said before, we can trust self._selector.select to give us only sockets that aren't gonna block, so we run the associated callbacks (like _sock_recv) with process_events.
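To see that readiness check in isolation, here is a minimal sketch that uses only the standard selectors and socket modules. The socket pair and the lambda callback are stand-ins chosen for illustration, they are not part of our EventLoop.

```python
import selectors
import socket

# A connected pair of sockets stands in for a client/server connection.
left, right = socket.socketpair()
left.setblocking(False)
right.setblocking(False)

selector = selectors.DefaultSelector()
# `data` can be any object; here it is the callback to run once `right` is readable.
selector.register(right, selectors.EVENT_READ, data=lambda: right.recv(1024))

# Nothing has been sent yet, so a non-blocking poll reports no ready sockets.
before = selector.select(0)

left.sendall(b"ping")
# A short timeout gives the bytes a moment to arrive; select returns as soon as they do.
after = selector.select(1)

# Each entry is a (key, mask) pair; key.data is the callback we attached above.
received = [key.data() for key, _mask in after]

selector.unregister(right)
selector.close()
left.close()
right.close()
```

The first poll comes back empty and the second one hands us the key whose callback can call recv without blocking, which is the guarantee process_events relies on.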

So if we write:

...
data = yield from loop.sock_recv(sock_obj, 1024)
...

We are doing the following:

  1. We create a new waiter and we schedule loop._sock_recv(sock_obj, 1024, waiter) to run as soon as loop._selector.select tells us that sock_obj is ready.
  2. We await the waiter. As we've learned before, sock_recv does not move on until yield from waiter.wait() is done, but other scheduled tasks still run.
  3. At some point, during an iteration of the while loop in run_until_complete, the selector will give us sock_obj and the scheduled callback (_sock_recv with the arguments from step 1). process_events will run the callback, which sets the result on our waiter.
  4. sock_recv gets the result of the waiter (so it leaves the yield from waiter.wait() line) and returns it.
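The four steps can be traced with a stripped-down sketch. Note that the Waiter, the module-level selector and the drive function below are simplified stand-ins written for this example (assumptions, not the classes from the listing above); only the flow is meant to match.

```python
import functools
import selectors
import socket
from typing import Any, Generator


class Waiter:
    """Simplified stand-in for the article's Waiter (an assumption, not the original)."""

    def __init__(self) -> None:
        self.done = False
        self.result: Any = None

    def set_result(self, result: Any) -> None:
        self.result = result
        self.done = True

    def wait(self) -> Generator[None, None, Any]:
        while not self.done:
            yield  # hand control back to whoever drives the generator
        return self.result


selector = selectors.DefaultSelector()
trace: list[str] = []


def _sock_recv(sock: socket.socket, nbytes: int, waiter: Waiter) -> None:
    # Step 3: runs only after the selector reported the socket as readable.
    trace.append("callback")
    waiter.set_result(sock.recv(nbytes))


def sock_recv(sock: socket.socket, nbytes: int) -> Generator[None, None, bytes]:
    # Step 1: create the waiter and register the callback with bound arguments.
    waiter = Waiter()
    selector.register(
        sock, selectors.EVENT_READ, functools.partial(_sock_recv, sock, nbytes, waiter)
    )
    trace.append("registered")
    # Step 2: suspend here until the callback has stored a result.
    data = yield from waiter.wait()
    # Step 4: the waiter is done, so we resume and return its result.
    trace.append("resumed")
    return data


def drive(gen: Generator[None, None, Any]) -> Any:
    """Tiny stand-in for run_until_complete: poll the selector, then step the generator."""
    while True:
        for key, _mask in selector.select(0):
            key.data()
            selector.unregister(key.fileobj)
        try:
            next(gen)
        except StopIteration as stop:
            return stop.value


left, right = socket.socketpair()
right.setblocking(False)

gen = sock_recv(right, 1024)
next(gen)  # run up to the first yield: the callback is now registered
left.sendall(b"hello")
result = drive(gen)

selector.close()
left.close()
right.close()
```

After this runs, trace holds "registered", "callback", "resumed" in that order and result is b"hello": the generator never touched the socket itself, it only waited for the callback to fill the waiter.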

We're now ready to build a concurrent TCP service. In the next section we'll build an echo service with what we've learned so far.

Concurrent echo service

At this point the implementation of our service should be straightforward.

loop = EventLoop()


def process_client(client: socket.socket, address: tuple[str, int]) -> Generator[Any, Any, None]:
    print('New client:', address)
    try:
        while True:
            data = yield from loop.sock_recv(client, 1024)
            print(address, data)
            if not data:
                break
            yield from loop.sock_sendall(client, data)
    finally:
        client.close()
        print('Client closed:', address)


def main1():
    server = socket.create_server(
        ('127.0.0.1', 1234), family=socket.AF_INET, backlog=5, reuse_port=True
    )
    server.setblocking(False)
    while True:
        client, address = yield from loop.sock_accept(server)
        client.setblocking(False)
        loop.create_task(process_client(client, address))


loop.run_until_complete(main1())

In our main function, we create a "server" socket and set it to non-blocking. We then enter a while loop and accept new connections with loop.sock_accept. Any time we get one, we set it to non-blocking too and start a new task from process_client to handle it.
process_client itself just keeps awaiting loop.sock_recv to get data and streams it back to the client with loop.sock_sendall, for as long as it receives any.

Let's run it. I'll use two terminals with telnet to connect and send data.
The output should be something like:

New client: ('127.0.0.1', 39698)
New client: ('127.0.0.1', 39700)
('127.0.0.1', 39700) b'hello world 1\r\n'
('127.0.0.1', 39698) b'hello world 2\r\n'
('127.0.0.1', 39698) b'I go now\r\n'
('127.0.0.1', 39698) b''
Client closed: ('127.0.0.1', 39698)
('127.0.0.1', 39700) b'me too\r\n'
('127.0.0.1', 39700) b''
Client closed: ('127.0.0.1', 39700)
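If you don't have telnet at hand, a small blocking client does the job too. This is only a sketch: echo_roundtrip is a hypothetical helper name, and it assumes the echo service is already listening on the address you pass in.

```python
import socket


def echo_roundtrip(host: str, port: int, payload: bytes) -> bytes:
    """Send payload to an echo server and read back the same number of bytes."""
    with socket.create_connection((host, port), timeout=5) as conn:
        conn.sendall(payload)
        chunks = []
        remaining = len(payload)
        while remaining > 0:
            chunk = conn.recv(remaining)
            if not chunk:  # the server closed the connection early
                break
            chunks.append(chunk)
            remaining -= len(chunk)
        return b"".join(chunks)
```

With the service running, echo_roundtrip('127.0.0.1', 1234, b'hello world 1\r\n') should hand back the same bytes, and the server side should log the new client, the data and the close, as in the output above.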

Conclusion

In the end we have developed a concurrent service using neither async nor await directly, but we have replaced them with a similar underlying implementation to understand the core concepts behind them.
The aim of this post wasn't to give a better performing or a cleaner implementation of Python asyncio, but to demystify async and await for asyncio beginners by re-building already existing features.
With those fundamental building blocks for concurrent programming, you can implement any kind of concurrent feature. Think about async queues. One could be as simple as:

class Queue(Generic[T]):
    def __init__(self, max_size: int = -1) -> None:
        self._max_size = max_size
        self._queue: deque[T] = deque()

    def __repr__(self) -> str:
        return f"<Queue max_size={self._max_size} size={len(self._queue)}>"

    def qsize(self) -> int:
        return len(self._queue)

    def empty(self) -> bool:
        return not self._queue

    def full(self) -> bool:
        if self._max_size < 0:
            return False
        return len(self._queue) >= self._max_size

    def put(self, item: T) -> Generator[Any, Any, None]:
        while self.full():
            yield
        self._queue.append(item)

    def get(self) -> Generator[Any, Any, T]:
        while self.empty():
            yield
        return self._queue.popleft()  # FIFO: take the oldest item, not the newest

    def put_nowait(self, item: T) -> None:
        if self.full():
            raise RuntimeError(f"{self} is full")
        self._queue.append(item)

    def get_nowait(self) -> T:
        if self.empty():
            raise RuntimeError(f"{self} is empty")
        return self._queue.popleft()  # FIFO: take the oldest item, not the newest
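Here is a hedged usage sketch of such a queue with one producer and one consumer. To keep it self-contained it carries a trimmed variant of the queue that takes items from the left, so they come out in insertion order, and run_all is a bare round-robin driver standing in for our event loop (both are assumptions made for this example).

```python
from collections import deque
from typing import Any, Generator, Generic, TypeVar

T = TypeVar("T")


class Queue(Generic[T]):
    """Trimmed queue variant for this example: only put and get, FIFO order."""

    def __init__(self, max_size: int = -1) -> None:
        self._max_size = max_size
        self._queue: deque[T] = deque()

    def full(self) -> bool:
        # A negative max_size means "unbounded".
        return 0 <= self._max_size <= len(self._queue)

    def put(self, item: T) -> Generator[Any, Any, None]:
        while self.full():
            yield  # let other tasks run until a slot frees up
        self._queue.append(item)

    def get(self) -> Generator[Any, Any, T]:
        while not self._queue:
            yield  # let other tasks run until an item arrives
        return self._queue.popleft()


def producer(queue: Queue[int], count: int) -> Generator[Any, Any, None]:
    for number in range(count):
        yield from queue.put(number)


def consumer(queue: Queue[int], count: int, out: list[int]) -> Generator[Any, Any, None]:
    for _ in range(count):
        item = yield from queue.get()
        out.append(item)


def run_all(*generators: Generator[Any, Any, None]) -> None:
    """Round-robin driver standing in for the event loop."""
    pending = deque(generators)
    while pending:
        gen = pending.popleft()
        try:
            next(gen)
        except StopIteration:
            continue  # finished: do not reschedule
        pending.append(gen)


received: list[int] = []
queue: Queue[int] = Queue(max_size=2)
run_all(producer(queue, 5), consumer(queue, 5, received))
print(received)  # [0, 1, 2, 3, 4]
```

With max_size=2 the producer has to pause every two items, and the consumer pauses whenever the queue runs dry, so the two generators take turns without any locking.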

Or what about task cancellation? You can challenge yourself to implement it in the most efficient way (and update EventLoop.close to cancel remaining tasks).
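If you want a starting point, one possible approach (certainly not the only one, and not necessarily the most efficient) is to throw an exception into the generator at its current yield. The Task and CancelledError below are simplified, hypothetical stand-ins for this sketch, not the classes from our library.

```python
from typing import Any, Generator


class CancelledError(Exception):
    """Hypothetical exception raised inside a task when it is cancelled."""


class Task:
    """Simplified stand-in for the article's Task (names here are assumptions)."""

    def __init__(self, generator: Generator[Any, Any, Any]) -> None:
        self._generator = generator
        self._done = False
        self._cancelled = False
        self._must_cancel = False

    def cancel(self) -> None:
        # Only record the request; the exception is delivered on the next step.
        if not self._done:
            self._must_cancel = True

    def _step(self) -> None:
        if self._done:
            return
        try:
            if self._must_cancel:
                self._must_cancel = False
                # Raise CancelledError at the generator's current yield point,
                # which lets its try/finally blocks clean up.
                self._generator.throw(CancelledError())
            else:
                next(self._generator)
        except StopIteration:
            self._done = True
        except CancelledError:
            self._done = True
            self._cancelled = True


log: list[str] = []


def worker() -> Generator[None, None, None]:
    try:
        while True:
            log.append("tick")
            yield
    finally:
        log.append("cleanup")


task = Task(worker())
task._step()
task._step()
task.cancel()
task._step()
print(log)  # ['tick', 'tick', 'cleanup']
```

Because the exception surfaces inside the generator, a finally block like the one in process_client would still get to close its socket.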

I hope that this tour has been useful to you and I can't wait to read your feedback in the comments.

Top comments (1)

Thái Lê

Too good. I'm not sure how it can distribute tasks, since I'm having some problems using the built-in functions, including joblib somehow. It messed up all the links that are stored as a list.