While developing a backend service that handles a large volume of real-time events at the company I work for, we had a requirement to queue and process events sequentially without blocking the reader that receives them. Representing this requirement in a diagram looks like this:
Once an event is queued for processing, control returns to the reader so that it can keep reading new events without being blocked.
In Go, channels and goroutines are the natural tools for this type of problem, so we will go down that road.
Buffered and unbuffered Go channels
When creating a Go channel we can create it as a buffered or an unbuffered channel. Here is an example:
unbuffered := make(chan int)  // Unbuffered
buffered := make(chan int, 3) // Buffered
Unbuffered channels block the sending goroutine until a receiver is ready to receive the value being sent. This means that data is received in the order it was sent, and that synchronization is built into the channel. However, if we send one event as per our requirement and it starts processing, the send of the next event will block until the first one has finished processing. That in turn blocks the reading of the next event. We don’t want that.
Buffered channels, on the other hand, can hold a limited number of values (determined by the buffer size), and will only block the sending goroutine when the buffer is full. This allows some additional concurrency, but requires careful consideration to avoid deadlocks and other synchronization issues. At first glance this could work: if we know the system demand, we can provide a buffer big enough that we don’t have to worry about it getting blocked.
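To make the difference concrete, here is a minimal sketch that probes both kinds of channel with a non-blocking send. The trySend helper is hypothetical (it is not part of the executor built later); it uses select with a default case to report whether a send would have gone through immediately.

```go
package main

import "fmt"

// trySend is a hypothetical helper: it attempts a send without blocking
// and reports whether the channel accepted the value right away.
func trySend(ch chan int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		return false // a plain `ch <- v` would have blocked here
	}
}

func main() {
	unbuffered := make(chan int)
	// No goroutine is receiving, so the send cannot proceed.
	fmt.Println(trySend(unbuffered, 1)) // false

	buffered := make(chan int, 3)
	// The first three sends fill the buffer; the fourth finds it full.
	for i := 1; i <= 4; i++ {
		fmt.Println(trySend(buffered, i)) // true, true, true, false
	}
}
```

A buffer sized for the expected demand only moves the blocking point: once a burst exceeds the capacity, the sender waits again.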
But we know better, and we want to eliminate all the ifs. That is why we will build a Go buffered channel with infinite (unbounded) capacity, limited only by available memory.
Go buffered channel with infinite capacity
Let’s start with a name. I named my package and the data structure executor. There are a lot of use cases for queuing data, so we will use Go generics so that any type of data can be passed into the executor. Go generics are available from Go version 1.18 and up. We also need to define the executor API, that is, how it will be used from Go code. I decided to pass the executor a function that it will call whenever some data is ready to be processed.
To make it easy for ourselves we can define a new type that will describe this function:
type ExecHandler[T any] func(T)
Since the queuing will happen on a separate goroutine, we need a pair of channels: one to send data to that goroutine and one to read data from it for processing. A data structure encapsulating all of the above could be defined like this:
type Executor[T any] struct {
	reader      chan T
	writer      chan T
	buffer      []T
	execHandler ExecHandler[T]
}
The buffer field holds the queued elements that are waiting to be processed.
We also need to provide a factory function for the executor so that everything is properly initialised:
func New[T any](execHandler ExecHandler[T]) *Executor[T] {
	e := &Executor[T]{
		reader:      make(chan T),
		writer:      make(chan T),
		buffer:      make([]T, 0),
		execHandler: execHandler,
	}
	go e.run()
	return e
}
The New function constructs and returns the executor object. It also spawns a new goroutine that calls the run method on the executor, which starts listening for new elements to be queued for processing.
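func (e *Executor[T]) run() {
	// Process queued elements on a separate goroutine.
	go e.listenForReading()
	for {
		if len(e.buffer) > 0 {
			select {
			// The processing goroutine is ready: hand over the oldest element.
			case e.reader <- e.buffer[0]:
				e.buffer = e.buffer[1:]
			// A new element arrived: queue it.
			case data := <-e.writer:
				e.buffer = append(e.buffer, data)
			}
		} else {
			// Nothing is queued: block until a new element arrives.
			data := <-e.writer
			e.buffer = append(e.buffer, data)
		}
	}
}
func (e *Executor[T]) listenForReading() {
	// Call the handler for each element, one at a time, in order.
	for data := range e.reader {
		e.execHandler(data)
	}
}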
There’s a lot happening in the run method, so let’s break it down. On the first line we spawn a new goroutine that processes the queued elements. By processing I mean that the exec handler is called with each element.
Next, we have an infinite loop that queues data and sends it for processing. If the buffer of queued elements is empty, we wait for a new element from the writer channel. When we get a new element, it is stored in the buffer and a new iteration of the loop begins.
If the buffer is not empty, we stop on a select statement where two things can happen:
- If the reader channel is ready, the first element in the buffer is dispatched for processing and removed from the buffer.
- If a new element is waiting to be queued, it is added to the buffer.
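As a side note, the same two cases can also be written without the if/else by relying on a nil channel: a send on a nil channel blocks forever, so that select case is simply never chosen while the buffer is empty. The sketch below is an alternative formulation, not the code used in this article, and pump, in, and out are names made up for the example.

```go
package main

import "fmt"

// pump forwards values from in to out through an unbounded slice buffer.
// While the buffer is empty, send stays nil, which disables the send case.
func pump[T any](in <-chan T, out chan<- T) {
	var buffer []T
	for {
		var send chan<- T // nil: nothing to dispatch yet
		var head T
		if len(buffer) > 0 {
			send = out
			head = buffer[0]
		}
		select {
		case send <- head: // only selectable when the buffer is non-empty
			buffer = buffer[1:]
		case v := <-in: // a new element is waiting to be queued
			buffer = append(buffer, v)
		}
	}
}

func main() {
	in, out := make(chan int), make(chan int)
	go pump[int](in, out)

	// Nobody is receiving from out yet, but these sends still complete
	// because pump buffers the values.
	for i := 0; i < 3; i++ {
		in <- i
	}
	for i := 0; i < 3; i++ {
		fmt.Println(<-out) // 0, then 1, then 2
	}
}
```

Both versions behave the same way; which one reads better is mostly a matter of taste.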
The only thing left is to add an exported method to the executor for queuing elements:
func (e *Executor[T]) Dispatch(data T) {
	e.writer <- data
}
That’s it: we now have a functioning infinite (unbounded) channel that processes data sequentially.
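To see the pieces working together, here is a small self-contained sketch. It repeats the executor code from above and adds a hypothetical processAll helper that dispatches a few integers to a deliberately slow handler and records the order in which they were handled. The helper and the sleep duration are assumptions made for the demo.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

type ExecHandler[T any] func(T)

type Executor[T any] struct {
	reader      chan T
	writer      chan T
	buffer      []T
	execHandler ExecHandler[T]
}

func New[T any](execHandler ExecHandler[T]) *Executor[T] {
	e := &Executor[T]{
		reader:      make(chan T),
		writer:      make(chan T),
		buffer:      make([]T, 0),
		execHandler: execHandler,
	}
	go e.run()
	return e
}

func (e *Executor[T]) Dispatch(data T) { e.writer <- data }

func (e *Executor[T]) run() {
	go e.listenForReading()
	for {
		if len(e.buffer) > 0 {
			select {
			case e.reader <- e.buffer[0]:
				e.buffer = e.buffer[1:]
			case data := <-e.writer:
				e.buffer = append(e.buffer, data)
			}
		} else {
			data := <-e.writer
			e.buffer = append(e.buffer, data)
		}
	}
}

func (e *Executor[T]) listenForReading() {
	for data := range e.reader {
		e.execHandler(data)
	}
}

// processAll is a hypothetical demo helper: it dispatches 0..n-1 to a slow
// handler and returns the values in the order the handler saw them.
func processAll(n int) []int {
	var wg sync.WaitGroup
	wg.Add(n)
	handled := make([]int, 0, n)
	e := New[int](func(v int) {
		time.Sleep(time.Millisecond) // simulate slow processing
		handled = append(handled, v) // only the handler goroutine appends
		wg.Done()
	})
	for i := 0; i < n; i++ {
		e.Dispatch(i) // returns once run() has taken the value
	}
	wg.Wait() // wait until every element has been handled
	return handled
}

func main() {
	fmt.Println(processAll(5)) // [0 1 2 3 4]
}
```

Note that in this version the executor's goroutines never exit, so the sketch simply lets the program end once everything has been handled.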
If you want to see the entire file I’ve created a gist on GitHub so you can visit this link or copy the code from the GitHub embed below.
package executor
type ExecHandler[T any] func(T)
type Executor[T any] struct {
	reader      chan T
	writer      chan T
	buffer      []T
	execHandler ExecHandler[T]
}
func New[T any](execHandler ExecHandler[T]) *Executor[T] {
	e := &Executor[T]{
		reader:      make(chan T),
		writer:      make(chan T),
		buffer:      make([]T, 0),
		execHandler: execHandler,
	}
	go e.run()
	return e
}
func (e *Executor[T]) Dispatch(data T) {
	e.writer <- data
}
func (e *Executor[T]) run() {
	go e.listenForReading()
	for {
		if len(e.buffer) > 0 {
			select {
			case e.reader <- e.buffer[0]:
				e.buffer = e.buffer[1:]
			case data := <-e.writer:
				e.buffer = append(e.buffer, data)
			}
		} else {
			data := <-e.writer
			e.buffer = append(e.buffer, data)
		}
	}
}
func (e *Executor[T]) listenForReading() {
	for data := range e.reader {
		e.execHandler(data)
	}
}
There are a lot of solutions to this problem, so if you have one that differs from the one in the article, please discuss it below. I would love to see what else is possible. Whether you are working in Go or in other technologies, I encourage you to play with stuff like this to get a better understanding of the language and its concurrency mechanisms.
Top comments (6)
Nice and neat approach!
I had a similar problem and I ended up implementing a "cascade" of buffered channels as outlined here.
I'd love to see your comments on that one.
Very interesting approach, I haven't seen that one around. I like it :)
Excellent!
Thanks!
Good solution for task queueing
Thanks!