Karel Vanden Bussche

Why Queues for Streaming?

Big Data brings Big Headaches

In the Big Data space, a lot can go wrong, in multiple places, at multiple times during the day. Everyone wants to avoid the midnight automated alerting phone call, the quick production fix on a weekend or, worst of all, the monster under the bed: data loss. Data-intensive applications in general require a lot of resilience against different types of issues:

  • Network issues (API requests timing out, ...)
  • Code issues
  • Transient issues (Database being overloaded, ...)

Issues like the ones above make it hard to keep a consistent state within applications. When data quality is a hard requirement, this becomes a big problem during processing. Add big data volumes to that quality requirement and costs quickly explode through unnecessary (re)processing.

To make sure we don't lose any progress between units of computation, persisting the partial result is a common strategy within data pipelines. This lets us keep the previously computed data even when we hit issues, so work can quickly be picked up from the last successful step.

This principle works for both batch pipelines & streaming pipelines. In batch pipelines we can store most of the data in bulk in a database, blob store or other persistent storage. In streaming pipelines, we mostly use queues for lightweight messages or events.

In the following article, I'll go into more depth on the benefits & downsides of using queues to increase the resilience of your pipelines. First, we'll define some terms regarding Tasks & Workloads to be used throughout the article. Next, the paradigm of queues will be explained with regard to streaming Workloads. A short example shows how you could use queues to their full potential in your own project. Finally, we'll briefly discuss a few common implementations of queues: bucket queues, priority queues & Google Cloud PubSub.

Tasks and Workloads

Within the data engineering field, we can split most workloads into distinct steps. Each of these steps represents a unit of computation that can't be split into any further meaningful subtasks.

A great example of such a "meaningful" subtask is found in the ETL definition. Each step in the classic Extract-Transform-Load pipeline cannot be split into smaller subtasks without incurring significant overhead. You could split it into more granular computations, but the cost of persisting each intermediate in-memory value to persistent storage would outweigh the benefits.

Each of these tasks defines its own input & output. The implementation of the task defines the way the input is transformed into the output.
The input will in most cases be a value or a file, and the same goes for the output. Lastly, one or more side-effects can happen in a task: a write to a database, a request to an external service, ...

Combining a chain of tasks gives us a workload. A workload defines a directed acyclic graph of tasks to be run after each other. For the sake of this article, we'll also call this a pipeline: a chain of individual tasks.

I borrowed this terminology from Apache Airflow, to use something that is known by most data engineers & translates well to the following paragraphs on queues.

Below, a directed acyclic graph is shown. All tasks trigger a subsequent task & no cycles are present in the graph. The directed nature is required to know which task's input depends on which task's output. Cycles are not allowed, as your workflow would otherwise never end. Use cases for cycles do exist, for example game loops, but they are not within the scope of this article.

A Directed Acyclic Graph; all tasks trigger a subsequent task & no cycles are present in the graph
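
To make this model concrete, here's a minimal Python sketch of the Task & Workload terminology. The class names & the toy A-B-C-D graph are just illustrations for this article, not Airflow's actual API:

```python
from dataclasses import dataclass, field
from typing import Any, Callable

@dataclass
class Task:
    # A unit of computation: transforms an input value into an output value.
    name: str
    run: Callable[[Any], Any]

@dataclass
class Workload:
    # A directed acyclic graph of tasks: `edges` maps a task's name to the
    # names of the downstream tasks that consume its output.
    tasks: dict[str, Task]
    edges: dict[str, list[str]] = field(default_factory=dict)

    def execute(self, start: str, value: Any) -> None:
        # Walk the DAG: each task's output feeds all of its successors.
        output = self.tasks[start].run(value)
        for downstream in self.edges.get(start, []):
            self.execute(downstream, output)

# The DAG from the figure: A feeds B and C, B feeds D.
workload = Workload(
    tasks={name: Task(name, lambda x: x) for name in "ABCD"},
    edges={"A": ["B", "C"], "B": ["D"]},
)
workload.execute("A", value=42)
```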

Of course, each task can go horrendously wrong. If, for example, B fails in the graph above, D is never computed. The challenge then is: how do we fix the entire pipeline as efficiently as possible?

If node B fails, we cannot run node D anymore & we need to fix node B first

Queues, the underrated time manipulators

Let's take a small step back. We know about persisting between steps & we know we don't want data loss. But how do we apply this in practice?

For streaming applications, we mainly use queues for this. These simple constructs are like a dynamic window over the theoretically infinite number of incoming messages pushed by publishers. They dilate time until a consumer starts the clock again, each tick of that clock being a message coming in.

A description of a simple FIFO queue data structure

Most people understand the core concepts of queues, but there are some implicit practicalities that are interesting to explore. If we look at this figure, a few benefits can be inferred:

  1. Data isn't lost until the consumer dequeues a message. As such, a queue is a type of persistent storage, even if "persistent" is only a short amount of time.
  2. Data ordering is kept. This can be necessary when you get events that need to follow a logical ordering or newer messages depend on older messages. An example could be writes to a DB: if we write A = 10 & A = 15 afterwards, we want the latest value to be written last & be the final value.
  3. Queues can be used to disconnect consumers & producers from each other & have them loosely connected. As a practical example, imagine that your publisher is implemented, but your consumer is still in review. When you put a queue between them, you don't have any issue, as your data will be persisted in the queue as long as it doesn't exceed the window size. This helps alleviate most of the transient issues, for example when your consumer's machine is suddenly evicted, which then no longer blocks your publisher from pushing messages.
  4. Code issues in the consumer can easily be remedied by wrapping your entire logic in a try-except. When you encounter an error, you can act as a publisher & enqueue the message you just dequeued back onto the queue. The main downside of this is that the in-order property of the FIFO queue is lost.

Be aware that no data structure is truly infinite; the same limitation applies to queues. The window size is the upper limit on the number of entries (or the maximum number of bytes, depending on the implementation) a queue can hold. The decoupling thus only holds until the queue overflows, at which point data loss still happens. Make sure you build for this & slightly overscale your consumers so they can actually work through all elements of your queue.
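
To make the benefits above concrete, here's a minimal in-process sketch using Python's standard queue.Queue. A production system would use a real message broker, but the mechanics are the same; process is a stand-in for your task's logic:

```python
import queue

# The "window size" from the text: an upper limit on queued entries.
messages: queue.Queue = queue.Queue(maxsize=1000)

def consume_forever() -> None:
    while True:
        msg = messages.get()  # data persists in the queue until this point
        try:
            process(msg)  # the task's actual logic
        except Exception:
            # Act as a publisher & re-enqueue the failed message (benefit 4).
            # Note: this sacrifices the FIFO ordering property.
            messages.put(msg)
        finally:
            messages.task_done()

def process(msg: dict) -> None:
    ...  # e.g. write to a database, call an external service
```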

Benefits of the Task model

Now that we know some attributes of queues, we can link back to the Task & Workload model. Remember that a workload is a chain of Tasks linked to each other by directed edges. For batch operations, it's easy to persist your data & trigger the next step: a main attribute of batch is that the stream is finite, so we know when to trigger the next step. For streaming, this attribute doesn't hold; all streams are theoretically infinite.

We can reframe our way of thinking about streams: each element of a stream can be seen as a mini-batch. If we take this a step further, we can translate the tasks from the batch example to the same tasks performed on a mini-batch. Now we do know when a Task is done for streaming & we can trigger the next step.

To put this in practice, queues are the ideal trigger. A few things make it perfect for this use case:

  1. Decoupling makes it easy to alleviate data loss when one of the multiple consumer machines is suddenly unresponsive.
  2. If the consumers cannot keep up, no data loss is happening (until the window size is hit).
  3. The queue size & oldest message age expose a lot of information about issues that might be happening regarding scaling or evicted machines. This helps with the observability of problems, which I wrote about here.

When applying queues & mini-batches to the Tasks & Workload model, we get the figure below. Each task works on a mini-batch, represented as a box in the queue. For brevity, if a queue was processed fully successfully, no boxes are drawn in the figure. Note that individual tasks can still fail, impacting the final workload; data engineers should make sure the fallback implementation follows the application's needs.

Workload & Task example for streaming workloads

Streaming ETL

Every data engineer has heard about ETL. When thinking about ETL, we mainly think of large data sets we need to transform & persist in some database. Batch processing works splendidly for these types of workloads & business needs.

Sometimes realtime data is required. An example could be a weather application receiving data from multiple sensors all over the country. People will not want to wait an hour to know it's raining one village over. In this case, realtime or mini-batch processing makes more sense.

Realtime requires us to work not in batches, but in mini-batches that align with our update-frequency needs. In the case of the weather app, updates every 5 minutes would be good enough, while for certain other applications, realtime means sub-second. As previously explained, mini-batches & streaming share a lot of characteristics.

A possible implementation of the weather app could look like the figure given below. Each of the steps in E-T-L has its own queue, each providing its own type of resiliency.

Definition of the weather app use case

First of all, data needs to be collected. To keep the example practical, we're going to collect data for the weather app explained above. We have a lot of sensors spread over the entire country, each outputting a steady amount of data points.

The first step in our ETL pipeline is the Extraction step. A possible solution for this is a small cloud function that takes the data, inserts it into some kind of database or blob storage (to prevent data loss) & sends the location of the persisted blob of data as a message to the output queue of the Extract step. Let's call this queue the E-T queue.
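
A rough sketch of such an Extract function. The storage & et_queue clients, the bucket name & the field names are illustrative stand-ins, not a specific cloud API:

```python
import json
import uuid

RAW_BUCKET = "weather-raw-readings"  # hypothetical bucket name

def extract(sensor_payload: dict, storage, et_queue) -> None:
    # 1. Persist the raw reading first, so it survives any downstream failure.
    blob_path = f"{RAW_BUCKET}/{uuid.uuid4()}.json"
    storage.write(blob_path, json.dumps(sensor_payload))

    # 2. Publish only a pointer to the blob; the queue message stays lightweight.
    et_queue.publish({"blob": blob_path, "sensor_id": sensor_payload["sensor_id"]})
```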

Secondly, the raw data goes to the Transform step. This step adds some extra computed columns, for example the volume of the precipitation. If we want to apply (pre)aggregations in any way, this would also be the place to do so. The output is a message containing the single processed value. For future reference, this queue is the T-L queue.
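
A sketch of the Transform step under the same assumptions; the precipitation fields are made up for the example:

```python
import json

def transform(message: dict, storage, tl_queue) -> None:
    # Fetch the raw reading the Extract step pointed us to.
    raw = json.loads(storage.read(message["blob"]))

    # Example computed column (field names are assumptions): precipitation
    # volume in litres from mm of rainfall over the sensor's catch area.
    raw["precip_volume_l"] = raw["precip_mm"] * raw["catch_area_m2"]

    # Publish the single processed value to the T-L queue.
    tl_queue.publish(raw)
```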

Finally, the Load step will persist the value that came in. As this step is very dependent on the database load & locking behaviour, some messages might go faster than others. After this step, the data is persisted & can be used by the application.
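
A sketch of the Load step, combining the idempotency & retry ideas we'll revisit below. Here db & TransientDbError are stand-ins for your actual database client & its transient error type:

```python
class TransientDbError(Exception):
    """Stand-in for whatever transient error your database client raises."""

def load(message: dict, db, tl_queue) -> None:
    try:
        # Idempotent upsert keyed on (sensor_id, timestamp): a retried
        # message can never create a duplicate row.
        db.upsert(
            table="weather_readings",
            key=(message["sensor_id"], message["timestamp"]),
            row=message,
        )
    except TransientDbError:
        # Transaction abort or connectivity issue: put the message back on
        # the T-L queue & let a later attempt persist it.
        tl_queue.publish(message)
```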

Note that the faster this pipeline completes, the faster data is available in the application. As such, we prefer regular updates & thus a low latency between the Extract & Load steps.

The 2 queues we discussed make use of different benefits of queues. First, the E-T queue provides the following benefits:

  1. As transform steps are the most code-heavy implementations, the previously discussed retry mechanism can be applied here to build resilience against implementation issues. Do note that if we use this mechanism, we need to have idempotent implementations. If ordering is required, this retry mechanism can't be used in its simplest form.
  2. Secondly, observability is key for transformation steps. Having a good idea of the queue size & oldest message age gives us the necessary information to throw alerts & create monitoring dashboards. We can also use this information for scaling behaviour, as most of these transform steps are stateless processes & can be scaled based on the current workload.
  3. The decoupled nature of publisher & consumer is very important here. We don't want to drop messages when the consumer is down: that would mean data loss for our application, which has direct business impact.

The second queue we introduced is the T-L queue. This queue is mainly constrained by the database load. The queue helps with a few things here:

  1. We are unsure whether the producer outputs messages faster than the consumer can persist them. Here, the decoupling makes sure this mismatch doesn't block future messages from eventually being persisted.
  2. As we might hit transaction aborts or database connectivity issues, we want to be able to still insert these values. As such, the retry mechanism discussed above is a must-have here.

I hope this example made it clear that queues can be used for a number of benefits. Mainly the persistent nature, the decoupling & the ordering (in some cases) are the attributes that just make sense in the workloads/pipelines & Tasks context.

Other interesting queues

As the FIFO queue is not the only kind of queue, let me give you a few other examples of interesting queues & what they could be used for.

The first to tackle is the bucketed queue shown below. This is basically multiple queues in one. It can be handy when you know you need, for example, one value of each type. In the case of the weather app, this might be a sensor value for pressure, temperature & humidity, to be combined into a single data point later on.

A bucketed queue: multiple sub-queues in one, with one bucket per message type
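
A minimal sketch of a bucketed queue, built on standard FIFO queues; the class & helper names are mine, not a standard API:

```python
import queue

class BucketedQueue:
    # Multiple FIFO queues in one, keyed by message type.
    def __init__(self, buckets: list[str]) -> None:
        self.buckets = {name: queue.Queue() for name in buckets}

    def put(self, bucket: str, msg) -> None:
        self.buckets[bucket].put(msg)

    def get_one_of_each(self) -> dict:
        # Blocks until every bucket has contributed one value, giving us
        # a complete data point to combine downstream.
        return {name: q.get() for name, q in self.buckets.items()}

sensors = BucketedQueue(["pressure", "temperature", "humidity"])
sensors.put("pressure", 1013.2)
sensors.put("temperature", 18.5)
sensors.put("humidity", 0.83)
point = sensors.get_one_of_each()  # one value of each type
```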

A more specialised queue based on the bucket example is the priority queue. This queue is made specifically for workloads or tasks where certain messages should be processed with a higher priority. Imagine the weather app case with extreme weather events coming in: these events should be visible in the app immediately, without delay. In such a case, a priority queue can give them the highest priority & push them out of the queue to consumers first.

A priority queue: messages are dequeued by priority instead of arrival order
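
A minimal sketch using Python's heapq module. Lower number means higher priority; the tie-break counter is a common trick to keep FIFO order within a single priority level:

```python
import heapq
import itertools

_tiebreak = itertools.count()  # preserves FIFO order for equal priorities
heap: list = []

def publish(msg: dict, priority: int) -> None:
    heapq.heappush(heap, (priority, next(_tiebreak), msg))

def consume() -> dict:
    _, _, msg = heapq.heappop(heap)  # always the highest-priority message
    return msg

publish({"event": "regular reading"}, priority=5)
publish({"event": "extreme weather warning"}, priority=0)
assert consume()["event"] == "extreme weather warning"
```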

Finally, a message queue I use a lot is Google Cloud PubSub. This queue has a few interesting attributes.

First, it doesn't simply send messages to the subscriber, but "leases" them. What is special about leasing is that, if the message is not acknowledged, it is returned to the queue. The acknowledgement happens when a subscriber, at the end of its processing time, sends a message back to the queue communicating that the message has been processed correctly. This helps implement the retry mechanism discussed frequently in the previous paragraphs. Observe that here, as with the simple retry mechanism, operations need to be idempotent or transactional, as you might otherwise end up with duplicate entries or other data anomalies.
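
A sketch of what that lease/acknowledge flow looks like with the google-cloud-pubsub client; the project & subscription names are placeholders:

```python
from google.cloud import pubsub_v1

def process(data: bytes) -> None:
    ...  # your task logic; must be idempotent, as discussed above

def callback(message: pubsub_v1.subscriber.message.Message) -> None:
    try:
        process(message.data)
        message.ack()   # success: the lease ends, the message leaves the queue
    except Exception:
        message.nack()  # failure: the message is returned & redelivered later

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path("my-project", "weather-sub")

streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
streaming_pull_future.result()  # block & consume until cancelled
```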

Note that this also means that ordering is harder to implement within these types of queues. To actually implement it, bucket queues are used, with locks making sure you don't receive the second value in a bucket before the first is acknowledged.

Secondly, it is not a 1-to-1 queue, but a broadcast queue: each message on the topic is sent to all n subscriptions linked to the topic. This means it's easy to have 2 outgoing edges in your DAG without needing to push 2 messages yourself & thus possibly failing on one or the other. In our example application, this could be persisting the transformed messages to both a database used by the app & an analytical database used by researchers.
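
A sketch of that fan-out; topic & subscription names are placeholders, and the subscriptions themselves would be created once, outside this code:

```python
from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path("my-project", "transformed-readings")

# Both subscriptions below receive EVERY message published to the topic,
# giving the DAG two outgoing edges from a single publish call.
# (Created once, e.g. via the CLI:)
#   gcloud pubsub subscriptions create app-loader --topic transformed-readings
#   gcloud pubsub subscriptions create analytics-loader --topic transformed-readings

future = publisher.publish(topic_path, data=b'{"sensor_id": 42, "temp_c": 18.5}')
future.result()  # raises if the publish failed
```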

Lastly, the PubSub queue is elastic. Where I previously mentioned that the window size is a hard limit before data loss, this is no longer the case here, as the capacity scales horizontally & dynamically with the needs of the queue. This is particularly useful when large spikes of messages suddenly come in: PubSub scales to the necessary capacity, making sure you don't lose any data. Do note that even here there are limits, as PubSub bounds how long messages are retained on the queue. Secondly, you don't want to keep billions of messages on the queue for too long anyway, as they do have a cost associated with them.

Summary

In this article, we started with a few types of issues we might encounter during our processing steps. Transient issues like network problems, as well as code issues, can have a lasting impact on your data. We've shown that data, especially big data, can suffer from multiple types of problems.

First, we defined what a Task is, to help us split larger processes into smaller steps. We defined Workloads or Pipelines as chains of Tasks to be run one by one. The issues when a node in the chain fails were easily spotted.

We've seen that streaming has a slightly different definition & requirements than batch, but that we can still think of streams as mini-batches. The benefits of queues in this context were shown: mainly the decoupling, persistent nature & ordering within queues help keep the issues between Tasks in check.

Next, we put this into the context of a simple weather application, showing the use cases of queues in a streaming ETL workload.

Finally, we explored a few other interesting queues: the bucketed queue, which defines buckets of messages to be pulled; the priority queue, which hides the buckets & always returns the highest-priority messages first; and Google's managed queue, PubSub.

I hope this article gave some insight into why queues make sense for different behaviours. The practical application hopefully gave you some insight into actual cases where queues shine. Lastly, I hope this article made you interested in learning more about this data structure, as queues are indeed amazing tools in the repertoire of a data engineer.

Thanks for reading!
