In this post, we will explore how event-driven architecture can make your app more responsive for users and decouple your modules for a better developer experience.
We will also look at several methods of implementing event-driven architecture with Elixir.
Elixir is particularly good for this because of the advanced and concise message-passing APIs that it offers and BEAM's outstanding support for concurrency.
But first: what is event-driven architecture, exactly?
Event-Driven Architecture: An Introduction
Event-driven architecture is an architecture where events control the behavior and flow of your application.
The main components of the architecture are event producers, event bus, and event consumers.
An event could be anything that represents a change of state in the system.
For example, in an e-commerce application, the purchase of a product by the user could produce a sold
event which the consumer can then process to update inventory.
An event-based architecture allows applications to act on events as they occur.
Different parts of an application work and develop relatively independently in well-crafted event-based design. Organizations can assign separate teams to focused parts of the application and streamline the workflow. This also creates a clear boundary between different parts of an application, assisting in future scalability exercises.
Event-driven architecture has mainly gained popularity with microservice-based products but can also be used for a monolith.
Things are always clearer with an example, so let's look at one.
Building Blocks of Event-Driven Architecture
Let's discuss each building block in detail, using an e-commerce application as an example.
Imagine that a user makes a new purchase on a website.
The new order
is an event generated by the part that controls the ordering: the event producer.
The event can be pushed onto an event bus.
The event bus could be anything, such as:
- A table in the database.
- An in-memory event queue inside the app.
- An external tool like RabbitMQ or Apache Kafka.
The event consumers interested in this type of event can subscribe to the event bus. The event is delivered to them, and they do some processing on top of it.
For example, an inventory management system would subscribe to the new order
event and update the inventory of the product.
Another system could also pick the same event in the application โ for example, a fulfillment service might process that event and create a delivery route for the product.
Benefits of Event-Driven Architecture
There are several advantages of using an event-driven architecture instead of one that processes everything sequentially.
An event-driven architecture allows us to build several independent parts of an application to work off the same event and do different focused tasks.
This can be advantageous for a couple of reasons:
- One team needs to focus on only one part.
- The application code for small parts can be simple.
Another advantage of the design is that it allows us to deliver a snappy interface to the user. As an example from the above application, a user needs to make an order.
The application can continue processing all other non-interactive tasks, like updating its inventory and interacting with the delivery application, without the user's attention.
This also makes adding new processing steps in the event pipeline very easy. For example, let's say we need an additional task to be performed from an event. We only need to add a new consumer to handle the event, without touching any other parts of the application.
Finally, it is much easier to scale each individual module if it is decoupled from the others, than to scale a whole application together.
This is even more beneficial when you have a part that takes much more resources than its counterparts in the event processing pipeline.
But event-driven architecture does not come without disadvantages when not properly thought out. If applied to very simple problems, it can lead to complex workflows that are slow and difficult to debug.
Let's explore some simple ways event-driven architecture can be implemented with Elixir, without the need to write complex pieces of code.
Synchronous Event-Driven Architecture in Elixir
The simplest (and most inefficient) way to run the above flow would be to do everything synchronously in the user request.
So, if you have an Orders
module that processes a user's order request, the synchronous implementation could look like this:
defmodule Orders do
def create_order(attrs) do
{:ok, order} = save_order(attrs)
{:ok, _inventory} = update_inventory(order)
{:ok, _delivery} = create_delivery(order)
{:ok, order}
end
end
We can improve this to more easily scale for new event consumers:
defmodule Orders do
@event_consumers [
{Inventory, :handle_event},
{Delivery, :handle_event},
]
def create_order(attrs) do
{:ok, order} = save_order(attrs)
event = %Orders.Event{type: :new_order, payload: order}
@event_consumers
|> Enum.each(fn {module, func} ->
apply(module, func, [event])
end)
{:ok, order}
end
end
With this implementation, all we need to do to add new consumers is to add the specification in the @event_consumers
array, and those consumers can work independently.
While the synchronous approach works well for a small number of consumers, it has a disadvantage. Creating an order might take a long time because you will need to wait for the inventory to update and the delivery to be created.
These are all internal tasks that can be performed without user interaction and moved from the synchronous chain.
Let's see how we can further streamline event-driven architecture using GenServer
.
Using GenServer for Event-Driven Architecture in Elixir
For this implementation, we will run a separate process for each consumer.
These will subscribe to an event from a producer and run their tasks concurrently.
For the event bus, we can use Phoenix.PubSub
.
Note that it is possible for apps that don't use Phoenix to directly use Registry as a PubSub.
First, let's look at the producer.
defmodule Orders do
def create_order(attrs) do
{:ok, order} = save_order(attrs)
event = %Orders.Event{type: :new_order, payload: order}
Phoenix.PubSub.broadcast(:my_app, "new_order", event)
{:ok, order}
end
end
On a new order, we create the event struct and use Phoenix.PubSub.broadcast/3
to broadcast that event on the bus. As you can see, it is much simpler than the previous implementation where the Orders
module processed tasks from the other module serially.
The consumers can then subscribe to the new_order
topic and implement the handle_info/2
to be notified every time a new event is published by the producer.
defmodule Inventory do
use GenServer
def start_link(opts), do: GenServer.start_link(__MODULE__, opts, name: __MODULE__)
def init(_opts) do
Phoenix.PubSub.subscribe(:my_app, "new_order")
end
def handle_info(%Orders.Event{type: :new_order, payload: order}, state) do
state = consume(state, order.product)
{:noreply, state}
end
end
The Delivery
module will be very similar to the above, so I am skipping it here.
As you can see, this is much better than the previous implementations. Inventory
and Delivery
modules can independently subscribe to the new_order
topic. The Orders
module broadcasts to this topic on new orders and events are delivered to the subscribed processes.
You could even distribute this between multiple nodes and Phoenix.PubSub
(with a PG, Redis, or other adapter), spreading the events to all nodes.
Great, right? Not really. There are several issues with this approach:
- PubSub provides a real-time broadcast without message queueing, so if one of the subscriber processes is down, it might miss the broadcasts.
- If the subscriber does some heavy work, it might not be able to keep up with the incoming messages, resulting in timeouts and consequently crashing the process tree.
- If the subscriber experiences an error when processing a message, it is considered consumed and won't be retried later.
So this approach is a bad one to follow for our current use case.
However, the approach still has its use cases. It can be used for tasks that aren't critical, or that can be corrected with the next message: for example, if a task computes the suggestions for a user's next purchases based on their last purchase.
While this would also need to be triggered on a new order, it isn't exactly critical (for a traditional e-commerce website) and can re-compute suggestions for a user on their next purchase.
Event-Driven Implementation Using GenStage in Elixir
In the previous section, we saw a great implementation of our event-driven system using GenServer. But it didn't come without its limitations. Let's see how GenStage fares.
GenStage makes a clear distinction between producers
, consumers
, and producer_consumers
, and each process has to pick one when it starts (in its init/1
). In our case, both Inventory
and Deliver
are consumers
, and Orders
is a producer.
This is where things start to get a little complicated.
GenStage
has a concept of demand. Each consumer
can issue a demand of how many events it can handle. The producer
needs to send those events to the consumer.
Let's see a basic producer
in action.
defmodule Orders do
use GenStage
def start_link(opts) do
GenStage.start_link(__MODULE__, opts, name: __MODULE__)
end
def init(_opts) do
{:producer, :some_state_which_does_not_currently_mattere}
end
def create_order(pid, attrs) do
GenStage.cast(pid, {:create_order, attrs})
end
def handle_cast({:create_order, attrs}, state) do
{:ok, order} = save_order(attrs)
{:noreply, [%Orders.Event{type: new_order, payload: order}], state}
end
def handle_demand(_demand, state), do: {:noreply, [], state}
end
The meat of our code is in handle_cast
, where we save the order and return a tuple like {:noreply, events, new_state}
.
The new events are stored in an internal GenStage
buffer and dispatched to the consumers as they make new demands (or immediately, if there are consumers with unmet demand).
Let's check out a sample implementation of the consumer
:
defmodule Inventory do
use GenStage
def start_link(opts) do
GenStage.start_link(__MODULE__, opts, name: __MODULE__)
end
def init(_opts) do
{:consumer, [], subscribe_to: [Orders]}
end
def handle_events(events, _from, state) do
state = Enum.reduce(events, state, & handle_event(&1, &2))
{:noreply, [], state}
end
def handle_event(%Orders.Event{type: :new_order, payload: order}, state) do
new_state = update_inventory(order)
new_state
end
end
In the consumer, first notice that we have a subscribe_to
inside init/1
. This automatically subscribes Inventory
to any events published by Orders
. Please check the GenStage documentation for additional options available in init.
Here, most of the work happens inside handle_events/3
, which is automatically called by GenStage
as soon as new events become available.
We handle the new_order
event here, updating the inventory and returning a new state.
With this simple implementation, we get several benefits that outpace the GenServer implementation:
- Automatic buffering of events inside GenStage's internal buffer when the producer has new events without any available consumer.
Even if consumers are down when some events are produced, we are still guaranteed to receive them when the consumer comes back up.
Check out Genstage's guide on buffering demand for advanced buffering logic.
- Automatic distribution of work on multiple consumers.
If you have heavy consumer tasks, you can start multiple consumer processes. The default DemandDispatcher
for GenStage will distribute the work evenly across all processes.
See GenStage.Dispatcher for other dispatch strategies to distribute events to all consumers or partition distribution to consumers based on a hash function.
But, as with GenServer or synchronous implementation, using GenStage does not come without its problems.
If a consumer crashes while it is processing an event, GenStage will consider the event delivered and will not send it again when the consumer comes back up.
To make sure that you properly track crashes, you can use a monitoring service like AppSignal. AppSignal is easy to install for your Elixir app and helps you monitor performance as well as track errors. Here's an example of an error tracking dashboard that AppSignal provides:
You can set up notifications for crashes via AppSignal as well.
On the app side, you can cache such events in a persistent store once they are delivered to the consumer. If the consumer crashes, then once it recovers, it can revert to the cached events.
Be very cautious about producing too many events without enough consumers, though. While GenStage offers automatic buffering of events, this buffer has a (configurable) maximum size and a practical maximum size constrained by the server's memory.
If you don't control the frequency with which events are produced, consider using an external data store like Redis or Postgres to buffer events.
Wrap Up: Event-Driven Architecture in Elixir โ Going Beyond GenStage
In this post, we examined three approaches to implementing an event-driven system in Elixir: synchronously, using GenServer, and finally, using GenStage. We took a look at some of the advantages and disadvantages of each approach.
The simple GenStage
example can be a starting point for implementing complex event-driven data processing pipelines that span multiple nodes. I suggest you read the great GenStage documentation for more information.
If you are looking for an even higher-level abstraction, Broadway is a good starting point. It is built on top of GenStage and offers several additional features, including consuming data from external queues like Amazon SQS, Apache Kafka, and RabbitMQ.
Until next time, happy coding!
P.S. If you'd like to read Elixir Alchemy posts as soon as they get off the press, subscribe to our Elixir Alchemy newsletter and never miss a single post!
Top comments (0)