Nyoman Abiwinanda

Limiting Concurrency in Elixir Using Registry

When building applications with high performance in mind, we might be tempted to run as many tasks concurrently as possible. However, uncontrolled concurrency can put your application's reliability at risk. For example, if you have one million records to process and you carelessly spawn a million processes at once, you might overwhelm your application's resources.

Elixir makes it easy to run tasks concurrently, but it is just as important to keep that concurrency within sensible bounds. This is often referred to as limiting concurrency or applying back-pressure.

There are several ways to limit concurrency in Elixir, and libraries such as GenStage, Flow, and Broadway provide APIs for it. But what if you have GenServer processes that are spawned dynamically at runtime and you want to limit the number of concurrently running processes yourself? In this case, one way to achieve this is to use Elixir's Registry.

1. What is Registry

We won't go into too much detail about what Registry is, since the official documentation covers it better, but put simply, you can think of Registry as a key-value store for processes. It is common in Elixir to register GenServer processes in a registry under a key and later look up their pids from some other part of the application.
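To make the key-value idea concrete, here is a minimal sketch that can be run as a script. The registry name Demo.KVRegistry and the key "job-1" are made up for illustration and are not part of the application in this post.

```elixir
# Start a registry with unique keys (Demo.KVRegistry is a hypothetical name).
{:ok, _registry_pid} = Registry.start_link(keys: :unique, name: Demo.KVRegistry)

# Register the current process under the key "job-1" with the value "worker".
{:ok, _owner} = Registry.register(Demo.KVRegistry, "job-1", "worker")

# Looking up a key returns a list of {pid, value} tuples.
[{pid, value}] = Registry.lookup(Demo.KVRegistry, "job-1")

# A key that was never registered returns an empty list.
missing = Registry.lookup(Demo.KVRegistry, "job-2")

IO.inspect({pid == self(), value, missing})
```

Here the script process registers itself, so the pid that comes back from the lookup is the pid of the script process.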

2. Setting Up Registry

Setting up a registry is simple, and most of the time it takes only a single line. Like most things in Elixir, a registry runs in its own process. To start one, go to your OTP app's supervision tree (inside the Application module) and add the following to the supervised children.

children = [
  {Registry, keys: :unique, name: YourOTPApp.Registry},
]

Note that you can give your registry any name you want. In the code above we use YourOTPApp.Registry as the process name.

When creating a registry, we specify whether the keys option is :unique or :duplicate. With :unique, any given key can be registered only once; with :duplicate, the same key can be registered multiple times. Either option can be used to limit concurrency. In this example, we will see how it is done with :unique keys.
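The difference between the two key modes can be sketched as follows. The registry names Demo.UniqueRegistry and Demo.DuplicateRegistry are assumptions used only for this illustration.

```elixir
# Two throwaway registries, one per key mode (both names are hypothetical).
{:ok, _} = Registry.start_link(keys: :unique, name: Demo.UniqueRegistry)
{:ok, _} = Registry.start_link(keys: :duplicate, name: Demo.DuplicateRegistry)

# :unique - the second registration of the same key is rejected.
{:ok, _} = Registry.register(Demo.UniqueRegistry, "job-1", "worker")
unique_result = Registry.register(Demo.UniqueRegistry, "job-1", "worker")

# :duplicate - the same key can be registered more than once.
{:ok, _} = Registry.register(Demo.DuplicateRegistry, "job-1", "worker")
{:ok, _} = Registry.register(Demo.DuplicateRegistry, "job-1", "importer")
duplicate_entries = Registry.lookup(Demo.DuplicateRegistry, "job-1")

IO.inspect(unique_result)
IO.inspect(length(duplicate_entries))
```

With unique keys, the rejected registration comes back as an {:error, {:already_registered, pid}} tuple, which is what later makes each worker id usable only once.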

3. Register Keys to Registry

Any process in Elixir can be named. Assume we have a GenServer process that acts as a worker. We can pass a :name option to GenServer.start_link/3 to name the worker.

defmodule YourOTPApp.Worker do
  use GenServer

  def start_link(args) do
    GenServer.start_link(__MODULE__, args, name: __MODULE__)
  end

  # ...
end

The :name option accepts an atom, a {:global, term} tuple, or a {:via, module, term} tuple. Module names such as __MODULE__ are atoms, and using them is common for GenServer processes. The drawback is that hardcoding the name to a single value limits the process to a single running instance, similar to a singleton in object-oriented programming. Once a process is running with the name __MODULE__, no other process can be started with the same name.
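The singleton behaviour is easy to observe. The sketch below uses a hypothetical Demo.SingletonWorker module with a trivial init/1 callback added so that it runs on its own.

```elixir
defmodule Demo.SingletonWorker do
  use GenServer

  def start_link(args) do
    # The name is hardcoded to the module name, so only one instance can run.
    GenServer.start_link(__MODULE__, args, name: __MODULE__)
  end

  @impl true
  def init(args), do: {:ok, args}
end

{:ok, first_pid} = Demo.SingletonWorker.start_link([])

# The second start attempt fails because the name is already taken.
second_result = Demo.SingletonWorker.start_link([])

IO.inspect(second_result == {:error, {:already_started, first_pid}})
```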

In our case we don't want to hardcode a single name; we want to provide the name at runtime when we create the process. To do that, we can use a registry to give each GenServer process a unique name.

To register the GenServer process in a registry, we use a special :via tuple.

defmodule YourOTPApp.Worker do
  use GenServer

  def start_link(args) do
    id = Keyword.get(args, :id)

    GenServer.start_link(__MODULE__, args, name: via(id, "worker"))
  end

  # ...

  defp via(key, value) do
    {:via, Registry, {YourOTPApp.Registry, key, value}}
  end
end

The {:via, Registry, {YourOTPApp.Registry, key, value}} tuple registers the process in YourOTPApp.Registry under the given key and stores the value alongside it. Only the key has to be unique; the value can be the same for many processes. That is why we hardcode the value to "worker": it acts as a tag we can filter on later.
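As a rough, self-contained illustration of the :via registration, the sketch below starts two workers with different ids and then tries to reuse an id. Demo.ViaRegistry and Demo.ViaWorker are hypothetical names, and init/1 is a placeholder callback so the script can run.

```elixir
{:ok, _} = Registry.start_link(keys: :unique, name: Demo.ViaRegistry)

defmodule Demo.ViaWorker do
  use GenServer

  def start_link(args) do
    id = Keyword.fetch!(args, :id)
    GenServer.start_link(__MODULE__, args, name: via(id, "worker"))
  end

  @impl true
  def init(args), do: {:ok, args}

  # The key must be unique; the value is a tag shared by all workers.
  defp via(key, value) do
    {:via, Registry, {Demo.ViaRegistry, key, value}}
  end
end

{:ok, pid_a} = Demo.ViaWorker.start_link(id: "a")
{:ok, pid_b} = Demo.ViaWorker.start_link(id: "b")

# Each worker can be found by its id, together with the shared value.
lookup_a = Registry.lookup(Demo.ViaRegistry, "a")
lookup_b = Registry.lookup(Demo.ViaRegistry, "b")

# Reusing an id is rejected because the key is already registered.
duplicate = Demo.ViaWorker.start_link(id: "a")

IO.inspect({lookup_a == [{pid_a, "worker"}], lookup_b == [{pid_b, "worker"}], duplicate})
```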

4. Limit the Number of Concurrently Running Processes

Once the worker processes are registered, we can query them from the registry. Here we are not interested in the pids themselves, only in how many worker instances are running. To find out, we can use the Registry.select/2 function. The following code demonstrates the select query.

match_all = {:"$1", :"$2", :"$3"}
guards = [{:==, :"$3", "worker"}]
map_result = [%{pid: :"$2"}]

Registry.select(YourOTPApp.Registry, [{match_all, guards, map_result}])

The select query might look a little cryptic, so let's look at it in more detail:

  1. match_all is a pattern for the {key, pid, value} tuple of each entry; it binds the key to :"$1", the pid to :"$2", and the value to :"$3".
  2. guards is a list of filters applied to the matched entries. Here we use a single filter that keeps only entries whose value is "worker".
  3. map_result is a list that describes the shape of each returned result; here, a map containing the pid.

In this case, how we map the data retrieved from the registry does not matter. What matters is the number of running worker processes.
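To see the filter at work, here is a small sketch that registers three entries by hand and selects only those tagged "worker". The registry name Demo.SelectRegistry and the keys are assumptions for this example; the result shape returns just the keys to keep the output easy to read.

```elixir
{:ok, _} = Registry.start_link(keys: :unique, name: Demo.SelectRegistry)

# One process may own several keys in a unique registry.
{:ok, _} = Registry.register(Demo.SelectRegistry, "a", "worker")
{:ok, _} = Registry.register(Demo.SelectRegistry, "b", "worker")
{:ok, _} = Registry.register(Demo.SelectRegistry, "c", "other")

match_all = {:"$1", :"$2", :"$3"}
guards = [{:==, :"$3", "worker"}]
# Return only the key of each matching entry.
result_shape = [:"$1"]

keys = Registry.select(Demo.SelectRegistry, [{match_all, guards, result_shape}])

# The order of results is not guaranteed, so sort before inspecting.
IO.inspect(Enum.sort(keys))
IO.inspect(length(keys))
```

The entry with the value "other" is filtered out by the guard, so only two entries are counted. On Elixir 1.14 and later, Registry.count_select/2 can return the count directly without building the list.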

Once we can count the running workers, we can limit the number of concurrent workers with a simple conditional. The following code limits the running workers to a maximum of 5.

defmodule YourOTPApp do
  def start_worker(args) do
    # Start a new worker only while fewer than 5 are running.
    if Enum.count(running_workers()) < 5 do
      # code to start the worker
    else
      {:error, :maximum_workers_reached}
    end
  end

  def running_workers() do
    match_all = {:"$1", :"$2", :"$3"}
    # The value must match the one the workers register with ("worker").
    guards = [{:==, :"$3", "worker"}]
    map_result = [%{pid: :"$2"}]

    Registry.select(YourOTPApp.Registry, [{match_all, guards, map_result}])
  end
end

Note that the registry only tracks running processes. If a worker crashes or stops, its entry is removed automatically (possibly after a brief delay), so its pid is no longer returned.
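Putting the pieces together, one possible end-to-end sketch looks like this. The post leaves the code that starts the worker open, so the use of a DynamicSupervisor here is an assumption, as are all Demo.Limited names. Also note that the count check and the start are two separate steps, so callers running in parallel could briefly exceed the limit; routing all start requests through a single process would be one way to avoid that.

```elixir
defmodule Demo.Limited.Worker do
  use GenServer

  def start_link(args) do
    id = Keyword.fetch!(args, :id)
    name = {:via, Registry, {Demo.Limited.Registry, id, "worker"}}
    GenServer.start_link(__MODULE__, args, name: name)
  end

  @impl true
  def init(args), do: {:ok, args}
end

defmodule Demo.Limited do
  @max_workers 5

  def start_worker(args) do
    if length(running_workers()) < @max_workers do
      # Assumption: workers are started under a DynamicSupervisor.
      DynamicSupervisor.start_child(Demo.Limited.Supervisor, {Demo.Limited.Worker, args})
    else
      {:error, :maximum_workers_reached}
    end
  end

  def running_workers do
    # Select the pid of every entry whose value is "worker".
    spec = [{{:"$1", :"$2", :"$3"}, [{:==, :"$3", "worker"}], [:"$2"]}]
    Registry.select(Demo.Limited.Registry, spec)
  end
end

{:ok, _} = Registry.start_link(keys: :unique, name: Demo.Limited.Registry)
{:ok, _} = DynamicSupervisor.start_link(strategy: :one_for_one, name: Demo.Limited.Supervisor)

# Try to start six workers one after another; the limit is five.
results = for id <- 1..6, do: Demo.Limited.start_worker(id: id)

IO.inspect(Enum.count(results, &match?({:ok, _}, &1)))
IO.inspect(List.last(results))
```

Because the attempts run sequentially, the first five succeed and the sixth is refused with {:error, :maximum_workers_reached}.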

5. Summary

In this post, you saw how Registry can be used to limit the number of concurrent processes in Elixir. Depending on your application or use case, the way you register or query processes might differ, but the concept stays the same: keep track of running processes using the registry, and use that tracking to limit how many run at once.
