
Vandy in the Vandyverse

Creating a queue system using only Python 3

I was looking for some practice, but wasn't feeling like solving coding challenges. I wanted instead to build something and learn/remember stuff along the way, in an experimental fashion.
How can one do such a thing, you may ask. What about picking something from your daily work and putting an extreme constraint on it?
That said, I came up with the task: "We need a queue, but nothing beyond Python and files is allowed. Also, no third-party libraries."

Disclaimer: This is meant as an exercise. The code here is by no means production-ready or close to it.

Move your big toe

The problem and constraints are set. But now, what is the first step? After being stuck for a while, I came up with one: "Persist the messages in a binary file, taking care to compress them".

First version and the problem of atomic writes

How it works: we reserve the first 50 bytes at the beginning of the file for the indexes. They're responsible for keeping track of which messages were already consumed and which are next in line.

At the same time, we avoid filling up the memory by reading the file in chunks.
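A minimal sketch of how this first version might look, assuming a length-prefixed record format; the `put`/`get` names and the struct layout are illustrative, not the post's exact code (only the 50-byte index region, the compression, and the message-by-message reads come from the text):

```python
import os
import struct
import zlib

HEADER_SIZE = 50  # the first 50 bytes are reserved for the indexes

class Queue:
    def __init__(self, path):
        is_new = not os.path.exists(path)
        self.q = open(path, "w+b" if is_new else "r+b")
        if is_new:
            # head = offset of the next message to consume,
            # tail = offset where the next message will be appended
            self._write_indexes(HEADER_SIZE, HEADER_SIZE)

    def _write_indexes(self, head, tail):
        self.q.seek(0)
        self.q.write(struct.pack("<QQ", head, tail).ljust(HEADER_SIZE, b"\x00"))

    def _read_indexes(self):
        self.q.seek(0)
        return struct.unpack("<QQ", self.q.read(16))

    def put(self, message: bytes):
        head, tail = self._read_indexes()
        payload = zlib.compress(message)  # compress before persisting
        self.q.seek(tail)
        self.q.write(struct.pack("<I", len(payload)) + payload)
        self._write_indexes(head, tail + 4 + len(payload))
        self.q.flush()
        os.fsync(self.q.fileno())

    def get(self):
        head, tail = self._read_indexes()
        if head >= tail:  # nothing left to consume
            return None
        self.q.seek(head)
        (size,) = struct.unpack("<I", self.q.read(4))
        payload = self.q.read(size)  # read one message at a time, never the whole file
        self._write_indexes(head + 4 + size, tail)
        self.q.flush()
        os.fsync(self.q.fileno())
        return zlib.decompress(payload)
```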

Every time we call self.q.write, we're writing data to Python's internal buffer, without actually writing to the file, avoiding system calls that could slow the operations down.

So, in order to really write the file to the disk, we call flush(), right? Well, more or less. The flush method copies the file's internal buffer (which is 8192 bytes by default, though it can use the file's preferred block size when defined) to the operating system's file buffer, which even so does not guarantee that the data will be written to the disk.

The os.fsync() call at line 82 of the original code writes all of a file's buffers to the disk. If you want to know a little bit more about it, see the module docs and the system call docs.
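Putting it together, getting bytes onto the disk takes three steps (the file name here is illustrative):

```python
import os

f = open("queue.bin", "wb")
f.write(b"message")    # lands in Python's internal buffer only
f.flush()              # copies Python's buffer to the OS file buffer
os.fsync(f.fileno())   # asks the kernel to push its buffers to the disk
f.close()
```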

Yet, this approach has a few problems. In case of a system failure (like a power outage), we might end up with an incomplete file write and/or index update. This happens because the operations are not atomic (more on this later). Also, we're not deleting the consumed messages, which bloats the file during usage. Finally, the file is not locked for exclusive access by an instance of Queue.

This answer on StackOverflow says atomicity can be achieved by making a copy of the file, writing to it, and then replacing the original. This is also the common approach because, according to the POSIX standard, renaming a file is an atomic operation. Click here if you want to know more about it.
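In code, that pattern looks roughly like this (the .tmp suffix is just an illustrative convention):

```python
import os

def atomic_write(path, data: bytes):
    tmp = path + ".tmp"
    with open(tmp, "wb") as f:
        f.write(data)
        f.flush()
        os.fsync(f.fileno())  # make sure the copy is on disk first
    os.rename(tmp, path)      # atomic on POSIX: readers see the old file
                              # or the new one, never a mix of the two
```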

Seems great, but it's also a quite costly approach for big files. Thus, we can keep a separate index file with the position of the last successfully written bytes. Since this will be a tiny file, perhaps we might not even need to use the rename operation?

File systems, blocks and sectors

File systems usually store data in blocks of 4096 bytes. This means that even if you create a file of only 200 bytes, it will still take up a whole 4096-byte block.
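You can see this yourself: on Linux, os.stat reports the allocated space in 512-byte units via st_blocks (the file name below is illustrative):

```python
import os

with open("tiny.bin", "wb") as f:
    f.write(b"x" * 200)        # a 200-byte file

st = os.stat("tiny.bin")
print(st.st_size)              # logical size: 200
print(st.st_blocks * 512)      # space actually allocated, typically 4096
```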
Blocks are an abstraction over the disk sectors, each of which usually holds 512 bytes, so a block can span several sectors. According to an interview with Dr. Stephen Tweedie, even in a power outage scenario, disks have enough energy to finish writing a sector:

“If you start a write operation to a disk, then even if the power fails in the middle of that sector write, the disk has enough power available, and it can actually steal power from the rotational energy of the spindle; it has enough power to complete the write of the sector that's being written right now. In all cases, the disks make that guarantee.”

So, even though our index fits in a single disk sector, we only ever write whole blocks. That said, we cannot guarantee that the whole block will be written in case of a system failure.

Solving atomic writes

Well, looks like using rename() is the way to go. The index file is small, so copying -> renaming won’t be as costly. Let’s see the code.
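A sketch of how this second version might look; it follows the layout of the first sketch, and the .index / .tmp naming is my assumption (the post's full code, with its own line numbering, differs from this):

```python
import os
import struct
import zlib

class Queue:
    def __init__(self, path):
        self.index_path = path + ".index"  # assumed name for the index file
        is_new = not os.path.exists(path)
        self.q = open(path, "w+b" if is_new else "r+b")
        if is_new:
            self._commit_indexes(0, 0)

    def _read_indexes(self):
        with open(self.index_path, "rb") as f:
            # head = next message to consume, tail = last successfully written byte
            return struct.unpack("<QQ", f.read(16))

    def _commit_indexes(self, head, tail):
        tmp = self.index_path + ".tmp"
        with open(tmp, "wb") as f:
            f.write(struct.pack("<QQ", head, tail))
            f.flush()
            os.fsync(f.fileno())
        os.rename(tmp, self.index_path)  # atomic: the transaction commit point

    def put(self, message: bytes):
        head, tail = self._read_indexes()
        payload = zlib.compress(message)
        self.q.seek(tail)
        self.q.write(struct.pack("<I", len(payload)) + payload)
        self.q.flush()
        os.fsync(self.q.fileno())        # persist the data file first...
        self._commit_indexes(head, tail + 4 + len(payload))  # ...then commit the write

    def get(self):
        head, tail = self._read_indexes()
        if head >= tail:
            return None
        self.q.seek(head)
        (size,) = struct.unpack("<I", self.q.read(4))
        payload = self.q.read(size)
        self._commit_indexes(head + 4 + size, tail)  # dequeue only touches the index
        return zlib.decompress(payload)
```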

We can notice some changes here. Now we keep a separate index file, which is updated through the os.rename() operation (line 107 of the original code). It is also worth remembering that os.rename uses the rename system call, which, as we said before, is atomic. That said, it is nice to have this as a transaction commit point.

Another nice thing: this change makes the queue more robust, since the index not only points to the next message to be consumed, but also to the last successfully written byte. This means that if a write wasn't finished, the index will continue to point to valid positions in the file, so we'll always have a contiguous block of valid messages.

More safety has its price. The chart below shows operations per second:

[Chart: operations per second, enqueue vs. dequeue]

Since dequeuing updates only the index file, it allows more operations per second. So, how can we improve that? Perhaps avoiding a file write per operation might help. Let's see below.

Buffers

A buffer is a region in memory where we store data for reads and writes. Remember the gray bar on YouTube when you're watching a video? That's one of the many applications of a buffer. Unlike a cache, the data will not be available again once consumed.

Here we need two buffers. One will handle reads (consuming messages) and the other writes (adding messages). Also, the messages inside the write buffer should be accessible in case they're requested before being persisted to the file. Let's see the code:
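A sketch of the idea, building on the rename-based Queue from the previous sketch; the buffer limit and method names are illustrative:

```python
import os
import struct
import zlib
from collections import deque

BUFFER_LIMIT = 100  # assumed flush threshold, in number of messages

class BufferedQueue(Queue):  # Queue is the rename-based class above
    def __init__(self, path):
        super().__init__(path)
        self.write_buffer = deque()
        self.read_buffer = deque()

    def put(self, message: bytes):
        self.write_buffer.append(message)
        if len(self.write_buffer) >= BUFFER_LIMIT:
            self._flush_writes()

    def _flush_writes(self):
        # one file write, one fsync and one index commit for the whole batch
        head, tail = self._read_indexes()
        chunk = b""
        while self.write_buffer:
            payload = zlib.compress(self.write_buffer.popleft())
            chunk += struct.pack("<I", len(payload)) + payload
        self.q.seek(tail)
        self.q.write(chunk)
        self.q.flush()
        os.fsync(self.q.fileno())
        self._commit_indexes(head, tail + len(chunk))

    def _refill_reads(self):
        head, tail = self._read_indexes()
        while len(self.read_buffer) < BUFFER_LIMIT and head < tail:
            self.q.seek(head)
            (size,) = struct.unpack("<I", self.q.read(4))
            self.read_buffer.append(zlib.decompress(self.q.read(size)))
            head += 4 + size
        self._commit_indexes(head, tail)  # a single index commit per batch

    def get(self):
        if not self.read_buffer:
            self._refill_reads()
        if self.read_buffer:
            return self.read_buffer.popleft()
        if self.write_buffer:  # not yet persisted, but still reachable
            return self.write_buffer.popleft()
        return None
```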

This is not a fully functional implementation: it only updates the files once the buffers fill up, meaning we'll lose some messages if a system error happens in between. But it serves as a demonstration of how much performance we can gain by not writing to the file constantly. Let's see:

[Chart: operations per second with buffers, enqueue vs. dequeue]

This is a huge performance gain compared with the previous versions; however, we end up with some reliability risks. There are strategies to mitigate this problem and make the queue both reliable and fast, but implementing them would yield enough topics to write a book!

Problems, improvements that I'll never make, and final thoughts

  • This was written as a Python library, but it should run as a service, accepting connections via sockets.
  • Implementing async IO might improve performance, but the code would be much more complex, since the message order must be guaranteed. The most popular approach for that is thread pools; if you want to know more, see this project.
  • Consumed messages are not erased, causing file bloat. Perhaps something along the lines of PostgreSQL's VACUUM might be a solution?
  • Buffers are flushed to the disk only when they reach a size limit, instead of also being flushed after a time delay.
  • We don't have a message size limit, which can cause us some trouble.
  • The queue files are not locked during usage, meaning two processes may use the same queue at the same time, causing all sorts of trouble (corrupted reads and writes, for instance).

Tackling this problem was a lot of fun, even without ending up with a production-ready solution. It can be a very fun and educational experience to try to reinvent the things we use in our everyday work. I hope you had fun reading this and got some ideas for experiments of your own.
