DEV Community

Shalvah
Shalvah

Posted on • Updated on • Originally published at blog.shalvah.me

Building a task queue, Part 1

This post is part 1 of a series where I build a task queue system. Other parts:


Motivation

Some time ago, I came across a post suggesting that engineering teams should build their own task queues. I don't agree, but thinking about it made me realize that task queues are actually a great learning project, especially if you're learning a new language or stuck in the "advanced beginner" spot. Nearly every app that goes to production needs one. And there are many different challenges to tackle and different implementations to try out. Plus, knowing what goes on within a task qeue makes you utilise your existing quees better.

I realized I could do with some challenges in these areas, so I'm going to take my own advice and build my own queueing system. This article aims to be sort of a journal of what I'm doing and learning.

Thoughts on queues

Types

I'm very far from being a queue expert, but I know that there are different types of queues. I know there are job queues and task queues (which might be the same thing?) and message queues. The idea is that job/task queues (such as Ruby's Sidekiq and Resque) are for pushing tasks you need done, and one or more workers pick them from the queue and execute them. A message queue (eg RabbitMQ, Kafka), by comparison, is for publishing messages, which can then be consumed (one or more times) by interested subscribers.

it isn't strictly separated; people often use message queues as job queues too, and vice versa. But I'll be building a task queue here. I think a message queue is another interesting challenge, though.

Parts

I like to think of queue systems as having two main parts: the infrastructure and the interface. The infrastructure is what powers the queue, and is responsible for making sure your jobs run as you wish. The interface is how your application communicates with the infrastructure (for instance, to queue jobs). There could also be a web UI for viewing information about your queues, but I'm ignoring that for now.

A queuing system can come with one or both parts, and they can have different degrees of complexity. For example:

  • Beanstalkd, RabbitMQ, and Kafka are mostly used as external infrastructure. You don't have to install Beanstalkd into your app, just start the process and install (or write) a library to talk to it.
  • Rails' ActiveRecord is an interface to different queuing backends.
  • Sidekiq and Delayed Job are Ruby tools that are installed as part of your app. They come with their own interface, but you can also use them with other interfaces, such as Rails' ActiveJob.
  • The Node.js libraries BullMQ and Bee Queue are also infrastructure and interface. However, you typically have to use their interface.
  • Laravel's queuing system is an interface that comes with its own infrastructure, and its infra gives you different queuing backends as options, from database- and Redis-backed ones to services like Beanstalkd or Amazon Simple Queue Service (SQS).

Architecture

There are often two major architecture decisions:

  1. Where jobs are stored (and how long, and when)
  2. How jobs are executed

Two common options for job storage are a database and Redis. Redis is nice because it allows fast access, and you can easily retrieve new jobs without having to poll at intervals (for example, by using blocking commands). Databases are nice because they're simpler and allow a wider variety of queries, which is useful if you intend to store jobs or support querying them through the UI.

I think Redis is generally superior for this, especially for high-activity systems. With a database, the more jobs you insert, the longer your queries take. And even when you delete old records, they aren't necessarily immediately deleted on disk.

As for execution, most in-app queue systems seem to follow this basic process:

  1. you push a job to the queue by saving it to the datastore
  2. a process that's always listening (the worker) fetches it from the datastore and executes it

But there's no rule that we have to do this. For instance, a basic queue system could be something like this:

  • Put the code for your job in a separate file, say send_welcome_email.rb
# send_welcome_email.rb

def send_welcome_email(user_id)
  # ...
end

input = ARGV
send_welcome_email(*input)
Enter fullscreen mode Exit fullscreen mode
  • Run the script in a separate process and don't wait for the result
# in your controller

user = ...
Process.spawn("ruby send_welcome_email.rb #{user.id}")
Enter fullscreen mode Exit fullscreen mode

But this isn't a robust system. For starters, it takes time to start a process, we have no error handling or management capabilities, and we might end up overloading our machine with too many processes.

Using a worker makes things better:

  • The worker is already active and listening, so there is no startup overhead and it can react to new jobs quickly.
  • You can start multiple workers to improve concurrency (e.g. 1 process per CPU core).
  • Jobs are pushed to a store, which is better for reliability

In practice, though, there are many different variations of this:

  • The worker process could itself spawn multiple threads for more concurrency (Sidekiq, GoodJob). This means that your jobs must be thread-safe.
  • The worker process could fork itself into a new process for each job (Resque).
  • There could be a "master" worker, which serves as a coordinator and manager of the others. It can start or stop processes depending on your configuration. It can monitor the health of processes, kill those that go past a memory limit or execution time, and start new ones. (Sidekiq Enterprise)
  • You might not talk to the datastore directly; instead, the master process has its internal storage, and merely exposes an API (such as localhost:2946). To enqueue a job, you send it as a request to that endpoint. This is typically used by external infra like Beanstalkd.

What I'm building

Architecture

I'll be building something inspired by Sidekiq. I think its architecture is powerful and its interface is simple. In my case, this means:

  • Data store: Redis (but I'll start out with a database implementation). Sidekiq doesn't store completed jobs, but I'd like to store them for a limited time
  • Executors: worker processes which spawn threads. I do like the "master" model, though, so I'll see if I can give that a try too.

Features

I did some research to find out the most common and useful queue features, and I've highlighted some I think are a good start:

  • dispatching jobs (with a delay, on specific queue, in bulk, on a schedule)
  • giving queues names and priorities
  • telling a worker which queues to process
  • chaining jobs
  • splitting a job into batches
  • error handling
  • retry policies
  • job callbacks/middleware
  • graceful shutdown of workers
  • testing helpers
  • web UI
  • metrics

Setup

I've created a dummy Sinatra app with a route that queues jobs:

get "/queue/:count?" do
  # Enqueue jobs...
  "Queued #{params[:count] || 1} jobs"
end
Enter fullscreen mode Exit fullscreen mode

Next up, configuring the database using the Sequel query builder.

## lib/db.rb
require "sequel"

DB = Sequel.sqlite(File.join(__dir__, '../database.db'))
Enter fullscreen mode Exit fullscreen mode
# lib/db_migrate.rb
DB.drop_table? :jobs # drop if exists
DB.create_table :jobs do
  #
end
Enter fullscreen mode Exit fullscreen mode

Nothing serious in the migration script yet; it's just a stub for when I'm ready to run migrations.

Okay, we're ready. Let's do some queueing!

Job interface

Most job libraries have you create a job class to contain the logic you want to execute. Then, when queueing, you either:

  • pass an instance of the job to the library, which is then serialized, stored the job, and unserialized when it's time to execute, or
  • pass the job arguments to the library, and it stores those, then creates an instance of the job when it's ready to execute it.

I'll go with the second, because I don't have to worry about any gotchas in serialization.

Here's what I want queuing a job to look like:

DoSomeStuff.dispatch(args, job_options)

# Example:
SendWelcomeEmail.dispatch(user.id, wait: 2.days)
Enter fullscreen mode Exit fullscreen mode

args will be passed to the job when executing it, while job_options are meant for the executor. Here the caller can configure things like a delay, queue name, or priority. Some job libraries use additional methods for this (ActiveJob would be DoSomeStuff.set(options).perform(arguments), while Sidekiq would be DoSomeStuff.perform_after(time, args)), but I'm sticking with this for a start.

Now, to implement my job interface. We're starting out with the database as our store, so let's set up our jobs table. I haven't fully thought out the implementation yet, but at the least our jobs will need to store the job arguments and the execution details.

# lib/db_migrate.rb

DB.drop_table? :jobs
DB.create_table :jobs do
  String :id, primary_key: true, size: 12
  String :name, null: false
  JSON :args, null: false
  String :queue, default: "default"
  DateTime :created_at, default: Sequel::CURRENT_TIMESTAMP, index: true
  DateTime :next_execution_at, null: true, index: true # for scheduled jobs and retries
  DateTime :last_executed_at, null: true
  Integer :attempts, default: 0
  String :state, default: "waiting", index: true
  String :error_details, null: true
  String :reserved_by, null: true
end
Enter fullscreen mode Exit fullscreen mode

Since we've defined a state column, let's talk a bit about the job lifecycle. I've decided to go with these:

  • waiting: The job is waiting to be picked up.
  • executing: The job has been picked up by a worker.
  • succeeded
  • failed: The job failed, but it's going to be retried.
  • dead: The job has failed and exhausted the maximum retry count. It won't be retried anymore.

The reserved_by field is used for locks. It allows us to improve concurrency by having multiple queue workers without them clashing with each other. When a worker picks up a job, it sets that field so that other workers don't try to pick up the same job. A worker will only pick up a job that isn't reserved.

Run this with ruby lib/db_migrate.rb, and we're good.

Next, we'll provide a Queueable class that has the dispatch method, which DoSomeStuff extends.

# gator/queueable.rb

module Gator
  class Queueable
    def self.dispatch(*args, **options)
      job = Gator::Models::Job.new(name: self.name, args: args)
      job.save
      Gator::Logger.new.info "Enqueued job id=#{job.id} args=#{job.args}"
    end

    attr_reader :logger

    def initialize
      super
      @logger = Gator::Logger.new
    end
  end
end
Enter fullscreen mode Exit fullscreen mode
# jobs/do_some_stuff.rb

class Jobs::DoSomeStuff < Gator::Queueable
  def handle(arg1, arg2 = nil)
    logger.info "HIIII #{arg1} and #{arg2}"
  end
end
Enter fullscreen mode Exit fullscreen mode

Zooming in on some details:

  • Gator::Queueable is the class end-users will extend. Right now, it only provides two things: the static dispatch method, and a logger for when your job is being executed.
  • Gator::Models::Job is a class that we use to interact with jobs in the database. Its contents:
module Gator
  module Models
    class Job < Sequel::Model
      def before_create
        self.id = "job_" + SecureRandom.hex(12)
        self.args = self[:args].to_json
      end

      def args = JSON.parse(self[:args])
    end
  end
end
Enter fullscreen mode Exit fullscreen mode
  • We store the class name as the name of the job, so the job executor can create an instance of the job class. Unfortunately, this means that if we rename the class after enqueuing a job, but before it is executed, our executor might crash, since the job class no longer exists. (This would also happen if we used the serialize-store-deserialize approach—there'd be no class to deserialize to.)

This is a known limitation of such job systems. It can be fixed by giving every job a name that's independent of the class name, and looking that up instead. Example:

jobs = {
  'do_stuff' => DoSomeStuff,
}

# We can safely rename the class
jobs = {
  'do_stuff' => DoStuff,
}
Enter fullscreen mode Exit fullscreen mode

However, this pushes the problem to another layer. What happens if you want to change the custom name? Similar problem. There isn't a full solution for this, but it isn't a very frequent problem, and you can usually find several workarounds.

And now updating our route, and visiting it:

get "/queue/:count?" do
  count = Integer(params[:count] || 1)
  count.times do |i|
    DoSomeStuff.dispatch("Nellie #{i}", "Buster #{i}")
  end
  "Queued #{count} jobs"
end
Enter fullscreen mode Exit fullscreen mode

The logs:

INFO [2022-10-27 12:31:37] Enqueued job id=job_0f25c864a7a16bb7a4c8ade6 args=["Nellie 0", "Buster 0"]
Enter fullscreen mode Exit fullscreen mode

Okay, great! We can enqueue jobs now. That's the easy part.

The queue worker

Our queue worker will poll the database for new jobs. If there are any, it will try to reserve one.

loop do
  if (job = next_job) 
    execute_job(job)
  else
    sleep(interval)
  end
end
Enter fullscreen mode Exit fullscreen mode

Let's flesh this out into a Worker class:

module Gator
  class Worker
    attr_reader :polling_interval, :logger, :worker_id

    def initialize(**opts)
      super

      @polling_interval = 5 # Check for new jobs every 5s
      @logger = Gator::Logger.new
      @worker_id = "wrk_" + SecureRandom.hex(8)
    end

    def run
      logger.info "Worker #{worker_id} ready"

      loop do
        if (job = next_job)
          execute_job job
          cleanup job
        else
          sleep polling_interval
        end
      end
    end

    def next_job
      job = check_for_jobs
      return nil unless job

      reserve_job(job) || nil
    end

    def check_for_jobs
      query = Models::Job.where(state: "waiting", reserved_by: nil)
      query = query.where { (next_execution_at =~ nil) | (next_execution_at <= Time.now) }
      query.first
    end

    def reserve_job(job)
      # Important: we only reserve the job if it hasn't already been reserved by another worker
      updated_count = Models::Job.where(id: job.id, reserved_by: nil).update(reserved_by: worker_id)
      updated_count == 1 ? job : false
    end

    def execute_job(job)
      Object.const_get(job.name).new.handle(job.args)
      logger.info "Processed job id=#{job.id} result=succeeded args=#{job.args}"
    end

    def cleanup(job)
      job.reserved_by = nil
      job.attempts += 1
      job.last_executed_at = Time.now
      job.state = "succeeded"
      job.save
    end
  end
end
Enter fullscreen mode Exit fullscreen mode

Finally, a CLI script that instantiates and starts a Worker:

# gator/bin/work.rb

require 'optparse'

options = {}
OptionParser.new do |parser|
  parser.on("-r", "--require FILE", "File to load at startup. Use this to boot your app.")
end.parse!(into: options)

require options.delete(:require) if options[:require]

require_relative '../worker'

w = Gator::Worker.new
w.run
Enter fullscreen mode Exit fullscreen mode

Note the --require option. I've added this so we can load the user's app before starting the worker, otherwise we'll get errors for undefined classes.

Now we start the worker (boot.rb is the file that loads my app):

ruby ./gator/bin/work.rb -r ./app/boot.rb
Enter fullscreen mode Exit fullscreen mode

And it immediately executes our waiting job:

INFO [2022-10-27 13:31:41] Worker wrk_1c423b74b85c72d6 ready
INFO [2022-10-27 13:31:42] HIIII Nellie 0 and Buster 0
INFO [2022-10-27 13:31:42] Processed job id=job_0f25c864a7a16bb7a4c8ade6 result=succeeded args=["Nellie 0", "Buster 0"]
Enter fullscreen mode Exit fullscreen mode

We can enqueue more jobs, and process them almost instantly:

One limitation of thr reserved_by system is that, if the worker is somehow killed while processing a job, it will never be picked up again. To fix this, we could add a reserved_at column that we use to decide if a job is still locked. But I won't bother with that, since we're switching to Redis later, which makes time-based locks easier.

Concurrency

Let's test our concurrency setup. What happens when we have multiple queue workers? I'll try enqueueing three jobs with three workers listening:

Well, it's a bit disappointing. All three jobs are processed by the same worker.

There are a few reasons for this:

  • The job doesn't don't do any real work, just prints a log. That takes almost no time. If you look at the timestamps, you'll see that all three jobs are executed within a second.
  • Our worker loop algorithm only sleeps if there weren't any jobs. As long as there's a job to process, the worker will keep processing.

These two factors combined mean that the worker that picks up the first job will finish it, and pick up the next, and so on, before the other workers even get to check for jobs again (after 5s). The impact of more workers will only be seen when we have a lot of jobs, or our jobs actually do some work that takes time.

Let's see what happens if I add a little "work" to the job:

class Jobs::DoSomeStuff < Gator::Job
  def handle(arg1, arg2 = nil)
+   sleep 1.5
    logger.info "HIIII #{arg1} and #{arg2}"
  end
end
Enter fullscreen mode Exit fullscreen mode

Ah, that's better. One worker still has nothing to do, but the other two share the three jobs. And none is processed twice.

Handling failures

But what if a job fails? Right now, it would crash our worker. So one more thing for today: let's add some basic error handling.

For now, all we'll do is record it when an error happens. We just need to adjust a few methods in our worker:

module Gator
  class Worker
    # ...
    def run
      logger.info "Worker #{worker_id} ready"

      loop do
        if (job = next_job)
-         execute_job(job)
-         cleanup(job)
+         error = execute_job(job)
+         cleanup(job, error)
        else
          sleep polling_interval
        end
      end
    end

    def execute_job(job)
      Object.const_get(job.name).new.handle(*job.args)
      logger.info "Processed job id=#{job.id} result=succeeded args=#{job.args}"
+     nil
+   rescue => e
+     logger.info "Processed job id=#{job.id} result=failed args=#{job.args}"
+     e
    end

-   def cleanup(job)
+   def cleanup(job, error = nil)
      job.reserved_by = nil
      job.attempts += 1
      job.last_executed_at = Time.now
-     job.state = "succeeded"
+     job.state = error ? "failed" : "succeeded"
+     job.error_details = error if error
      job.save
    end
  end
end
Enter fullscreen mode Exit fullscreen mode

And I'll edit the job so it randomly fails:

class Jobs::DoSomeStuff < Gator::Queueable
  def handle(arg1, arg2 = nil)
    sleep 1.5
+   raise "Oh no, a problem" if rand > 0.8
    logger.info "HIIII #{arg1} and #{arg2}"
  end
end
Enter fullscreen mode Exit fullscreen mode

And now, when I queue a couple of jobs, some fail:

And we can see them marked as failed in the database, complete with the error details:

Okay, that's a good start. You can see the full code at this point on this commit.


Check out the next post in the series.

Top comments (0)