
Ahsan Nabi Dar


Pooling AMQP TLS connections in Elixir for high throughput and low latency

In the fast-paced, data-driven world of computing, reliable message brokers are critical for communication between services. One of the most widely adopted protocols in this domain is the Advanced Message Queuing Protocol (AMQP). As an open standard, AMQP has seen large-scale adoption across industries for its reliability, flexibility, and ability to handle both transactional and high-throughput workloads.

AMQP serves as the backbone for several popular messaging systems, most notably RabbitMQ, which has become a go-to solution for many enterprise clients. But what makes RabbitMQ shine in comparison to alternatives like Kafka, and why is it favored by a wide range of businesses, from startups to established enterprises?

The AMQP protocol is designed to ensure safe, guaranteed delivery of messages with features like message acknowledgments, routing, and delivery confirmations. Enterprises with complex architectures rely on these guarantees to keep their systems resilient. AMQP allows companies to achieve:

Transactional messaging: Ideal for financial systems or any environment where guaranteed message delivery is crucial.

Flexible message routing: Through its exchange types (direct, topic, fanout, headers), AMQP makes it easier to handle sophisticated routing logic.

Protocol independence: Since it's an open standard, AMQP works across multiple languages, platforms, and frameworks, making it adaptable to various enterprise ecosystems.

This versatility has made AMQP the protocol of choice for major players in industries such as finance, retail, and telecommunications.
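To make the exchange types more concrete, here is a toy model of how direct, fanout, and topic exchanges decide whether a message reaches a bound queue. It is only a sketch of the routing rules, not RabbitMQ's implementation: the module name `RoutingSketch` and the function `routes?/3` are made up for illustration, and headers exchanges are left out.

```elixir
defmodule RoutingSketch do
  @moduledoc "Toy model of AMQP exchange routing rules (hypothetical, for illustration)."

  # Direct: the routing key must equal the binding key exactly.
  def routes?(:direct, binding_key, routing_key), do: binding_key == routing_key

  # Fanout: every bound queue gets the message; keys are ignored.
  def routes?(:fanout, _binding_key, _routing_key), do: true

  # Topic: keys are dot-separated words. "*" matches exactly one word,
  # "#" matches zero or more words.
  def routes?(:topic, binding_key, routing_key) do
    words(String.split(binding_key, "."), String.split(routing_key, "."))
  end

  defp words([], []), do: true

  defp words(["#" | rest], key) do
    # Either "#" consumes nothing, or it consumes one word and stays active.
    words(rest, key) or (key != [] and words(["#" | rest], tl(key)))
  end

  defp words(["*" | rest], [_ | key]), do: words(rest, key)
  defp words([word | rest], [word | key]), do: words(rest, key)
  defp words(_, _), do: false
end

RoutingSketch.routes?(:direct, "whatsapp", "whatsapp")          # => true
RoutingSketch.routes?(:topic, "msg.*.sent", "msg.whatsapp.sent") # => true
RoutingSketch.routes?(:topic, "msg.*", "msg.whatsapp.sent")      # => false
RoutingSketch.routes?(:topic, "msg.#", "msg.whatsapp.sent")      # => true
```

The broker code later in this post declares a default (direct) exchange and binds each queue with its own name as the routing key, so only the first rule is exercised there.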

Opening a TCP connection is expensive, and a TLS handshake on top makes it more so. In high-concurrency environments it is crucial to manage connections efficiently. Connection pooling lets an application reuse a small set of open connections instead of creating a new one for every request. This improves:

Latency: Establishing a new connection for every task introduces delays. Connection pools maintain a set of reusable connections that can be quickly allocated to new requests.

Resource Utilization: Opening too many connections can exhaust memory and CPU resources, especially in large-scale deployments. Pooling ensures that the system remains performant and avoids unnecessary overhead.

By keeping a fixed number of connections open and sharing them, pooling keeps latency and resource usage predictable under high load, which makes it a good fit for high-throughput applications.
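The idea can be sketched without any dependencies. The toy pool below opens a fixed number of "connections" once and lends them out per job. `PoolSketch`, `open_fun`, and the counter are hypothetical names used only for this illustration; the real implementation later in this post uses Poolboy.

```elixir
defmodule PoolSketch do
  @moduledoc "Toy pool: resources are opened once, then borrowed and returned."

  # `open_fun` stands in for an expensive connect call (hypothetical).
  def start_link(size, open_fun) do
    Agent.start_link(fn -> for _ <- 1..size, do: open_fun.() end)
  end

  # Borrow a resource, run `fun` with it, and always give it back.
  def transaction(pool, fun) do
    case checkout(pool) do
      {:ok, conn} ->
        try do
          fun.(conn)
        after
          checkin(pool, conn)
        end

      :empty ->
        {:error, :pool_exhausted}
    end
  end

  defp checkout(pool) do
    Agent.get_and_update(pool, fn
      [conn | rest] -> {{:ok, conn}, rest}
      [] -> {:empty, []}
    end)
  end

  defp checkin(pool, conn), do: Agent.update(pool, &[conn | &1])
end

# Count how many times a "connection" is opened.
{:ok, counter} = Agent.start_link(fn -> 0 end)
open = fn -> Agent.get_and_update(counter, &{&1 + 1, &1 + 1}) end

{:ok, pool} = PoolSketch.start_link(2, open)
results = for msg <- ~w(a b c d), do: PoolSketch.transaction(pool, &{&1, msg})
opened = Agent.get(counter, & &1)
# Four jobs ran, but only two connections were ever opened.
```

One difference worth noting: this sketch returns an error when the pool is empty, whereas Poolboy by default makes the caller wait for a free worker until a timeout expires.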

Elixir, a modern programming language that runs on the BEAM VM, provides strong support for working with AMQP. Elixir brings:

Concurrency: Leveraging the BEAM’s concurrency model, Elixir developers can easily build distributed systems that communicate over AMQP, without worrying about thread management.

Robust Libraries: Libraries like AMQP and Broadway in Elixir allow developers to create pipelines for processing AMQP messages with ease. These libraries offer a higher level of abstraction, allowing developers to focus more on business logic rather than low-level messaging code.

Fault-Tolerant Systems: Since Elixir inherits the fault-tolerance properties of BEAM, developers can build resilient, highly available systems that can recover from failure seamlessly when handling message queues.

For developers building messaging systems with AMQP, Elixir provides a natural and powerful toolset for building high-performance, fault-tolerant applications that scale efficiently.

Now on to the interesting part: how to implement AMQP connection pooling in Elixir, using Poolboy for publishing messages and Broadway for consuming them.

Add amqp, poolboy, broadway and broadway_rabbitmq to your dependencies.
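As a rough sketch, the `deps/0` function in `mix.exs` could look like the following. The version requirements are assumptions, not taken from the original project, so check Hex for the current releases. `castore` is included because the code below calls `CAStore.file_path()`; the retry and AppSignal decorators used in the client would need their own packages as well.

```elixir
# mix.exs (fragment); version requirements are assumptions
defp deps do
  [
    {:amqp, "~> 3.3"},
    {:poolboy, "~> 1.5"},
    {:broadway, "~> 1.0"},
    {:broadway_rabbitmq, "~> 0.8"},
    # Provides the CA bundle used in ssl_options
    {:castore, "~> 1.0"}
  ]
end
```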

The following GenServer maintains a connection and a channel, and creates the queues on initialization.

The reconnection logic is adapted from Conduit AMQP.

defmodule Maverick.Amqp.Broker do
  @moduledoc """
  Maverick.Amqp.Broker
  """

  require Logger

  use GenServer

  @exchange "topgun"

  def host(),
    do: Application.fetch_env!(:maverick, :amqp_host)

  def vhost(),
    do: Application.fetch_env!(:maverick, :amqp_vhost)

  def port(),
    do: Application.fetch_env!(:maverick, :amqp_port)

  def username(),
    do: Application.fetch_env!(:maverick, :amqp_username)

  def password(),
    do: Application.fetch_env!(:maverick, :amqp_password)

  def exchange(), do: @exchange

  def start_link(_config) do
    GenServer.start_link(__MODULE__, [])
  end

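  @impl true
  def init(_config) do
    # init/1 must return {:ok, state} or {:stop, reason}
    case connection() do
      {:ok, state} -> {:ok, state}
      {:error, reason} -> {:stop, reason}
    end
  end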

  @impl true
  def handle_call(:get_channel, _from, state) do
    {:reply, state.channel, state}
  end

  @impl true
  def handle_call({:create_queues, topics}, _from, state) do
    # handle_call/3 must return a reply tuple
    {:reply, queues(state.channel, topics, @exchange), state}
  end

  # handle_info/2 receives only the message and the state
  @impl true
  def handle_info(:reconnect, state) do
    case connection() do
      {:ok, new_state} ->
        {:noreply, new_state}

      {:error, _reason} ->
        # Schedule a retry with a delay
        Process.send_after(self(), :reconnect, 5000)
        {:noreply, state}
    end
  end

  # The monitored connection process went down: try to reconnect shortly
  def handle_info({:DOWN, _ref, :process, _pid, _reason}, state) do
    Process.send_after(self(), :reconnect, 1000)
    {:noreply, state}
  end

  @impl true
  def terminate(reason, %{connection: connection}) do
    # inspect/1 handles non-string reasons such as tuples
    Logger.info("AMQP Broker terminated: #{inspect(reason)}")
    AMQP.Connection.close(connection)
  end

  def channel(worker) do
    GenServer.call(worker, :get_channel)
  end

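  def create_queues(queues) do
    # Workers are not registered under the module name, so borrow one from the pool
    :poolboy.transaction(:amqp_worker, fn worker ->
      GenServer.call(worker, {:create_queues, queues})
    end)
  end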

  defp connection() do
    case connect() do
      {:ok, conn} ->
        Process.monitor(conn.pid)
        {:ok, chan} = create_channel(conn)
        :ok = exchange(chan)
        :ok = queues(chan, Maverick.Queue.Gateway.topics())
        # Keep channel open for the GenServer lifecycle
        {:ok, %{connection: conn, channel: chan}}

      {:error, reason} ->
        {:error, reason}
    end
  end

  defp connect() do
    AMQP.Connection.open(
      username: username(),
      password: password(),
      virtual_host: vhost(),
      host: host(),
      port: port(),
      ssl_options: [
        verify: :verify_peer,
        customize_hostname_check: [
          match_fun: :public_key.pkix_verify_hostname_match_fun(:https)
        ],
        # from CAStore package
        cacertfile: CAStore.file_path()
      ]
    )
  end

  defp create_channel(conn) do
    AMQP.Channel.open(conn)
  end

  defp exchange(chan, exchange \\ @exchange) do
    AMQP.Exchange.declare(chan, exchange)
  end

  defp queues(chan, topics, exchange \\ @exchange) do
    Enum.each(topics, fn topic ->
      AMQP.Queue.declare(chan, topic, durable: true)
      AMQP.Queue.bind(chan, topic, exchange, routing_key: topic)
    end)
  end
end
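The retry-on-failure pattern used by the `:reconnect` handler can be tried in isolation. The sketch below is a dependency-free toy: `ReconnectSketch`, `connect_fun`, and `retry_ms` are hypothetical names, and the connect function is a stand-in for `AMQP.Connection.open/1`. Unlike the broker above, it connects asynchronously after `init/1` so that a slow broker never blocks the supervisor.

```elixir
defmodule ReconnectSketch do
  @moduledoc "Toy GenServer that retries a connect function until it succeeds."
  use GenServer

  # `connect_fun` is a hypothetical stand-in for AMQP.Connection.open/1.
  def start_link(connect_fun, retry_ms \\ 50) do
    GenServer.start_link(__MODULE__, {connect_fun, retry_ms})
  end

  def status(pid), do: GenServer.call(pid, :status)

  @impl true
  def init({connect_fun, retry_ms}) do
    # Trigger the first attempt right after init returns.
    send(self(), :reconnect)
    {:ok, %{connect_fun: connect_fun, retry_ms: retry_ms, conn: nil, attempts: 0}}
  end

  @impl true
  def handle_call(:status, _from, state) do
    {:reply, {state.conn, state.attempts}, state}
  end

  @impl true
  def handle_info(:reconnect, state) do
    state = %{state | attempts: state.attempts + 1}

    case state.connect_fun.() do
      {:ok, conn} ->
        {:noreply, %{state | conn: conn}}

      {:error, _reason} ->
        # Try again after a fixed delay.
        Process.send_after(self(), :reconnect, state.retry_ms)
        {:noreply, state}
    end
  end
end

# A fake connect function that fails twice, then succeeds.
{:ok, tries} = Agent.start_link(fn -> 0 end)

flaky = fn ->
  case Agent.get_and_update(tries, &{&1 + 1, &1 + 1}) do
    n when n < 3 -> {:error, :econnrefused}
    _ -> {:ok, :fake_conn}
  end
end

{:ok, pid} = ReconnectSketch.start_link(flaky, 10)
Process.sleep(200)
status = ReconnectSketch.status(pid)
# status is {:fake_conn, 3}: connected on the third attempt
```

A production version would likely add exponential backoff with jitter and cap the number of attempts; the fixed delay here mirrors the constants used in the broker.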

AMQP client implementing message publishing

defmodule Maverick.Amqp.Client do


  use Retry.Annotation
  use Appsignal.Instrumentation.Decorators

  alias Maverick.Amqp.Broker, as: AmqpBroker

  @timeout 60_000

  @decorate transaction_event()
  @retry with: exponential_backoff() |> randomize() |> expiry(10_000)
  def publish(message, topic) do
    :poolboy.transaction(
      :amqp_worker,
      fn worker ->
        AMQP.Basic.publish(AmqpBroker.channel(worker), AmqpBroker.exchange(), topic, message)
      end,
      @timeout
    )
  end
end

poolboy config to start a pool of connections

import Config

config :maverick, :amqp_worker,
  pool_size: 2,
  max_overflow: 1

Helper that builds the poolboy options from this config

  defp amqp_poolboy_config() do
    [
      {:name, {:local, :amqp_worker}},
      {:worker_module, Maverick.Amqp.Broker},
      {:size, Application.get_env(:maverick, :amqp_worker)[:pool_size]},
      {:max_overflow, Application.get_env(:maverick, :amqp_worker)[:max_overflow]}
    ]
  end

Add poolboy to the Supervisor

children = [
  :poolboy.child_spec(:amqp_worker, amqp_poolboy_config())
]

opts = [strategy: :one_for_one, name: Maverick.Supervisor]
Supervisor.start_link(children, opts)

Broadway to consume messages

defmodule Maverick.Broadway.Amqp.Whatsapp do
  @moduledoc """
  Maverick.Broadway.Amqp.Whatsapp
  """
  use Broadway

  use Appsignal.Instrumentation.Decorators

  alias Maverick.Amqp.Broker, as: AmqpBroker
  alias Maverick.Whatsapp.Message, as: WhatsappMessage

  def producer_concurrency(),
    do: Application.fetch_env!(:maverick, :broadway_producer_concurrency)

  def processors_concurrency(),
    do: Application.fetch_env!(:maverick, :broadway_processor_concurrency)

  def group(), do: Application.fetch_env!(:maverick, :broadway_client_prefix)

  def start_link(_opts) do
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [
        module:
          {BroadwayRabbitMQ.Producer,
           queue: Application.get_env(:maverick, :whatsapp_topic),
           on_success: :ack,
           on_failure: :reject_and_requeue_once,
           declare: [durable: true],
           connection: [
             username: AmqpBroker.username(),
             password: AmqpBroker.password(),
             virtual_host: AmqpBroker.vhost(),
             host: AmqpBroker.host(),
             port: AmqpBroker.port(),
             ssl_options: [
               verify: :verify_peer,
               customize_hostname_check: [
                 match_fun: :public_key.pkix_verify_hostname_match_fun(:https)
               ],
               # from CAStore package
               cacertfile: CAStore.file_path()
             ]
           ],
           qos: [
             prefetch_count: 50
           ]},
        concurrency: producer_concurrency()
      ],
      processors: [
        default: [
          concurrency: processors_concurrency()
        ]
      ]
    )
  end

  @decorate transaction(:broadway_amqp_whatsapp_message)
  def handle_message(_, message, _) do
    IO.inspect(message, label: "Got Whatsapp message")
    message
  end
end

AMQP’s adoption continues to grow as more enterprises recognize the need for reliable, transactional messaging. It shines in environments where reliability, flexible message routing, and ease of use are critical. Paired with a language like Elixir, which has strong library support for AMQP-based systems, it makes for a stable and robust platform.

For those looking to build robust, scalable messaging systems, AMQP’s advantages over Kafka in transactional and low-latency messaging make it a strong contender in the world of message brokers. As industries increasingly move toward distributed architectures, AMQP's proven reliability and performance in business-critical environments should keep it relevant.
