In the fast-paced, data-driven world of computing, reliable message brokers are critical for communication between services. One of the most widely adopted protocols in this domain is the Advanced Message Queuing Protocol (AMQP). As an open standard, AMQP has seen large-scale adoption across industries for its reliability, flexibility, and ability to handle both transactional and high-throughput workloads.
AMQP serves as the backbone for several popular messaging systems, most notably RabbitMQ, which has become a go-to solution for many enterprise clients. But what makes RabbitMQ shine in comparison to alternatives like Kafka, and why is it favored by a wide range of businesses, from startups to established enterprises?
The AMQP protocol is designed to ensure safe, guaranteed delivery of messages with features like message acknowledgments, routing, and delivery confirmations. Enterprises with complex architectures rely on these guarantees to keep their systems resilient. AMQP allows companies to achieve:
Transactional messaging
: Ideal for financial systems or any environment where guaranteed message delivery is crucial.
Flexible message routing
: Through its exchange types (direct, topic, fanout, headers), AMQP makes it easier to handle sophisticated routing logic.
Protocol independence
: Since it's an open standard, AMQP works across multiple languages, platforms, and frameworks, making it adaptable to various enterprise ecosystems.
This versatility has led to AMQP being the protocol of choice for major players in industries such as finance, retail, and telecommunications
TCP based connections can be resource-intensive, and in high-concurrency environments, it is crucial to manage connections efficiently. Using connection pooling allows developers to reuse connections instead of creating new ones for every request. This reduces:
Latency
: Establishing a new connection for every task introduces delays. Connection pools maintain a set of reusable connections that can be quickly allocated to new requests.
Resource Utilization
: Opening too many connections can exhaust memory and CPU resources, especially in large-scale deployments. Pooling ensures that the system remains performant and avoids unnecessary overhead.
By optimizing connection management, pooling helps maintain efficiency even in high-load environments, making it better suited for applications.
Elixir, a modern programming language that runs on the BEAM VM, provides strong support for working with AMQP. Elixir brings:
Concurrency
: Leveraging the BEAM’s concurrency model, Elixir developers can easily build distributed systems that communicate over AMQP, without worrying about thread management.
Robust Libraries
: Libraries like AMQP
and Broadway
in Elixir allow developers to create pipelines for processing AMQP messages with ease. These libraries offer a higher level of abstraction, allowing developers to focus more on business logic rather than low-level messaging code.
Fault-Tolerant Systems
: Since Elixir inherits the fault-tolerance properties of BEAM, developers can build resilient, highly available systems that can recover from failure seamlessly when handling message queues.
For developers building messaging systems with AMQP, Elixir provides a natural and powerful toolset for building high-performance, fault-tolerant applications that scale efficiently.
Now on to the interesting part, how to implement connection pooling in elixir for AMQP
using Poolboy
for publishing messages and setting up Broadway
for consumption
add amqp
, poolboy
, broadway
and broadway_rabbitmq
to your dependencies.
GenServer
to maintain connections and channel and create queues upon initialization
reconnection logic used from Conduit AMQP
defmodule Maverick.Amqp.Broker do
@moduledoc """
Maverick.Amqp.Broker
"""
require Logger
use GenServer
@exchange "topgun"
def host(),
do: Application.fetch_env!(:maverick, :amqp_host)
def vhost(),
do: Application.fetch_env!(:maverick, :amqp_vhost)
def port(),
do: Application.fetch_env!(:maverick, :amqp_port)
def username(),
do: Application.fetch_env!(:maverick, :amqp_username)
def password(),
do: Application.fetch_env!(:maverick, :amqp_password)
def exchange(), do: @exchange
def start_link(_config) do
GenServer.start_link(__MODULE__, [])
end
@impl true
def init(_config) do
connection()
end
@impl true
def handle_call(:get_channel, _from, state) do
{:reply, state.channel, state}
end
@impl true
def handle_call({:create_queues, topics}, _from, state) do
queues(state.channel, topics, @exchange)
end
def handle_info(:reconnect, _from, state) do
case connection() do
{:ok, %{connection: conn, channel: chan}} ->
{:noreply, %{connection: conn, channel: chan}}
{:error, _} ->
# Schedule a retry with a delay
Process.send_after(self(), :reconnect, 5000)
{:noreply, state}
end
end
def handle_info({:DOWN, _ref, :process, _pid, _reason}, state) do
Process.send_after(self(), :reconnect, 1000)
{:noreply, state}
end
@impl true
def terminate(reason, %{connection: connection}) do
Logger.info("AMQP Broker termintated: #{reason}")
AMQP.Connection.close(connection)
end
def channel(worker) do
GenServer.call(worker, :get_channel)
end
def create_queues(queues) do
GenServer.call(__MODULE__, {:create_queues, queues})
end
defp connection() do
case connect() do
{:ok, conn} ->
Process.monitor(conn.pid)
{:ok, chan} = create_channel(conn)
:ok = exchange(chan)
:ok = queues(chan, Maverick.Queue.Gateway.topics())
# Keep channel open for the GenServer lifecycle
{:ok, %{connection: conn, channel: chan}}
{:error, reason} -> {:error, reason}
end
end
defp connect() do
AMQP.Connection.open(
username: username(),
password: password(),
virtual_host: vhost(),
host: host(),
port: port(),
ssl_options: [
verify: :verify_peer,
customize_hostname_check: [
match_fun: :public_key.pkix_verify_hostname_match_fun(:https)
],
# from CAStore package
cacertfile: CAStore.file_path()
]
)
end
defp create_channel(conn) do
AMQP.Channel.open(conn)
end
defp exchange(chan, exchange \\ @exchange) do
AMQP.Exchange.declare(chan, exchange)
end
defp queues(chan, topics, exchange \\ @exchange) do
Enum.each(topics, fn topic ->
AMQP.Queue.declare(chan, topic, durable: true)
AMQP.Queue.bind(chan, topic, exchange, routing_key: topic)
end)
end
end
AMQP client implementing message publishing
defmodule Maverick.Amqp.Client do
use Retry.Annotation
use Appsignal.Instrumentation.Decorators
alias Maverick.Amqp.Broker, as: AmqpBroker
@timeout 60_000
@decorate transaction_event()
@retry with: exponential_backoff() |> randomize() |> expiry(10_000)
def publish(message, topic) do
:poolboy.transaction(
:amqp_worker,
fn worker ->
AMQP.Basic.publish(AmqpBroker.channel(worker), AmqpBroker.exchange(), topic, message)
end,
@timeout
)
end
end
poolboy config to start a pool of connections
import Config
config :maverick, :amqp_worker,
pool_size: 2,
max_overflow: 1
defp amqp_poolboy_config() do
[
{:name, {:local, :amqp_worker}},
{:worker_module, Maverick.Amqp.Broker},
{:size, Application.get_env(:maverick, :amqp_worker)[:pool_size]},
{:max_overflow, Application.get_env(:maverick, :amqp_worker)[:max_overflow]}
]
end
Add poolboy to the Supervisor
children = [
:poolboy.child_spec(:amqp_worker, amqp_poolboy_config())
]
opts = [strategy: :one_for_one, name: Maverick.Supervisor]
Supervisor.start_link(children, opts)
Broadway
to consume messages
defmodule Maverick.Broadway.Amqp.Whatsapp do
@moduledoc """
Maverick.Broadway.Amqp.Whatsapp
"""
use Broadway
use Appsignal.Instrumentation.Decorators
alias Maverick.Amqp.Broker, as: AmqpBroker
alias Maverick.Whatsapp.Message, as: WhatsappMessage
def producer_concurrency(),
do: Application.fetch_env!(:maverick, :broadway_producer_concurrency)
def processors_concurrency(),
do: Application.fetch_env!(:maverick, :broadway_processor_concurrency)
def group(), do: Application.fetch_env!(:maverick, :broadway_client_prefix)
def start_link(_opts) do
Broadway.start_link(__MODULE__,
name: __MODULE__,
producer: [
module:
{BroadwayRabbitMQ.Producer,
queue: Application.get_env(:maverick, :whatsapp_topic),
on_success: :ack,
on_failure: :reject_and_requeue_once,
declare: [durable: true],
connection: [
username: AmqpBroker.username(),
password: AmqpBroker.password(),
virtual_host: AmqpBroker.vhost(),
host: AmqpBroker.host(),
port: AmqpBroker.port(),
ssl_options: [
verify: :verify_peer,
customize_hostname_check: [
match_fun: :public_key.pkix_verify_hostname_match_fun(:https)
],
# from CAStore package
cacertfile: CAStore.file_path()
]
],
qos: [
prefetch_count: 50
]},
concurrency: producer_concurrency()
],
processors: [
default: [
concurrency: processors_concurrency()
]
]
)
end
@decorate transaction(:broadway_amqp_whatsapp_message)
def handle_message(_, message, _) do
IO.inspect(message, label: "Got Whatsapp message")
message
end
end
AMQP’s adoption continues to grow as more enterprises recognize the need for reliable, transactional messaging solutions. AMQP, with its foundation, shines in environments where reliability, message routing flexibility, and ease of use are critical. Resilient platform, especially when paired with languages like Elixir, which provides native support for AMQP-based systems provides stability and robustness.
For those looking to build robust, scalable messaging systems, AMQP’s advantages over Kafka in transactional and low-latency messaging make it a strong contender in the world of message brokers. As industries increasingly move toward distributed architectures, AMQP's proven reliability and performance in business critical environments ensure its continued success.
Top comments (0)