Hopsoft

Sophisticated, Simple, and Affordable Background Workers

This is the 2nd post in a series that starts here.

At CodeFund, we processed 5+ million background jobs per day at our peak. Some of those jobs performed computationally expensive operations. Others performed the heavy database work required to operate the business. We hosted on Heroku with 3 Standard-1X dynos responsible for background work. This setup cost $75/month and had plenty of headroom to scale without additional cost.

How did we accomplish this?

The first thing to consider is how best to separate the work. Think about the different types of jobs your application performs. Are there any jobs that should be isolated from the primary work queue? At CodeFund, we pulled financial settlement and ETL processes out and ran them on a schedule on a dedicated dyno. This allowed the primary worker dynos to focus on real-time operations like recording ad impressions and clicks.

Here's our Heroku worker dyno configuration.

[Image: CodeFund worker dynos]

And here's how we configured Sidekiq queues.

# config/sidekiq.yml
:concurrency: 5
:queues:
  - critical
  - mailers
  - schedule
  - impression
  - click
  - default
  - rollbar
  - action_mailbox_routing
  - active_storage_analysis
  - action_mailbox_incineration
  - low
  - traffic

# config/sidekiq_data.yml
:concurrency: 5
:queues:
  - create_debits_for_campaign_and_date
  - create_debit_for_campaign_and_date
  - recalculate_organizataion_balances
  - ensure_daily_summaries
  - daily_summaries
  - daily_summary
  - ensure_scoped_daily_summaries

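Note that we didn't use weights. Instead, we leveraged Sidekiq's default behavior for unweighted queues: they are checked in strict order, so a queue lower in the list is only processed when every queue above it is empty.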
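
To make the ordering concrete, here is a minimal plain-Ruby sketch of draining queues in list order. It is a toy model, not Sidekiq's implementation: the `next_job` helper, the shortened queue list, and the in-memory hash are hypothetical stand-ins for the real fetcher and its Redis lists.

```ruby
# Toy model of unweighted queue ordering.
# Queues are checked top to bottom; the first non-empty queue wins.
QUEUE_ORDER = %w[critical mailers default low].freeze

# Hypothetical helper: returns [queue_name, job], or nil when all queues are empty.
def next_job(queues, order = QUEUE_ORDER)
  order.each do |name|
    job = queues.fetch(name, []).shift
    return [name, job] if job
  end
  nil
end

# Jobs were enqueued in "reverse" priority order on purpose.
queues = {
  "low" => ["cleanup"],
  "default" => ["sync"],
  "critical" => ["charge_card"]
}

drained = []
while (entry = next_job(queues))
  drained << entry
end
# drained lists the critical job first, then default, then low
```

The enqueue order doesn't matter here; only the position of each queue in the list does.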

Considering the worker domain or category will help you organize queues into groups that can be configured and managed independently. For example, this relatively simple approach ensured that ETL processes never blocked or slowed down real time operations.


Importing Third Party Data

Most companies I've worked with need to import and standardize third-party data. How that data is obtained, how often it changes, and what's required to standardize can vary dramatically.

Third-party data providers often impose restrictions like rate limiting to prevent problems within their own systems. They typically require a means of authentication to enforce such controls. Sometimes upstream data is volatile and requires continuous importing.

I've worked on applications where several customers shared the same credentials for accessing third-party data. Sharing credentials can prove challenging, given data access limits, and the problem is compounded when customers have different requirements for data freshness and accuracy.

Standardizing and augmenting this data can be expensive. It can also trigger large quantities of adjacent work. I've seen small upstream changes that cascaded into millions of jobs for a single customer, which flooded the queue and blocked other critical work.

How can we solve these problems?

I tend to use similar strategies whenever I encounter requirements like this. I lean on Sidekiq Pro and Enterprise features, use great libraries like AcidicJob, and apply my own Rails-fu when needed.

Let's tackle each problem in turn.

Honor Third Party Constraints

Begin with the expectation that some customers will share credentials for accessing third-party data and that rate limits exist for accessing that data. We'll need to throttle requests across all customers sharing the same third-party credentials.

Let's create an ActiveRecord model to manage credentials and include the ability to throttle activity. We'll use Sidekiq Enterprise's bucket-based rate limiting to accomplish this.

# app/models/credential.rb
class Credential < ApplicationRecord
  has_many :customers

  # Throttles the passed block for this credential.
  # Every customer sharing the credential shares the same bucket,
  # because the limiter is named after the credential's GlobalID.
  def throttle(count = 60, interval = :minute, options = {}, &block)
    options = options.reverse_merge(
      wait_timeout: 5,
      lock_timeout: 60
    )
    Sidekiq::Limiter
      .bucket(to_gid_param, count, interval, options)
      .within_limit(&block)
  end
end

# app/jobs/fetch_data_job.rb
class FetchDataJob < ApplicationJob
  def perform(customer)
    customer.credential.throttle do
      # 1. fetch third-party data
    end
    # 2. queue adjacent work
  end
end

# usage
customer = Customer.find(1)
FetchDataJob.perform_later customer

This ensures that we don't DoS third parties, even when our customers share credentials and request data at the same time.
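
To illustrate why keying the limiter on the credential matters, here is a toy in-memory bucket limiter in plain Ruby. It is only a sketch of the general idea and not how Sidekiq Enterprise implements it; the `ToyBucketLimiter` class, its `OverLimit` error, and the `"credential-1"` key are all hypothetical. It also simply raises when the budget is spent rather than waiting or rescheduling.

```ruby
# Toy bucket limiter: at most `count` calls per `interval` seconds for each key.
# Time is injected through `now` so the behavior is easy to follow.
class ToyBucketLimiter
  class OverLimit < StandardError; end

  def initialize(count:, interval:)
    @count = count
    @interval = interval
    @buckets = Hash.new(0) # [key, bucket_index] => calls used
  end

  # Runs the block if the key still has budget in the current bucket.
  def within_limit(key, now: Time.now.to_f)
    bucket = [key, (now / @interval).floor]
    if @buckets[bucket] >= @count
      raise OverLimit, "#{key} exceeded #{@count} calls per #{@interval}s"
    end

    @buckets[bucket] += 1
    yield
  end
end

limiter = ToyBucketLimiter.new(count: 2, interval: 60)
shared_key = "credential-1" # two customers sharing one credential use one key

results = []
results << limiter.within_limit(shared_key, now: 0) { :customer_a_fetch }
results << limiter.within_limit(shared_key, now: 1) { :customer_b_fetch }

begin
  limiter.within_limit(shared_key, now: 2) { :customer_a_again }
rescue ToyBucketLimiter::OverLimit
  results << :throttled
end

# A new 60-second bucket restores the budget.
results << limiter.within_limit(shared_key, now: 61) { :customer_b_again }
```

Because both customers draw from the same key, the third request in the first minute is throttled no matter which customer makes it.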

Prevent Customers from Consuming all Resources

Remember that importing data triggers a cascade of adjacent work. We need to ensure that a single customer doesn't flood the queue(s), consume all system resources, and block other important work.

We can manage this with clever use of Sidekiq queue weights. First, let's update our Sidekiq configuration and set up partitions for the default and low queues.

# config/sidekiq.yml
:concurrency: 8
:queues:
  - [critical, 6]
  - [high, 4]
  - [default, 2]
  - [default_0, 2]
  - [default_1, 2]
  - [default_2, 2]
  - [default_3, 2]
  - [low, 1]
  - [low_0, 1]
  - [low_1, 1]
  - [low_2, 1]
  - [low_3, 1]

Note that the partitions for default and low queues share the same weights. This means that jobs in these queues will have an equal chance of being dequeued when multiple customers are queueing millions of jobs in parallel. A single customer won't block others.
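
As a rough check on what these weights mean, the sketch below computes each queue's share of the total weight. That share approximates how often a queue is checked first when every queue has work waiting. The `first_check_shares` helper is hypothetical and the model is a simplification; Sidekiq's actual fetch logic may differ between versions.

```ruby
# Queue weights copied from the config above.
WEIGHTS = {
  "critical" => 6, "high" => 4,
  "default" => 2, "default_0" => 2, "default_1" => 2,
  "default_2" => 2, "default_3" => 2,
  "low" => 1, "low_0" => 1, "low_1" => 1, "low_2" => 1, "low_3" => 1
}.freeze

# Hypothetical helper: each queue's weight as a fraction of the total weight.
def first_check_shares(weights)
  total = weights.values.sum
  weights.transform_values { |weight| Rational(weight, total) }
end

shares = first_check_shares(WEIGHTS)
shares.each do |queue, share|
  puts format("%-10s %5.1f%%", queue, share.to_f * 100)
end
```

The weights sum to 25, so `critical` gets 6/25 (24%), each default partition gets 2/25 (8%), and each low partition gets 1/25 (4%).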

So how do we get jobs into a partitioned queue?

Let's create a helper method to obtain a random partitioned queue name.

# config/initializers/sidekiq.rb
def Sidekiq.partitioned_queue_name(name = "default", size: 4)
  "#{name}_#{rand size}"
end

Now let's look at how to best use these partitioned queues. We'll revisit our FetchDataJob and create a few others to handle the adjacent work.

# app/jobs/fetch_data_job.rb
class FetchDataJob < ApplicationJob
  def perform(customer)
    source_documents = []

    customer.credential.throttle do
      # fetch third-party data and assign source_documents
    end

    source_documents.each do |doc|
      StandardizeDocumentJob
        .set(
          # spread the work across the default partitions
          queue: Sidekiq.partitioned_queue_name("default"),
          # add up to a minute of jitter so jobs don't all start at once
          wait: rand(0..60).seconds
        )
        .perform_later(doc)
    end
  end
end

Note that we're queueing up adjacent work into StandardizeDocumentJob.

# app/jobs/standardize_document_job.rb
class StandardizeDocumentJob < ApplicationJob
  def perform(source_document)
    document = Document.new
    # standardize the source document's data
    # save! raises on failure, so we never enqueue work for an unsaved record
    document.save!

    images = []
    # scan document for linked images

    images.each do |url|
      ImportImageJob
        .set(
          queue: Sidekiq.partitioned_queue_name("low"),
          wait: rand(0..60).seconds
        )
        .perform_later(document, url)
    end
  end
end

The work then cascades and fans out to ImportImageJob.

# app/jobs/import_image_job.rb
class ImportImageJob < ApplicationJob
  def perform(document, url)
    # save url to cloud provider
    # update document
  end
end

A few things to note:

  • We only throttle the request to fetch third party data
  • After the initial import, the work cascades exponentially
  • Partitioned jobs have equal priority if enqueued at around the same time
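
To get a feel for how quickly the cascade grows, the sketch below counts the jobs triggered by a single import. The `jobs_for_import` helper is hypothetical and the per-import numbers are made-up assumptions for illustration, not CodeFund figures.

```ruby
# Hypothetical helper: total jobs triggered by one FetchDataJob,
# assuming every document links the same number of images.
def jobs_for_import(documents:, images_per_document:)
  fetch_jobs = 1
  standardize_jobs = documents
  image_jobs = documents * images_per_document
  fetch_jobs + standardize_jobs + image_jobs
end

total = jobs_for_import(documents: 10_000, images_per_document: 20)
puts total
```

Under these assumptions, one fetch turns into 210,001 jobs, and almost all of them land in the low partitions.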

That's a lot to digest, but the primary takeaway is to be thoughtful when architecting background workers.

Your solution can be as simple or as elaborate as needed, and we've barely scratched the surface of what's possible here. What if we configured our worker infrastructure to run the low queue on a large cluster of servers? How might that affect throughput? I've partitioned worker pools in a way similar to PostgreSQL's hash-based table partitioning to help prevent individual customers or processes from blocking others. What about other infrastructure changes?
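
One way to sketch the hash-based idea is to derive the partition from a stable customer identifier instead of `rand`, so each customer's jobs always land in the same partition and a flood from one customer stays in one queue. The `hashed_queue_name` helper below is hypothetical, and it uses CRC32 from Ruby's standard library purely as a stable hash.

```ruby
require "zlib"

# Hypothetical helper: maps a customer id to a stable partition of `name`.
# Ruby's String#hash is randomized per process, so CRC32 is used instead.
def hashed_queue_name(customer_id, name = "default", size: 4)
  partition = Zlib.crc32(customer_id.to_s) % size
  "#{name}_#{partition}"
end

queue_a = hashed_queue_name(42)
queue_b = hashed_queue_name(42)          # same customer, same partition
queue_low = hashed_queue_name(42, "low") # same partition index on the low queues
```

The trade-off is that customers who hash to the same partition still compete with each other, so the number of partitions matters.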


Next up, we'll learn about configuring Rails connections.
