As your Elixir app grows, you might need advanced control over how and where to perform background tasks or pull them off queues to manage back pressure.
In this post, you will learn how to handle background jobs with Flume, a job processing system that uses GenStage and Redis. It provides durability, back pressure, job scheduling, rate limiting, and batch processing, among other things.
We will expand on each of these features in detail.
But before we go further, let's quickly cover two other major libraries in the same space: Oban and Exq.
Oban and Exq: Flume Alternatives for your Elixir App
Both Oban and Exq serve as excellent alternatives to Flume.
Oban, backed by PostgreSQL or SQLite, also provides a queue-based job processing system.
Exq, on the other hand, is backed by Redis. It provides features similar to Flume, but without built-in rate limiting and batch processing capabilities.
Opt for Oban if you don't want to maintain a Redis instance to manage background jobs and do not require rate limiting. Postgres typically performs adequately unless your queues grow beyond roughly 100k jobs.
Select Exq when you need a highly efficient queuing backend for processing a substantial volume of jobs, but can forgo advanced features such as rate limiting and batch processing.
If you need all three features (a queuing backend, rate limiting, and batch processing), Flume emerges as the ideal choice.
A small word of warning: If you have experience with Rails, you are likely familiar with Sidekiq, Resque, and Delayed Job for background job processing. But note that Elixir/Erlang already has excellent alternatives built into the language in the form of GenServer, Task, and Supervisor, and an additional background job processing system that adds more infrastructure costs and complexity might not be necessary.
A background job processing system like Flume becomes crucial when you need to manage numerous background tasks, track their progress/status (or at least automatically retry them on failure), or apply common constraints (like queue, priority, or rate limit) to many similar jobs.
Keeping all of that in mind, let's set up Flume for our Elixir app.
Installation and Setting Up
To install Flume in your app, add the dependency to your mix.exs file and run mix deps.get:
def deps do
  [
    {:flume, github: "scripbox/flume"}
  ]
end
Next, add Flume to your application’s supervision tree by modifying your application.ex file:
defmodule MyApp.Application do
  use Application
  import Supervisor.Spec
  @impl true
  def start(_type, _args) do
    Supervisor.start_link([
      # ...
      # Start Flume supervisor
      supervisor(Flume, []),
      # ...
    ], strategy: :one_for_one, name: MyApp.Supervisor)
  end
end
Finally, configure it inside config/config.exs:
config :flume,
  name: Flume,
  # Redis config
  host: "127.0.0.1",
  port: "6379",
  namespace: "my-app",
  database: 0,
  redis_pool_size: 10
There are some other configuration options as well, but we will get to them later in the post. For now, this is all you need to get things set up and start creating background jobs.
Let's create one now:
defmodule MyApp.OrdersWorker do
  def process(order_id) do
    # process the order
  end
end
To enqueue a job, we call Flume.enqueue/4 with a queue name, a module name, a function within that module, and a list of arguments to pass to that function:
Flume.enqueue(:default, MyApp.OrdersWorker, :process, [order_id])
This takes care of serializing the arguments and adds the job to Redis. To actually run the job, we need to define a pipeline that picks off the jobs from Redis. We'll see how to do that in the next section.
Pipelines in Flume for Elixir
Pipelines define when and how the actual job processing happens in Flume. We can define pipelines in config/config.exs when configuring Flume:
config :flume, pipelines: [%{name: "default_pipeline", queue: "default"}]
This tells Flume to start processing jobs from the queue named default, with a maximum of 500 jobs (by default) running simultaneously. In other words, Flume will start up to 500 concurrent processes, each executing a single job. The max_demand parameter controls this limit. If your jobs are resource-intensive, you can set a lower number to limit the number of workers:
config :flume, pipelines: [%{name: "default_pipeline", queue: "default", max_demand: 10}]
In addition to the queue, there are more configuration options for advanced pipeline control.
Rate Limiting
We can specify rate limits for a pipeline by configuring it like this:
config :flume, pipelines: [%{
  name: "default_pipeline",
  queue: "default",
  rate_limit_count: 100,
  rate_limit_scale: 60 * 1000
}]
This will ensure that the default pipeline processes at most 100 jobs (rate limit count) per minute (rate limit scale of 60 * 1000 milliseconds). It is also possible to share the same rate limit across multiple pipelines by specifying the optional rate_limit_key when defining the pipeline.
For example, if you have two pipelines that process notifications using the same rate-limited API, you can share the limit across both of them like this:
config :flume, pipelines: [
  %{
    name: "email_pipeline",
    queue: "email",
    rate_limit_count: 100,
    rate_limit_scale: 60 * 1000,
    rate_limit_key: "notifications"
  },
  %{
    name: "sms_pipeline",
    queue: "sms",
    rate_limit_count: 100,
    rate_limit_scale: 60 * 1000,
    rate_limit_key: "notifications"
  }
]
Now, both of these pipelines will process a total of (at most) 100 jobs per minute.
Don't confuse this with the max_demand parameter, which governs how many jobs can execute concurrently.
Even with a low max demand, it's entirely possible to rapidly process too many jobs if they tend to be short-lived. Rate limiting operates on a time scale level to ensure that the number of executions does not surpass a specified limit within a given time interval.
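To get a feel for what a rate limit means for throughput, here is a small back-of-the-envelope sketch in plain Elixir. It does not call Flume: the RateLimitMath module is made up for illustration, and it assumes the limiter lets at most rate_limit_count jobs start in each rate_limit_scale window.

```elixir
defmodule RateLimitMath do
  # Number of rate-limit windows needed to start `backlog` jobs when at most
  # `count` jobs may start per window (integer ceiling division).
  def windows_needed(backlog, count)
      when is_integer(backlog) and backlog >= 0 and is_integer(count) and count > 0 do
    div(backlog + count - 1, count)
  end
end

# 1_000 queued jobs at rate_limit_count: 100 and rate_limit_scale: 60 * 1000
backlog = 1_000
windows = RateLimitMath.windows_needed(backlog, 100)
IO.puts("#{backlog} jobs need #{windows} one-minute windows")
```

Under that assumption, a backlog of 1,000 jobs takes on the order of ten minutes to work through, no matter how high max_demand is set.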
Batching
It's also useful to define a batch size with a pipeline:
config :flume, pipelines: [%{
  name: "default_pipeline",
  queue: "default",
  max_demand: 10,
  batch_size: 5
}]
This will instruct Flume to send up to 5 jobs to the same worker (and start up to 10 workers at the same time).
When using batching, each worker call receives a list that contains the argument list of every job in the batch:
defmodule MyApp.OrdersWorker do
  def process(jobs) do
    # jobs contains up to 5 lists, each holding the arguments of one individual call:
    # [[order1_id], [order2_id], [order3_id], [order4_id], [order5_id]]
  end
end
The application code determines how to manage these jobs.
For instance, in the case of a mailer job, the process/1 function could utilize a bulk email API to send emails to multiple recipients in a single call, rather than making five separate calls to a standard API.
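As a rough sketch of that mailer case, a batch worker might unpack the per-job argument lists and hand all recipients to one bulk call. The MyApp.BatchMailerWorker module and its send_bulk/1 helper are hypothetical: send_bulk/1 stands in for a real bulk email client and only reports how many recipients one call would cover.

```elixir
defmodule MyApp.BatchMailerWorker do
  # With batch_size set, `jobs` is a list of argument lists,
  # e.g. [["a@example.com"], ["b@example.com"]].
  def process(jobs) when is_list(jobs) do
    # Each job was enqueued with a single argument: the recipient's address.
    recipients = Enum.map(jobs, fn [email] -> email end)
    send_bulk(recipients)
  end

  # Hypothetical stand-in for a bulk email API client.
  defp send_bulk(recipients) do
    {:ok, length(recipients)}
  end
end

MyApp.BatchMailerWorker.process([["a@example.com"], ["b@example.com"]])
```

The pattern match on [email] assumes every job in the queue was enqueued with exactly one argument. Jobs with a different shape would need their own clause.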
Runtime Control
It is also possible to pause pipelines at runtime using Flume.pause_all/1 or Flume.pause/2.
To pause all pipelines permanently (across all running nodes), run:
Flume.pause_all(temporary: false, async: true)
If you only want to pause a single pipeline, use Flume.pause with the pipeline's name. It accepts the same options as pause_all:
Flume.pause("default_pipeline", temporary: false, async: true)
It is also possible to pause a pipeline/all pipelines on a single node instead of on all nodes. This can be controlled using the temporary parameter. So, to pause a pipeline on only the current node, run:
Flume.pause("default_pipeline", temporary: true, async: true)
Similar to pause and pause_all, you can run resume or resume_all to restart the pipelines.
Pausing a pipeline can be advantageous in scenarios where you need to temporarily suspend job processing to address issues or perform maintenance tasks. For example, during system upgrades or when debugging critical issues, pausing a pipeline can prevent new jobs from being processed, thereby enabling you to stabilize the system.
Another useful scenario is to manage known service downtimes.
For instance, if your job depends on an external service that is known to be unavailable, you can pause the pipeline until the service resumes normal operation.
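One way to make sure a paused pipeline is always resumed is to wrap the maintenance work in a small helper. The following is only a sketch: MyApp.Maintenance is a made-up module, and it assumes that Flume.resume/2 accepts the same options as Flume.pause/2. The Flume module is passed in as an optional argument so the helper can be tried without a running Redis.

```elixir
defmodule MyApp.Maintenance do
  # Pauses `pipeline` on the current node, runs `fun`, and resumes the pipeline
  # even if `fun` raises. `flume` defaults to the Flume module.
  def with_paused(pipeline, fun, flume \\ Flume) when is_function(fun, 0) do
    # Options copied from the examples above; resume is assumed to accept the same ones.
    opts = [temporary: true, async: true]
    flume.pause(pipeline, opts)

    try do
      fun.()
    after
      flume.resume(pipeline, opts)
    end
  end
end
```

In an application, the call could look like MyApp.Maintenance.with_paused("default_pipeline", fn -> run_migration() end), where run_migration/0 is a placeholder for your own maintenance task.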
Advanced Features in Flume
Flume comes with an exponential back-off for failed jobs (jobs that raise an exception) by default. The default configuration is set to retry a failed job with exponential back-offs starting at 500 milliseconds with a maximum back-off of 10 seconds. A job is retried at most 5 times. This can be controlled using:
config :flume,
  backoff_initial: 30_000,
  backoff_max: 3_600_000,
  max_retries: 10
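To see how these numbers interact, here is a small worked example in plain Elixir. It assumes a simple doubling schedule capped at the maximum back-off. Flume's actual formula may differ (it could add jitter, for example), and BackoffSketch is a made-up module, so treat the output as an illustration rather than a specification.

```elixir
defmodule BackoffSketch do
  # Delay before retry number `attempt` (1-based), assuming plain doubling
  # capped at `max_ms`. This is an assumption, not Flume's documented formula.
  def delay_ms(attempt, initial_ms, max_ms) when is_integer(attempt) and attempt >= 1 do
    min(initial_ms * Integer.pow(2, attempt - 1), max_ms)
  end

  # Delays for retries 1..max_retries.
  def schedule(max_retries, initial_ms, max_ms) when is_integer(max_retries) and max_retries >= 1 do
    Enum.map(1..max_retries, &delay_ms(&1, initial_ms, max_ms))
  end
end

# The defaults described above: 500 ms initial, 10 s maximum, 5 retries
BackoffSketch.schedule(5, 500, 10_000)
# => [500, 1000, 2000, 4000, 8000]
```

With the custom values from the config above, the same schedule would start at 30 seconds and could never wait longer than one hour between attempts.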
There are several other configuration options to fine-tune Flume further. Please refer to the Flume guide for more details.
Flume uses :telemetry to emit events/metrics.
By default, the emitted events are:
- A [:pipeline_name, :worker, :job] event with the duration (in milliseconds) representing the time taken to execute the job. This is the time that your Worker.perform function takes.
- A [:pipeline_name, :worker] event with the duration (in milliseconds) representing the time taken to run the job. This includes the total time taken to decode the event payload and execute the job.
- A [:queue_name, :enqueue] event with the serialized job's payload_size (in bytes).
- A [:queue_name, :dequeue] event with the count (number of jobs fetched), latency (in milliseconds, the time taken to fetch the jobs from Redis), and payload_size (in bytes) after Flume fetches jobs from Redis.
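If you want to react to these events yourself, a handler could look roughly like the sketch below. Several things here are assumptions: that measurements arrive as a map with a :duration key (as the list above suggests), that a pipeline named "default_pipeline" produces events prefixed with the atom :default_pipeline, and the one-second threshold. MyApp.FlumeTelemetry is a made-up module.

```elixir
defmodule MyApp.FlumeTelemetry do
  # Jobs slower than this (in milliseconds) get reported. Arbitrary threshold.
  @slow_job_ms 1_000

  # Handler with the four-argument shape that :telemetry passes to handlers.
  def handle_event([_pipeline, :worker, :job] = event, %{duration: duration}, _metadata, _config)
      when is_number(duration) and duration > @slow_job_ms do
    IO.puts(:stderr, "slow job on #{inspect(event)}: #{duration}ms")
    :slow
  end

  # Ignore everything else.
  def handle_event(_event, _measurements, _metadata, _config), do: :ok
end

# Attaching the handler requires the :telemetry dependency, so it is shown as a comment:
#
#   :telemetry.attach(
#     "my-app-flume-slow-jobs",
#     [:default_pipeline, :worker, :job],
#     &MyApp.FlumeTelemetry.handle_event/4,
#     nil
#   )
```

It is worth inspecting one real event in your own app first to confirm the event name and the shape of the measurements before relying on this pattern match.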
If you are already using AppSignal, you can use something like TelemetryMetricsAppsignal to collect these metrics, report them to AppSignal, and get custom dashboards.
Running Flume On Production
All the usage examples we have seen up to now run Flume on the same server as the web server. While this is good for a development workflow or small production workloads, you might consider moving workers to a separate node for scalability. This can be achieved by configuring Flume pipelines based on environment variables.
To control this, we can create a module that helps parse the environment variable:
defmodule MyApp.FlumeConfigurator do
  @default_pipelines %{
    "default_pipeline" => %{name: "default_pipeline", queue: "default"},
    "notifications_pipeline" => %{
      name: "notifications_pipeline",
      queue: "notifications",
      rate_limit_count: 100,
      rate_limit_scale: 60 * 1000
    },
    "video_transcoding_pipeline" => %{
      name: "video_transcoding_pipeline",
      queue: "video_transcoding",
      max_demand: 5
    }
  }

  def flume_pipelines do
    pipelines(System.get_env("FLUME_PIPELINES"))
  end

  defp pipelines(nil), do: []

  defp pipelines(value) when is_binary(value) do
    value
    |> String.split(",", trim: true)
    |> Enum.map(fn name -> Map.fetch!(@default_pipelines, name) end)
  end
end
And then configure Flume pipelines inside runtime.exs/release.exs:
import MyApp.FlumeConfigurator, only: [flume_pipelines: 0]
config :flume, pipelines: flume_pipelines()
That way, if you start your released application without defining FLUME_PIPELINES, Flume will not start any pipelines, but you can still enqueue jobs from your application. Then you can start other workers with FLUME_PIPELINES so that all pipelines kick in:
FLUME_PIPELINES=default_pipeline,notifications_pipeline,video_transcoding_pipeline /path/to/release start
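One thing to watch with this approach: because the configurator uses Map.fetch!/2, a typo or a stray space in FLUME_PIPELINES (for example "default_pipeline, notifications_pipeline") raises a KeyError at boot. If you prefer a more forgiving parser, something along these lines could work. MyApp.PipelineNames is a hypothetical helper and not part of Flume.

```elixir
defmodule MyApp.PipelineNames do
  # Turns a comma-separated value such as "a, b,,c" into clean names.
  def parse(nil), do: []

  def parse(value) when is_binary(value) do
    value
    |> String.split(",")
    |> Enum.map(&String.trim/1)
    |> Enum.reject(&(&1 == ""))
  end

  # Returns the names that are not keys of `known` (a map of pipeline configs),
  # so the caller can log or raise a readable error.
  def unknown(names, known) when is_list(names) and is_map(known) do
    Enum.reject(names, &Map.has_key?(known, &1))
  end
end

names = MyApp.PipelineNames.parse("default_pipeline, notifications_pipeline,")
MyApp.PipelineNames.unknown(names, %{"default_pipeline" => %{}})
```

Whether to fail fast on unknown names or skip them is a design choice. Failing at boot is usually safer, since a silently missing pipeline means jobs pile up in Redis unprocessed.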
You might want to stop the web server on worker-only nodes: you can control that from the runtime.exs/release.exs file with an environment variable.
Note that this is automatically generated if you use the mix phx.gen.release task to prepare your Phoenix app for release:
if System.get_env("PHX_SERVER") do
  config :my_app, MyAppWeb.Endpoint, server: true
end
Best Practices
When enqueueing background jobs with Flume, all the job arguments are serialized and deserialized, and all the job data needs to make a round trip to the Redis server. This means that it is important to keep the payload of the jobs as low as possible. For example, if you have something that persists in a database, instead of passing the full struct, just pass the ID and retrieve it from the database when performing the job.
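To make the difference concrete, the sketch below compares the serialized size of a whole record with the size of just its ID. It uses :erlang.term_to_binary/1 purely as a stand-in serializer, so the absolute numbers will not match what Flume writes to Redis. The order map and the PayloadSizeSketch module are invented for illustration.

```elixir
defmodule PayloadSizeSketch do
  # Serialized size in bytes, using :erlang.term_to_binary/1 as a stand-in
  # for whatever serializer the job system actually uses.
  def size(term), do: term |> :erlang.term_to_binary() |> byte_size()
end

# A made-up order with 50 line items.
order = %{
  id: 42,
  customer: %{name: "Jane Doe", email: "jane@example.com"},
  items: Enum.map(1..50, fn n -> %{sku: "SKU-#{n}", quantity: 1} end)
}

IO.puts("full record: #{PayloadSizeSketch.size(order)} bytes")
IO.puts("ID only:     #{PayloadSizeSketch.size(order.id)} bytes")
```

Passing only the ID also means the worker reads fresh data when the job runs, instead of a snapshot taken at enqueue time.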
Another important thing to keep in mind is that jobs should be idempotent. Jobs can run multiple times in cases of failure, so ensure that a job that runs twice doesn't unnecessarily affect your application or its data. If a job cannot be made safe to repeat, consider disabling retries instead.
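A common way to get there is to record completion alongside the work and check for it first. The sketch below shows the idea with an Agent standing in for a database table. MyApp.IdempotentOrdersWorker and its two-argument process/2 are made up for this illustration: a real worker would take only the order ID and look up the status in your datastore.

```elixir
defmodule MyApp.IdempotentOrdersWorker do
  # `store` is an Agent holding a map of order_id => status. It stands in
  # for a database table in this sketch.
  def process(store, order_id) do
    Agent.get_and_update(store, fn orders ->
      case Map.get(orders, order_id) do
        :processed ->
          # An earlier attempt already handled this order: do nothing.
          {:skipped, orders}

        _not_processed ->
          # Do the work and record completion in the same step.
          {:processed, Map.put(orders, order_id, :processed)}
      end
    end)
  end
end

{:ok, store} = Agent.start_link(fn -> %{42 => :pending} end)
MyApp.IdempotentOrdersWorker.process(store, 42) # first run does the work
MyApp.IdempotentOrdersWorker.process(store, 42) # a retry becomes a no-op
```

With a real database, the check and the update would typically happen inside one transaction so that two concurrent attempts cannot both pass the check.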
Wrapping Up
In summary, Flume brings efficiency to background job processing in Elixir applications. Its easy installation, configurable pipelines, and advanced features like rate limiting and telemetry make it a valuable tool for scalable and fault-tolerant systems. Embracing best practices, such as minimizing payload and ensuring idempotency, enhances the effectiveness of your background jobs.
With Flume, you not only leverage Elixir's concurrency and fault tolerance but also gain a powerful ally for managing and optimizing your application's background tasks.
Happy coding!