This is the 2nd post in a series that starts here.
CodeFund
At CodeFund, we processed 5+ million background jobs per day at our peak. Some of those jobs performed computationally expensive operations. Others performed the heavy database work required to operate the business. We hosted on Heroku with 3 Standard 1x dynos responsible for background work. It cost $75/month and had plenty of capacity to scale without additional costs.
How did we accomplish this?
The first thing to consider is how best to separate the work. Think about the different types of jobs your application performs. Are there any jobs that should be isolated from the primary work queue? At CodeFund we pulled financial settlement and ETL processes out and set up a dedicated dyno to run these jobs on a schedule, which allowed the primary worker dynos to focus on real-time operations like recording ad impressions and clicks.
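As a sketch, the scheduling piece might use Sidekiq Enterprise's periodic jobs; the cron expression and worker class here are hypothetical, though the schedule queue appears in our config below.
# config/initializers/sidekiq.rb (sketch)
Sidekiq.configure_server do |config|
  config.periodic do |mgr|
    # Hypothetical: run the daily financial settlement at 02:00 UTC.
    # Note that periodic jobs must be plain Sidekiq workers, not ActiveJobs.
    mgr.register("0 2 * * *", "FinancialSettlementWorker", queue: "schedule")
  end
end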
Here's how our Heroku worker dynos were configured.
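A minimal sketch of what that looks like as a Procfile, assuming one Sidekiq process per config file (the process names are assumptions):
# Procfile (sketch)
web: bundle exec puma -C config/puma.rb
worker: bundle exec sidekiq -C config/sidekiq.yml
worker_data: bundle exec sidekiq -C config/sidekiq_data.yml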
And here's how we configured Sidekiq queues.
# config/sidekiq.yml
:concurrency: 5
:queues:
- critical
- mailers
- schedule
- impression
- click
- default
- rollbar
- action_mailbox_routing
- active_storage_analysis
- action_mailbox_incineration
- low
- traffic
# config/sidekiq_data.yml
:concurrency: 5
:queues:
- create_debits_for_campaign_and_date
- create_debit_for_campaign_and_date
- recalculate_organization_balances
- ensure_daily_summaries
- daily_summaries
- daily_summary
- ensure_scoped_daily_summaries
Note that we didn't use weights. Instead, we leaned on Sidekiq's default behavior of checking queues in strict, listed order: jobs in a lower queue are only picked up when every queue listed above it is empty.
Considering the worker domain or category will help you organize queues into groups that can be configured and managed independently. For example, this relatively simple approach ensured that ETL processes never blocked or slowed down real-time operations.
Importing Third Party Data
Most companies I've worked with need to import and standardize third-party data. How that data is obtained, how often it changes, and what's required to standardize can vary dramatically.
Third-party data providers often impose restrictions like rate limiting to prevent problems within their own systems. They typically require a means of authentication to enforce such controls. Sometimes upstream data is volatile and requires continuous importing.
I've worked on applications where several of our customers shared credentials for accessing third-party data with other customers. Sharing credentials can prove challenging, given data access limits, and is further compounded when customers have different requirements for data freshness and accuracy.
Standardizing and augmenting this data can be expensive. It can also trigger large quantities of adjacent work. I've seen small upstream changes that cascaded into millions of jobs for a single customer, which flooded the queue and blocked other critical work.
How can we solve these problems?
I tend to use similar strategies whenever I encounter requirements like this. I lean on Sidekiq Pro and Enterprise features, use great libraries like AcidicJob, and apply my own Rails-fu when needed.
Let's tackle each problem in turn.
Honor Third Party Constraints
Begin with the expectation that some customers will share credentials for accessing third-party data and that rate limits exist for accessing that data. We'll need to throttle requests for all customers sharing the same third-party credentials.
Let's create an ActiveRecord model to manage credentials and include the ability to throttle activity. We'll use Sidekiq Enterprise's bucket-based rate limiting to accomplish this.
# app/models/credential.rb
class Credential < ApplicationRecord
  has_many :customers

  # Throttles the passed block for this credential.
  # `to_gid_param` provides a stable, globally unique key per record,
  # so all customers sharing this credential share one bucket.
  def throttle(count = 60, interval = :minute, options = {}, &block)
    options = options.reverse_merge(
      wait_timeout: 5,
      lock_timeout: 60
    )

    Sidekiq::Limiter
      .bucket(to_gid_param, count, interval, options)
      .within_limit(&block)
  end
end
# app/jobs/fetch_data_job.rb
class FetchDataJob < ApplicationJob
  def perform(customer)
    customer.credential.throttle do
      # 1. fetch third-party data
    end

    # 2. queue adjacent work
  end
end
customer = Customer.find(1)
FetchDataJob.perform_later customer
This ensures that we don't DoS third parties, even when our customers share credentials and request data at the same time.
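One detail worth noting: if the limiter can't acquire capacity within wait_timeout, Sidekiq Enterprise raises Sidekiq::Limiter::OverLimit. Plain Sidekiq workers get rescheduled automatically in that case; with ActiveJob you can opt into similar behavior with retry_on. A sketch (the wait and attempt values are assumptions):
# app/jobs/fetch_data_job.rb (sketch)
class FetchDataJob < ApplicationJob
  # Assumption: back off and retry instead of failing when throttled
  retry_on Sidekiq::Limiter::OverLimit, wait: 1.minute, attempts: 10

  def perform(customer)
    customer.credential.throttle do
      # fetch third-party data
    end
  end
end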
Prevent Customers from Consuming All Resources
Remember that importing data triggers a cascade of adjacent work. We need to ensure that a single customer doesn't flood the queue(s), consume all system resources, and block other important work.
We can manage this with clever use of Sidekiq queue weights. First, let's update our Sidekiq configuration and set up partitions for the default and low queues.
# config/sidekiq.yml
:concurrency: 8
:queues:
- [critical, 6]
- [high, 4]
- [default, 2]
- [default_0, 2]
- [default_1, 2]
- [default_2, 2]
- [default_3, 2]
- [low, 1]
- [low_0, 1]
- [low_1, 1]
- [low_2, 1]
- [low_3, 1]
Note that the partitions for the default and low queues share the same weights as their parent queues. This means that jobs in these queues will have an equal chance of being dequeued when multiple customers are queueing millions of jobs in parallel. A single customer won't block others.
So how do we get jobs into a partitioned queue?
Let's create a helper method to obtain a random partitioned queue name.
# config/initializers/sidekiq.rb
def Sidekiq.partitioned_queue_name(name = "default", size: 4)
  "#{name}_#{rand(size)}"
end
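For example (the suffix is random by design, so your results will vary):
Sidekiq.partitioned_queue_name          # => "default_3"
Sidekiq.partitioned_queue_name("low")   # => "low_1"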
Now let's look at how best to use these partitioned queues. We'll revisit our FetchDataJob and create a few others to handle the adjacent work.
# app/jobs/fetch_data_job.rb
class FetchDataJob < ApplicationJob
  def perform(customer)
    source_documents = []

    customer.credential.throttle do
      # fetch third-party data and assign source_documents
    end

    source_documents.each do |doc|
      StandardizeDocumentJob
        .set(
          queue: Sidekiq.partitioned_queue_name("default"),
          wait: rand(0..60).seconds
        )
        .perform_later(doc)
    end
  end
end
Note that we're queueing up adjacent work into StandardizeDocumentJob.
# app/jobs/standardize_document_job.rb
class StandardizeDocumentJob < ApplicationJob
  def perform(source_document)
    document = Document.new
    # standardize the source document's data
    document.save

    images = []
    # scan document for linked images

    images.each do |url|
      ImportImageJob
        .set(
          queue: Sidekiq.partitioned_queue_name("low"),
          wait: rand(0..60).seconds
        )
        .perform_later(document, url)
    end
  end
end
The work then cascades and fans out to ImportImageJob.
# app/jobs/import_image_job.rb
class ImportImageJob < ApplicationJob
  def perform(document, url)
    # save the image at url to a cloud provider
    # update document
  end
end
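As a sketch, assuming Active Storage and a has_many_attached :images declaration on Document (both assumptions, not part of the original), the body might look like this:
# app/jobs/import_image_job.rb (sketch, assumes Active Storage)
require "open-uri"

class ImportImageJob < ApplicationJob
  def perform(document, url)
    # Assumption: Document declares `has_many_attached :images`
    document.images.attach(
      io: URI.open(url),
      filename: File.basename(URI.parse(url).path)
    )
  end
end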
A few things to note:
- We only throttle the request that fetches third-party data
- After the initial import, the work cascades and fans out: one fetch can spawn many standardize jobs, each spawning many image jobs
- Partitioned jobs have equal priority when enqueued at around the same time
That's a lot to digest, but the primary takeaway is to be thoughtful when architecting background workers.
Your solution can be as simple or as elaborate as needed. And we've barely scratched the surface of what's possible here. What if we configured our worker infrastructure to run the low queue on a large cluster of servers? How might that affect throughput? I've partitioned worker pools, similar to PostgreSQL's hash-based table partitioning, to help prevent individual customers or processes from blocking others.
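As a sketch, dedicating processes to specific partitions might look like this in a Procfile (the process names, groupings, and concurrency are assumptions):
# Procfile (sketch)
worker_low_a: bundle exec sidekiq -c 10 -q low_0 -q low_1
worker_low_b: bundle exec sidekiq -c 10 -q low_2 -q low_3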
What about other infrastructure changes?
Next up, we'll learn about configuring Rails connections.