DEV Community

Tarik Dahic

Posted on • Originally published at tarikdahic.com on


Non-blocking sequential processing in Go using infinite (unbounded) buffered channel

While developing a backend service that handles a lot of real-time events at the company I work for, we had a requirement to queue and process events sequentially without blocking the reader that reads them. Represented as a diagram, this requirement looks like this:

[Figure: Processing requirement]

After an event is queued for processing, control returns to the reader so that it can keep reading new events without being blocked.

In Go, this type of problem is most naturally solved with channels and goroutines, so we will go down that road.

Buffered and unbuffered Go channels

When creating a Go channel we can create it as a buffered or an unbuffered channel. Here is an example:

unbuffered := make(chan int)  // Unbuffered
buffered := make(chan int, 3) // Buffered, capacity 3

Unbuffered channels block the sending goroutine until a receiver is ready to receive the value being sent. This means that data is guaranteed to be received in the order it was sent, and that synchronization is built into the channel. However, if we send one event and the receiver starts processing it, the send of the next event blocks until the first one has finished processing. The reader is then stuck in that send and cannot read the next event. We don't want that.

Buffered channels, on the other hand, can hold a limited number of values (determined by the buffer size) and only block the sending goroutine when the buffer is full. They also deliver values in the order they were sent. This allows for some additional concurrency, but requires careful consideration to avoid deadlocks and other synchronization issues. At first thought, this could work: if we know the system demand, we can choose a buffer big enough that sends never block in practice.
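To make the difference concrete, here is a minimal sketch that probes both kinds of channel with a non-blocking send. The helper name trySend is made up for this illustration; it uses a select with a default case to report whether a send would have gone through immediately.

```go
package main

import "fmt"

// trySend (hypothetical helper) attempts a send without blocking and
// reports whether the value was accepted right away.
func trySend(ch chan int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		// Nobody is receiving and there is no free buffer slot.
		return false
	}
}

func main() {
	unbuffered := make(chan int)
	buffered := make(chan int, 3)

	// No receiver is waiting, so an unbuffered send cannot proceed.
	fmt.Println(trySend(unbuffered, 1)) // false

	// The first three sends fill the buffer, the fourth would block.
	for i := 1; i <= 4; i++ {
		fmt.Println(trySend(buffered, i)) // true, true, true, false
	}
}
```

A real reader would use a plain blocking send; the point here is only that a buffered channel delays the blocking until the buffer is full, it does not remove it.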

But we know better, and we want to eliminate all the ifs. That is why we will build a Go buffered channel with infinite (unbounded) capacity, limited only by available memory.

Go buffered channel with infinite capacity

Let’s start with a name. I named my package and the data structure executor. There are a lot of use cases for queuing data, so we will use Go generics so that any type of data can be passed into the executor. Go generics are available from Go version 1.18 and up. We also need to define the executor API, that is, how it will be used from other Go code. I decided to pass the executor a function that it will call whenever some data is ready to be processed.

To make it easy for ourselves we can define a new type that will describe this function:

type ExecHandler[T any] func(T)
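As a small illustration of why the handler type is generic, the sketch below declares handlers for two different element types. The Event struct and the format helper are hypothetical and exist only for this example.

```go
package main

import "fmt"

type ExecHandler[T any] func(T)

// Event is a hypothetical payload type used only for illustration.
type Event struct {
	ID   int
	Kind string
}

// format (hypothetical helper) renders an event as a short string.
func format(e Event) string {
	return fmt.Sprintf("#%d %s", e.ID, e.Kind)
}

func main() {
	// The same generic type describes handlers for different payloads.
	var printString ExecHandler[string] = func(s string) { fmt.Println(s) }
	var printEvent ExecHandler[Event] = func(e Event) { fmt.Println(format(e)) }

	printString("hello")                      // hello
	printEvent(Event{ID: 1, Kind: "created"}) // #1 created
}
```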

Since queuing will preferably happen on another goroutine, we need a pair of channels: one to send data into that goroutine and one to read data out of it for processing. A data structure encapsulating all of the above could be defined like this:

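type Executor[T any] struct {
    reader      chan T         // delivers queued elements to the processing goroutine
    writer      chan T         // receives new elements from callers
    buffer      []T            // queued elements waiting to be processed, oldest first
    execHandler ExecHandler[T] // called once for each element
}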

The buffer field holds queued elements that are waiting to be processed.

We also need to provide a factory function for the executor so that everything is properly initialised:

func New[T any](execHandler ExecHandler[T]) *Executor[T] {
    e := &Executor[T]{
        reader: make(chan T),
        writer: make(chan T),
        buffer: make([]T, 0),
        execHandler: execHandler,
    }

    go e.run()

    return e
}

The New function constructs and returns the executor object. It also spawns a new goroutine that calls the run method on the executor, which starts listening for new elements to be queued for processing.

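func (e *Executor[T]) run() {
    // Process queued elements on a separate goroutine.
    go e.listenForReading()
    for {
        if len(e.buffer) > 0 {
            select {
            // The processor is ready: hand over the oldest element.
            case e.reader <- e.buffer[0]:
                e.buffer = e.buffer[1:]
            // A new element arrived: queue it at the back.
            case data := <-e.writer:
                e.buffer = append(e.buffer, data)
            }
        } else {
            // Nothing is queued, so wait for the next element.
            data := <-e.writer
            e.buffer = append(e.buffer, data)
        }
    }
}

// listenForReading calls the handler for each element, one at a time.
func (e *Executor[T]) listenForReading() {
    for data := range e.reader {
        e.execHandler(data)
    }
}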

There’s a lot happening in the run method, so let’s break it down. In the first line we spawn a new goroutine that processes queued elements. By processing I mean that the exec handler is called with each element.

Next, we have an infinite loop that queues data and sends it for processing. If the buffer of queued elements is empty, we wait for a new element from the writer channel. When we get a new element, it is stored in the buffer and a new iteration of the loop begins.

If the buffer is not empty, we stop on a select statement where two things can happen:

  • If the processing goroutine is ready to receive from the reader channel, the oldest element is dispatched for processing and removed from the buffer.
  • If a new element is waiting to be queued, it is added to the buffer.
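For comparison, the two branches described above can be folded into a single select by using a nil channel, since a send on a nil channel never becomes ready. The sketch below is a variation and not the code from this article: the function name pump and its channel parameters are made up for the example, and it additionally shuts down once the input channel is closed and the buffer is drained.

```go
package main

import "fmt"

// pump (hypothetical name) forwards values from in to out through an
// unbounded slice buffer, preserving order. It returns after in has
// been closed and every buffered value has been delivered.
func pump[T any](in <-chan T, out chan<- T) {
	var buffer []T
	for in != nil || len(buffer) > 0 {
		// While the buffer is empty, send stays nil, so the send
		// case below can never be selected.
		var send chan<- T
		var next T
		if len(buffer) > 0 {
			send = out
			next = buffer[0]
		}
		select {
		case send <- next:
			buffer = buffer[1:]
		case v, ok := <-in:
			if !ok {
				// Input closed: stop receiving, keep draining.
				in = nil
				continue
			}
			buffer = append(buffer, v)
		}
	}
	close(out)
}

func main() {
	in := make(chan int)
	out := make(chan int)
	go pump(in, out)

	// Nobody is reading from out yet, but these sends still complete
	// because pump keeps accepting values into its buffer.
	for i := 0; i < 5; i++ {
		in <- i
	}
	close(in)

	for v := range out {
		fmt.Println(v) // 0, 1, 2, 3, 4 in order
	}
}
```

Whether the explicit if/else or the nil-channel form reads better is a matter of taste; both keep the buffer owned by a single goroutine, so no locking is needed.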

The only thing left is to add an exported method to the executor for queuing elements:

func (e *Executor[T]) Dispatch(data T) {
    e.writer <- data
}

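That's it. We now have a functioning infinite (unbounded) channel that processes data sequentially. Dispatch only waits for the run loop to accept the element, never for the handler to finish. Keep in mind that "infinite" here means limited only by available memory: if events keep arriving faster than they are processed, the buffer keeps growing.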
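Here is a minimal usage sketch, assuming the executor code from this article is pasted into a single main package. The processAll helper, the one-millisecond sleep that simulates slow processing, and the WaitGroup used to wait for completion are assumptions added for the example.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

type ExecHandler[T any] func(T)

type Executor[T any] struct {
	reader      chan T
	writer      chan T
	buffer      []T
	execHandler ExecHandler[T]
}

func New[T any](execHandler ExecHandler[T]) *Executor[T] {
	e := &Executor[T]{
		reader:      make(chan T),
		writer:      make(chan T),
		buffer:      make([]T, 0),
		execHandler: execHandler,
	}
	go e.run()
	return e
}

func (e *Executor[T]) Dispatch(data T) { e.writer <- data }

func (e *Executor[T]) run() {
	go e.listenForReading()
	for {
		if len(e.buffer) > 0 {
			select {
			case e.reader <- e.buffer[0]:
				e.buffer = e.buffer[1:]
			case data := <-e.writer:
				e.buffer = append(e.buffer, data)
			}
		} else {
			data := <-e.writer
			e.buffer = append(e.buffer, data)
		}
	}
}

func (e *Executor[T]) listenForReading() {
	for data := range e.reader {
		e.execHandler(data)
	}
}

// processAll (hypothetical helper) dispatches every event, waits until
// the handler has seen all of them, and returns the observed order.
// The executor's goroutines keep running afterwards, because the
// executor has no stop method.
func processAll(events []int) []int {
	var wg sync.WaitGroup
	var seen []int
	wg.Add(len(events))
	e := New[int](func(v int) {
		time.Sleep(time.Millisecond) // simulate slow processing
		seen = append(seen, v)       // only the handler goroutine writes here
		wg.Done()
	})
	for _, v := range events {
		e.Dispatch(v) // returns without waiting for the handler
	}
	wg.Wait()
	return seen
}

func main() {
	fmt.Println(processAll([]int{1, 2, 3, 4, 5})) // [1 2 3 4 5]
}
```

The handler runs on a single goroutine, which is why appending to seen without a lock is safe here and why the elements come out in the order they were dispatched.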

If you want to see the entire file, I’ve created a gist on GitHub. You can visit this link or copy the code from the GitHub embed below.

package executor

type ExecHandler[T any] func(T)

type Executor[T any] struct {
    reader chan T
    writer chan T
    buffer []T
    execHandler ExecHandler[T]
}

func New[T any](execHandler ExecHandler[T]) *Executor[T] {
    e := &Executor[T]{
        reader: make(chan T),
        writer: make(chan T),
        buffer: make([]T, 0),
        execHandler: execHandler,
    }

    go e.run()

    return e
}

func (e *Executor[T]) Dispatch(data T) {
    e.writer <- data
}

func (e *Executor[T]) run() {
    go e.listenForReading()
    for {
        if len(e.buffer) > 0 {
            select {
            case e.reader <- e.buffer[0]:
                e.buffer = e.buffer[1:]
            case data := <-e.writer:
                e.buffer = append(e.buffer, data)
            }
        } else {
            data := <-e.writer
            e.buffer = append(e.buffer, data)
        }
    }
}

func (e *Executor[T]) listenForReading() {
    for data := range e.reader {
        e.execHandler(data)
    }
}

There are a lot of solutions to this problem, so if you have one that differs from the one in this article, please discuss it below. I would love to see what else is possible. Whether you work in Go or in other technologies, I encourage you to play with stuff like this to get a better understanding of the language and its concurrency mechanisms.


Top comments (6)

Gerardo Borbolla Luna

Nice and neat approach!
I had a similar problem and I ended up implementing a "cascade" of buffered channels as outlined here.
I'd love to see your comments on that one.

Tarik Dahic

Very interesting approach, I haven't seen that one around. I like it :)

Guillermo Rivero

Excellent!

Tarik Dahic

Thanks!

Angry Unicorn

Good solution for task queueing

Tarik Dahic

Thanks!
