DEV Community

NDREAN

Distribute an ETS database in a cluster

This is a quick write-up on how to distribute ETS, the built-in fast in-memory database of the Erlang ecosystem.

Why would you need distributed data? When you interact with a "scaled" web app, you don't know which node you will reach, so every node/pod should have the same data. Some data can be kept client-side (e.g. your shopping cart), but some must live server-side, such as client credentials or a cache.

Some ways to manage fast-read data:

  • make the app stateless by using an external database such as Redis. However, this comes with a noticeable overhead, especially when you want to secure Redis and run it as a cluster.
  • keep the app stateful (i.e. save the data locally) and implement sticky sessions with the load balancer, or ingress affinity in the case of Kubernetes.
  • keep the app stateful and cluster your nodes. The local databases then need to be synchronised, whether or not you sit behind a reverse proxy or run on Kubernetes.

The drawback of using sticky sessions is that you may not be able to spread the load among the nodes because only new users will reach the new nodes.

We will use this last route because the BEAM (Erlang's virtual machine, which runs Elixir/Phoenix) natively supports clustering. In cluster mode, we get the PG module with distributed PubSub for free. This is the second major strength of Erlang's ecosystem. Thanks to this, there is no single point of failure and no external dependency.

Why would you choose ETS and not Mnesia, which is a built-in distributed solution? The main reason was the setup of Mnesia in a dynamic environment such as Kubernetes. Avoiding bad synchronisation and getting a proper startup on node discovery can be difficult.

Mnesia has two modes, memory or disk, and ETS has a disk-based alter ego, DETS. You can still make a disk copy of an ETS table. If the table name is :users, then the command :ets.tab2file(:users, 'data.txt') will save it to disk (use single quotes: the file name must be a charlist).
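As a minimal sketch of the disk copy, the round trip below dumps a table with :ets.tab2file/2 and restores it with :ets.file2tab/1. The module name EtsDumpDemo, the table name :users_demo and the temporary file name are assumptions made for this illustration only.

```elixir
defmodule EtsDumpDemo do
  # Hypothetical demo: dump a named ETS table to disk, drop it, load it back.
  def round_trip do
    path = Path.join(System.tmp_dir!(), "users_demo.ets")

    :users_demo = :ets.new(:users_demo, [:set, :public, :named_table])
    true = :ets.insert(:users_demo, {"a@example.com", "token-a"})

    # :ets.tab2file/2 expects a charlist file name.
    :ok = :ets.tab2file(:users_demo, String.to_charlist(path))

    # Delete the table, then recreate it (name and options included) from the file.
    true = :ets.delete(:users_demo)
    {:ok, restored} = :ets.file2tab(String.to_charlist(path))

    rows = :ets.tab2list(restored)

    # Clean up so the demo can be run again.
    true = :ets.delete(restored)
    File.rm!(path)
    rows
  end
end

IO.inspect(EtsDumpDemo.round_trip())
```

Note that :ets.file2tab/1 fails if a table with the same name already exists, which is why the sketch deletes the table before restoring it.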

For our use case, using node discovery with libcluster and synchronising ETS via PubSub on node startup was easy and reliable. We can easily keep track of 1,000 members.
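For reference, a libcluster setup can be as small as the configuration sketched below. This is an assumption about a typical local setup, not the configuration used for this article: the topology name :gossip_demo and the supervisor name MyApp.ClusterSupervisor are hypothetical, and a Kubernetes deployment would use one of libcluster's Kubernetes strategies instead of the gossip strategy.

```elixir
# config/config.exs (sketch; the topology name :gossip_demo is hypothetical)
import Config

config :libcluster,
  topologies: [
    gossip_demo: [
      # UDP multicast gossip, convenient for local experiments.
      strategy: Cluster.Strategy.Gossip
    ]
  ]

# In the application's supervision tree, before the processes that need
# the cluster (the supervisor name is an assumption):
#
#   {Cluster.Supervisor,
#    [Application.get_env(:libcluster, :topologies), [name: MyApp.ClusterSupervisor]]}
```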

Some code to illustrate. We create users and save them to ETS. We use the GenServer behaviour. We don't keep state in the GenServer memory - in fact, the state is just nil - but rather use the messaging capabilities between processes offered by the GenServer.

Since the ETS table is configured as a set, inserting a row whose key already exists simply overwrites it, so merging copies from other nodes creates no duplicates and loses no rows.

We have a module "Repo" whose task is to save operations from the app (e.g. when a user is created) to ETS, and to broadcast this data on the topic "new" via the key :new.
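The set behaviour can be checked in isolation. The sketch below merges two overlapping snapshots into one :set table; the module name SetMergeDemo and the sample rows are made up for the illustration.

```elixir
defmodule SetMergeDemo do
  # Merges two "node" snapshots into one :set table and returns the rows, sorted.
  def merge(local_rows, remote_rows) do
    table = :ets.new(:merge_demo, [:set, :public, keypos: 1])
    true = :ets.insert(table, local_rows)
    # Rows whose key is already present replace the existing row.
    true = :ets.insert(table, remote_rows)
    rows = table |> :ets.tab2list() |> Enum.sort()
    true = :ets.delete(table)
    rows
  end
end

local = [{"a@example.com", "token-a"}, {"b@example.com", "token-b"}]
remote = [{"b@example.com", "token-b"}, {"c@example.com", "token-c"}]

# Three rows come back: the shared key "b@example.com" is stored only once.
IO.inspect(SetMergeDemo.merge(local, remote))
```

One caveat of this design: if two nodes hold different values for the same key, the last insert wins.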

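defmodule MyApp.Repo do
  # Thin wrapper around the :users ETS table (created by MyApp.Syncer).
  require Logger
  alias :ets, as: Ets

  # Returns every row of the table as a list of tuples.
  def all,
    do: Ets.tab2list(:users)

  # Accepts a single tuple or a list of tuples.
  def save(message),
    do: true = Ets.insert(:users, message)

  def save_and_emit(email, context) do
    # build_from/2 is the app's own helper (not shown here); it must return
    # a tuple whose first element is the key.
    user = build_from(email, context)
    true = save(user)
    # Tell the subscribers of the "new" topic about this user.
    :ok = Phoenix.PubSub.broadcast_from!(MyApp.PubSub, self(), "new", {:new, user})
    user
  end
end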

We have a GenServer "NodeListener" that monitors the cluster nodes with :net_kernel.monitor_nodes/1. Its task is to listen to :nodeup events and, after a short delay, broadcast its ETS table to the other nodes on the topic "node_up" via the key :sync.

defmodule MyApp.NodeListener do
  use GenServer
  alias MyApp.Repo

  # delay between the "up" event and the node read
  @sync_init 3_000

  def start_link(_opts),
    do: GenServer.start_link(__MODULE__,[], name: __MODULE__)

  @impl true
  def init([]) do
    :ok = :net_kernel.monitor_nodes(true)
    {:ok, nil}
  end

  @impl true
  def handle_info({:nodeup,_}, _) do
    Process.send_after(self(), {:perform_sync}, @sync_init)
    {:noreply, nil}
  end

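  @impl true
  def handle_info({:perform_sync}, _state) do
    # Send the whole local table, tagged with this node's name, so that
    # receivers can skip their own copy.
    :ok = Phoenix.PubSub.broadcast_from!(MyApp.PubSub, self(), "node_up", {:sync, Repo.all(), node()})
    {:noreply, nil}
  end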

  def handle_info({:nodedown, _},_), do: {:noreply, nil}
end

We have a GenServer "Syncer" that subscribes to two topics, "node_up" and "new". It has three tasks:

  • broadcast its own data on start-up (you may wish to bring a node preloaded with data into the cluster),
  • handle :sync messages: the node receives n-1 copies (in a cluster of n nodes) and merges each one into its own table,
  • handle :new messages, which convey individual changes (e.g. a new user) to each node.
defmodule MyApp.Syncer do
  use GenServer
  require Logger
  alias :ets, as: Ets
  alias MyApp.Repo

  def load_data(loading) do
    if loading do
      {:ok, _} = Task.start(fn ->
        # fetch_data/0 is the app's own loader (not shown); it returns a list of tuples.
        fetch_data() |> Repo.save()
        :ok = Phoenix.PubSub.broadcast_from!(
          MyApp.PubSub,
          self(),
          "node_up",
          {:sync, Repo.all(), node()}
        )
      end)
    end

    :ok
  end

  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  @impl true
  def init(opts) do
    :users = Ets.new(:users, [:set, :public, :named_table, keypos: 1])
    Logger.info("ETS table started...")

    :ok = Phoenix.PubSub.subscribe(MyApp.PubSub, "new")
    :ok = Phoenix.PubSub.subscribe(MyApp.PubSub, "node_up")

    # load_data/1 always returns :ok, even when :preload is not set.
    :ok = load_data(opts[:preload])

    {:ok, nil}
  end

  @impl true
  def handle_info({:new, user}, _state) do
    Repo.save(user)
    {:noreply, nil}
  end

  @impl true
  def handle_info({:sync, table, from_node}, _state) do
    # Ignore our own broadcast; merge any other node's rows into the local table.
    if from_node != node(), do: Repo.save(table)

    {:noreply, nil}
  end
end
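The filtering done on :sync messages can be tried without a cluster or PubSub. The sketch below assumes the message shape {:sync, rows, from_node}; the module name SyncFilterDemo, the table name and the node name :"other@host" are hypothetical.

```elixir
defmodule SyncFilterDemo do
  # Applies one {:sync, rows, from_node} message to `table` and returns
  # the table content, sorted for readability.
  def apply_sync({:sync, rows, from_node}, table) do
    if from_node != node(), do: :ets.insert(table, rows)
    table |> :ets.tab2list() |> Enum.sort()
  end
end

table = :ets.new(:sync_demo, [:set, :public])
true = :ets.insert(table, {"a@example.com", "token-a"})

# A copy tagged with our own node name is skipped...
SyncFilterDemo.apply_sync({:sync, [{"z@example.com", "token-z"}], node()}, table)

# ...while a copy from another (hypothetical) node is merged.
IO.inspect(SyncFilterDemo.apply_sync({:sync, [{"b@example.com", "token-b"}], :"other@host"}, table))
```

Tagging each broadcast with the sender's node name is what lets a process ignore the copy it published itself.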


A word about configuration

These processes are started under the application supervisor, in this order:

{Phoenix.PubSub, name: MyApp.PubSub, adapter: Phoenix.PubSub.PG2},
MyApp.NodeListener,
{MyApp.Syncer, [preload: true]},

The config is:

# config.exs
config :my_app, MyAppWeb.Endpoint,
  [...],
  pubsub_server: MyApp.PubSub,
