
Pulkit Goyal for AppSignal

Posted on • Originally published at blog.appsignal.com

How to Use Flume in your Elixir Application

As your Elixir app grows, you might need advanced control over how and where to perform background tasks or pull them off queues to manage back pressure.

In this post, you will learn how to handle background jobs with Flume, a job processing system that uses GenStage and Redis. It provides durability, back pressure, job scheduling, rate limiting, and batch processing, among other things.

We will expand on each of these features in detail.

But before we go further, let's quickly cover two other major libraries in the same space: Oban and Exq.

Oban and Exq: Flume Alternatives for your Elixir App

Both Oban and Exq serve as excellent alternatives to Flume.

Oban, backed by PostgreSQL or SQLite, also provides a queue-based job processing system.
Exq, on the other hand, is backed by Redis. It provides features similar to Flume, but without built-in rate limiting and batch processing capabilities.

Opt for Oban if you don't want to maintain a Redis instance to manage background jobs and do not require rate limiting. Postgres typically performs adequately until queues exceed 100k jobs.

Select Exq when you need a highly efficient queuing backend for processing a substantial volume of jobs, but can forgo advanced features such as rate limiting and batch processing.

If you need all three features (a queuing backend, rate limiting, and batch processing), Flume emerges as the ideal choice.

A small word of warning: If you have experience with Rails, you are likely familiar with Sidekiq, Resque, and Delayed Job for background job processing. But note that Elixir/Erlang already ships excellent alternatives in its standard library in the form of GenServer, Task, and Supervisor. A separate background job processing system adds infrastructure cost and complexity, and might not be necessary.
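As a rough illustration of that built-in route, a fire-and-forget job needs nothing beyond the standard library. This sketch does not involve Flume at all; MyApp.TaskSupervisor and the order ID are placeholder names, and a task started this way is neither persisted nor retried.

```elixir
# In a real app, MyApp.TaskSupervisor (a placeholder name) would be a child in
# your supervision tree; it is started inline here to keep the sketch runnable.
{:ok, _sup} = Task.Supervisor.start_link(name: MyApp.TaskSupervisor)

caller = self()

# Runs in a separate supervised process. Nothing is persisted or retried:
# if the node restarts or the function crashes, the work is lost.
{:ok, _pid} =
  Task.Supervisor.start_child(MyApp.TaskSupervisor, fn ->
    send(caller, {:order_processed, 42})
  end)

# Wait for the task to report back (only so the sketch has visible output).
result =
  receive do
    {:order_processed, order_id} -> {:ok, order_id}
  after
    1_000 -> :timeout
  end

IO.inspect(result)
```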

A background job processing system like Flume becomes crucial when you need to manage numerous background tasks, track their progress/status (or at least automatically retry them on failure), or apply common constraints (like queue, priority, or rate limit) to many similar jobs.

Keeping all of that in mind, let's set up Flume for our Elixir app.

Installation and Setting Up

To install Flume in your app, simply add the dependency in your mix.exs file and run mix deps.get:

def deps do
  [
    {:flume, github: "scripbox/flume"}
  ]
end

Next, add Flume to your application’s supervision tree by modifying your application.ex file:

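defmodule MyApp.Application do
  use Application

  @impl true
  def start(_type, _args) do
    children = [
      # ...
      # Start the Flume supervisor. This explicit child spec is equivalent to
      # the deprecated Supervisor.Spec helper `supervisor(Flume, [])`.
      %{id: Flume, start: {Flume, :start_link, []}, type: :supervisor}
      # ...
    ]

    Supervisor.start_link(children, strategy: :one_for_one, name: MyApp.Supervisor)
  end
end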

Finally, configure it inside config/config.exs:

config :flume,
  name: Flume,
  # Redis config
  host: "127.0.0.1",
  port: "6379",
  namespace: "my-app",
  database: 0,
  redis_pool_size: 10

There are some other configuration options as well, but we will get to them later in the post. For now, this is all you need to get things set up and start creating background jobs.
Let's create one now:

defmodule MyApp.OrdersWorker do
  def process(order_id) do
    # process the order
  end
end

To enqueue a job, we call Flume.enqueue/4 with a queue name, a module name, a function within that module, and a list of arguments to pass to that function:

Flume.enqueue(:default, MyApp.OrdersWorker, :process, [order_id])

This takes care of serializing the arguments and adds the job to Redis. To actually run the job, we need to define a pipeline that picks off the jobs from Redis. We'll see how to do that in the next section.

Pipelines in Flume for Elixir

Pipelines define when and how the actual job processing happens in Flume. We can define pipelines in config/config.exs when configuring Flume:

config :flume, pipelines: [%{name: "default_pipeline", queue: "default"}]

This directs Flume to commence processing jobs from the queue named default, with a maximum of 500 jobs (by default) running simultaneously. Consequently, Flume will initiate up to 500 concurrent processes/tasks, each executing a single job. The max_demand parameter regulates this behavior. If your jobs are inherently resource-intensive, you can specify a lower number to constrain the quantity of workers:

config :flume, pipelines: [%{name: "default_pipeline", queue: "default", max_demand: 10}]

In addition to the queue, there are more configuration options for advanced pipeline control.

Rate Limiting

We can specify rate limits for a pipeline by configuring it like this:

config :flume, pipelines: [%{
  name: "default_pipeline",
  queue: "default",
  rate_limit_count: 100,
  rate_limit_scale: 60 * 1000
}]

This will ensure that the default pipeline processes at most 100 jobs (rate limit count) per minute (rate limit scale of 60 * 1000 milliseconds). It is also possible to share the same rate limit across multiple pipelines by specifying the optional rate_limit_key when defining the pipeline.

For example, if you have two pipelines that process notifications using the same rate-limited API, you can share the limit across both of them like this:

config :flume, pipelines: [
  %{
    name: "email_pipeline",
    queue: "email",
    rate_limit_count: 100,
    rate_limit_scale: 60 * 1000,
    # Pipelines with the same key share one rate limit
    rate_limit_key: "notifications"
  },
  %{
    name: "sms_pipeline",
    queue: "sms",
    rate_limit_count: 100,
    rate_limit_scale: 60 * 1000,
    rate_limit_key: "notifications"
  }
]

Now, both of these pipelines will process a total of (at most) 100 jobs per minute.

Don't confuse this with the max_demand parameter, which governs how many jobs can execute concurrently.
Even with a low max_demand, it's entirely possible to process too many jobs in a short period if they tend to be short-lived. Rate limiting works on a time scale instead: it ensures that the number of executions does not exceed a specified limit within a given time interval.
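A quick back-of-the-envelope calculation shows why max_demand alone doesn't bound throughput. The module name and the 50 ms job duration below are made up for illustration.

```elixir
defmodule ThroughputSketch do
  # Upper bound on jobs completed per minute when `max_demand` jobs run
  # concurrently and each one takes `job_ms` milliseconds.
  def max_jobs_per_minute(max_demand, job_ms) when job_ms > 0 do
    div(60_000, job_ms) * max_demand
  end
end

IO.inspect(ThroughputSketch.max_jobs_per_minute(10, 50))
# 12000
```

With only 10 concurrent workers and 50 ms jobs, up to 12,000 jobs could finish every minute. A cap of 100 per minute therefore needs rate_limit_count and rate_limit_scale, not a lower max_demand.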

Batching

It's also useful to define a batch size with a pipeline:

config :flume, pipelines: [%{
  name: "default_pipeline",
  queue: "default",
  max_demand: 10,
  batch_size: 5,
}]

This will instruct Flume to send up to 5 jobs to the same worker (and start up to 10 workers at the same time).

When using batching, each worker will receive a list containing the argument lists of all jobs in the batch:

defmodule MyApp.OrdersWorker do
  def process(jobs) do
    # jobs contains up to 5 lists, each holding the arguments for one individual call:
    # [[order1_id], [order2_id], [order3_id], [order4_id], [order5_id]]
  end
end

The application code determines how to manage these jobs.
For instance, in the case of a mailer job, the process/1 function could utilize a bulk email API to send emails to multiple recipients in a single call, rather than making five separate calls to a standard API.
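Here is a minimal sketch of such a batched mailer worker. MyApp.BulkMailer is a hypothetical stand-in for whatever bulk email client you use, and the worker assumes each job was enqueued with a single email address as its only argument.

```elixir
defmodule MyApp.BulkMailer do
  # Hypothetical stand-in for a bulk email API client.
  def deliver(recipients) when is_list(recipients) do
    {:ok, length(recipients)}
  end
end

defmodule MyApp.MailerWorker do
  # With batch_size set, `jobs` is a list of argument lists, one per job,
  # for example [["a@example.com"], ["b@example.com"]].
  def process(jobs) when is_list(jobs) do
    recipients = Enum.map(jobs, fn [email] -> email end)

    # One bulk call instead of one call per job.
    MyApp.BulkMailer.deliver(recipients)
  end
end

IO.inspect(MyApp.MailerWorker.process([["a@example.com"], ["b@example.com"]]))
# {:ok, 2}
```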

Runtime Control

It is also possible to pause pipelines at runtime using Flume.pause_all/1 or Flume.pause/2.

To pause all pipelines permanently (across all running nodes), run:

Flume.pause_all(temporary: false, async: true)

If you only want to pause a single pipeline, use Flume.pause with the pipeline's name. It accepts the same options as pause_all:

Flume.pause("default_pipeline", temporary: false, async: true)

It is also possible to pause a pipeline/all pipelines on a single node instead of on all nodes. This can be controlled using the temporary parameter. So, to pause a pipeline on only the current node, run:

Flume.pause("default_pipeline", temporary: true, async: true)

Similar to pause and pause_all, you can run resume or resume_all to restart the pipelines.

Pausing a pipeline can be advantageous in scenarios where you need to temporarily suspend job processing to address issues or perform maintenance tasks. For example, during system upgrades or when debugging critical issues, pausing a pipeline can prevent new jobs from being processed, thereby enabling you to stabilize the system.

Another useful scenario is to manage known service downtimes.
For instance, if your job depends on an external service that is known to be unavailable, you can pause the pipeline until the service resumes normal operation.
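One way to make sure a paused pipeline always comes back is to wrap the maintenance work in a small helper. This is only a sketch: MyApp.Maintenance is a made-up module, and it assumes that Flume.resume/2 takes the same arguments as Flume.pause/2. Because the options use async: true, it also assumes you don't need the pause to have fully taken effect before the function starts.

```elixir
defmodule MyApp.Maintenance do
  # Options copied from the pause examples in this post.
  @opts [temporary: false, async: true]

  # `flume` defaults to the Flume module. Passing another module that
  # implements pause/2 and resume/2 lets you exercise this helper in isolation.
  def with_paused_pipeline(pipeline, fun, flume \\ Flume) when is_function(fun, 0) do
    flume.pause(pipeline, @opts)

    try do
      fun.()
    after
      # Resume even if the maintenance function raises.
      flume.resume(pipeline, @opts)
    end
  end
end
```

You would call it like MyApp.Maintenance.with_paused_pipeline("default_pipeline", fn -> run_maintenance() end), where run_maintenance is whatever work needs the pipeline to be quiet.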

Advanced Features in Flume

Flume comes with an exponential back-off for failed jobs (jobs that raise an exception) by default. The default configuration is set to retry a failed job with exponential back-offs starting at 500 milliseconds with a maximum back-off of 10 seconds. A job is retried at most 5 times. This can be controlled using:

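config :flume,
  # Delay before the first retry, in milliseconds (30 seconds)
  backoff_initial: 30_000,
  # Upper bound for the delay, in milliseconds (1 hour)
  backoff_max: 3_600_000,
  # Maximum number of retries per job
  max_retries: 10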
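To get a feel for what these settings mean in practice, here is a rough sketch of a capped exponential back-off schedule. It assumes the delay simply doubles on every attempt; Flume's actual calculation may differ (for example, by adding jitter), so treat the numbers as an approximation. BackoffSketch is a made-up module name.

```elixir
defmodule BackoffSketch do
  # Delay (in ms) before each retry, assuming plain doubling capped at `max`.
  def delays(initial, max, max_retries) when max_retries > 0 do
    for attempt <- 0..(max_retries - 1) do
      min(initial * Integer.pow(2, attempt), max)
    end
  end
end

# Using the default values described above: 500 ms initial, 10 s cap, 5 retries
IO.inspect(BackoffSketch.delays(500, 10_000, 5))
# [500, 1000, 2000, 4000, 8000]
```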

There are several other configuration options to fine-tune Flume further. Please refer to the Flume guide for more details.

Flume uses :telemetry to emit events/metrics.
By default, the emitted events are:

  1. A [:pipeline_name, :worker, :job] event with the duration (in milliseconds) representing the time taken to execute the job. This is the time that your Worker.perform function takes.
  2. A [:pipeline_name, :worker] event with the duration (in milliseconds) representing the time taken to run the job. This includes the total time taken to decode the event payload and execute the job.
  3. A [:queue_name, :enqueue] event with the serialized job's payload_size (in bytes).
  4. A [:queue_name, :dequeue] event with the count (number of jobs fetched), latency (in milliseconds — time taken to fetch the jobs from Redis), and payload_size (in bytes) after Flume fetches jobs from Redis.
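As a sketch of how you might consume one of these events yourself, the script below attaches a handler with :telemetry.attach/4. It assumes that a pipeline named "default_pipeline" emits [:default_pipeline, :worker, :job] and that the measurements map carries the duration under a :duration key; check the events your Flume version actually emits before relying on either. MyApp.FlumeTelemetry is a made-up module.

```elixir
# :telemetry is a Hex package; Mix.install/1 fetches it for this standalone script.
Mix.install([{:telemetry, "~> 1.0"}])

defmodule MyApp.FlumeTelemetry do
  # Assumed event name for a pipeline called "default_pipeline".
  @job_event [:default_pipeline, :worker, :job]

  def attach do
    :telemetry.attach("flume-job-duration", @job_event, &__MODULE__.handle_event/4, nil)
  end

  def handle_event(event, measurements, _metadata, _config) do
    IO.puts(format(event, measurements))
  end

  # Assumes the measurements map has a :duration key (in milliseconds).
  def format(event, %{duration: duration}) do
    "#{Enum.join(event, ".")} took #{duration}ms"
  end
end

:ok = MyApp.FlumeTelemetry.attach()

# Simulate the event Flume would emit, just to see the handler fire.
:telemetry.execute([:default_pipeline, :worker, :job], %{duration: 12}, %{})
```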

If you are already using AppSignal, you can use something like TelemetryMetricsAppsignal to collect these metrics, report them to AppSignal, and get custom dashboards.

Running Flume On Production

All the usage examples we have seen up to now run Flume on the same server as the web server. While this is good for a development workflow or small production workloads, you might consider moving workers to a separate node for scalability. This can be achieved by configuring Flume pipelines based on environment variables.

To control this, we can create a module that helps parse the environment variable:

defmodule MyApp.FlumeConfigurator do
  @default_pipelines %{
    "default_pipeline" => %{name: "default_pipeline", queue: "default"},
    "notifications_pipeline" => %{
      name: "notifications_pipeline",
      queue: "notifications",
      rate_limit_count: 100,
      rate_limit_scale: 60 * 1000
    },
    "video_transcoding_pipeline" => %{name: "video_transcoding_pipeline", queue: "video_transcoding", max_demand: 5}
  }

  def flume_pipelines() do
    pipelines(System.get_env("FLUME_PIPELINES"))
  end

  defp pipelines(nil), do: []

  defp pipelines(value) when is_binary(value) do
    value
    |> String.split(",", trim: true)
    |> Enum.map(fn name -> Map.fetch!(@default_pipelines, name) end)
  end
end

And then configure Flume pipelines inside runtime.exs/release.exs:

import MyApp.FlumeConfigurator, only: [flume_pipelines: 0]
config :flume, pipelines: flume_pipelines()

That way, if you start your released application without defining FLUME_PIPELINES, Flume will not start any pipelines, but you can still enqueue jobs from your application. Then you can start other workers with FLUME_PIPELINES so that all pipelines kick in:

FLUME_PIPELINES=default_pipeline,notifications_pipeline,video_transcoding_pipeline /path/to/release start

You might want to stop the web server on worker-only nodes: you can control that from the runtime.exs/release.exs file with an environment variable.

Note that this is automatically generated if you use the mix phx.gen.release task to prepare your Phoenix app for release:

if System.get_env("PHX_SERVER") do
  # Replace :my_app and MyAppWeb.Endpoint with your own app and endpoint names
  config :my_app, MyAppWeb.Endpoint, server: true
end

Best Practices

When enqueueing background jobs with Flume, all the job arguments are serialized and deserialized, and all the job data needs to make a round trip to the Redis server. This means that it is important to keep the payload of each job as small as possible. For example, if you have something that persists in a database, instead of passing the full struct, just pass the ID and retrieve the record from the database when performing the job.
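In code, that could look roughly like this. MyApp.Orders.get_order!/1 is a hypothetical context function standing in for your database lookup, and the enqueue call is left as a comment because it needs a running Flume and Redis.

```elixir
defmodule MyApp.Orders do
  # Hypothetical context module standing in for a database lookup.
  def get_order!(id), do: %{id: id, total_cents: 4_200}
end

defmodule MyApp.OrdersWorker do
  # Receives only the ID and loads fresh data when the job actually runs.
  def process(order_id) do
    order = MyApp.Orders.get_order!(order_id)
    {:processed, order.id}
  end
end

# Enqueue the ID rather than the whole struct:
# Flume.enqueue(:default, MyApp.OrdersWorker, :process, [order.id])

IO.inspect(MyApp.OrdersWorker.process(1))
# {:processed, 1}
```

Loading the record inside the job also means the worker sees the current state of the order, not a snapshot taken at enqueue time.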

Another important thing to keep in mind is that jobs should be idempotent. Jobs can run multiple times in cases of failure, so ensure that a job that runs twice doesn't corrupt or duplicate your application's data. If a job cannot be made idempotent, the alternative is to disable retries.
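A minimal sketch of an idempotent worker is shown below. It records finished jobs in an in-memory Agent purely to stay self-contained; MyApp.ChargeWorker and its charge/1 function are made up, and a real application would more likely rely on a database column or unique constraint so the record survives restarts.

```elixir
defmodule MyApp.ChargeWorker do
  # Hypothetical in-memory record of order IDs that were already charged.
  def start_store, do: Agent.start_link(fn -> MapSet.new() end, name: __MODULE__)

  def process(order_id) do
    if Agent.get(__MODULE__, &MapSet.member?(&1, order_id)) do
      # A retry or duplicate delivery: the side effect already happened.
      :skipped
    else
      result = charge(order_id)

      # Mark as done only after the side effect succeeded, so a failed
      # attempt is still retried.
      Agent.update(__MODULE__, &MapSet.put(&1, order_id))
      result
    end
  end

  defp charge(order_id), do: {:charged, order_id}
end

{:ok, _pid} = MyApp.ChargeWorker.start_store()
IO.inspect(MyApp.ChargeWorker.process(7))
# {:charged, 7}
IO.inspect(MyApp.ChargeWorker.process(7))
# :skipped
```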

Wrapping Up

In summary, Flume brings efficiency to background job processing in Elixir applications. Its easy installation, configurable pipelines, and advanced features like rate limiting and telemetry make it a valuable tool for scalable and fault-tolerant systems. Embracing best practices, such as minimizing payload and ensuring idempotency, enhances the effectiveness of your background jobs.

With Flume, you not only leverage Elixir's concurrency and fault tolerance but also gain a powerful ally for managing and optimizing your application's background tasks.

Happy coding!

P.S. If you'd like to read Elixir Alchemy posts as soon as they get off the press, subscribe to our Elixir Alchemy newsletter and never miss a single post!
