Hi there! I want to tell you about a great open-source tool that is AWESOME and does not get the love it deserves: Apache Camel.
Apache Camel is an integration framework. What does that mean? Let’s suppose you are working on a project that consumes data from Kafka and RabbitMQ, reads from and writes to various databases, transforms data, logs everything to files, and outputs the processed data to another Kafka topic. You also have to implement the service's error handling (retries, dead letter channel, etc.) so that everything runs reliably. It seems hard.
Apache Camel helps you integrate with many components, such as databases, files, and message brokers, while keeping things simple and promoting enterprise integration patterns. Let’s see some examples, based on integration patterns. You can find the code in this repository.
We will start by consuming events from a Kafka topic and sending them to another one, taking advantage of the Event-Driven Consumer pattern. The events represent text messages sent by a user.
That’s about it! We also added a log step so that we can see the message body in the logs. The log argument is written in the Simple language, an Apache Camel language used to evaluate expressions.
Now let’s implement a message filter. This pattern filters out the messages that do not match certain conditions. In our case, we will only process those that have the type “chat”.
Easy, right? We now unmarshal the message from JSON to the UserMessage POJO so that we can filter by type. We then marshal it back to JSON before sending it to another Kafka topic.
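To make the filtering step concrete, here is a minimal plain-Java sketch of what the Message Filter does to the stream of events. It deliberately leaves out Camel, Kafka, and JSON; in Camel's Java DSL the same step is usually written with something like `filter(simple("${body.type} == 'chat'"))` between the unmarshal and marshal steps. The `UserMessage` fields and the class name `MessageFilterSketch` are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

class MessageFilterSketch {
    // Hypothetical shape of the UserMessage POJO; the field names are assumptions.
    static class UserMessage {
        final String emitter;
        final String type;
        final String text;

        UserMessage(String emitter, String type, String text) {
            this.emitter = emitter;
            this.type = type;
            this.text = text;
        }
    }

    // Keeps only the messages whose type is "chat"; everything else is dropped.
    static List<UserMessage> keepChatMessages(List<UserMessage> incoming) {
        List<UserMessage> kept = new ArrayList<>();
        for (UserMessage message : incoming) {
            if ("chat".equals(message.type)) {
                kept.add(message);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<UserMessage> incoming = List.of(
                new UserMessage("John Doe", "chat", "Hello"),
                new UserMessage("Jane Doe", "presence", "online"));
        for (UserMessage message : keepChatMessages(incoming)) {
            System.out.println(message.emitter + ": " + message.text);
        }
    }
}
```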
Now suppose we want to store all messages in a file. In addition, we want messages whose emitter is “John Doe” to be stored in a separate file, for testing purposes. For that, we can use the Content-Based Router pattern.
If the file already exists, we append the events and add a newline at the end of each event. For other emitters, we do the same, but store them in another file. It does look like an ‘if’ construct, right?
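The ‘if’ analogy can be spelled out in plain Java. The sketch below is not a Camel route: it only shows the routing decision and the append-with-newline behavior described above. In Camel this is typically a `choice()` with `when(...)` and `otherwise()` branches that send to `file:` endpoints, where the file component's `fileExist=Append` and `appendChars` options take care of appending. Both file names and the class name are hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

class ContentBasedRouterSketch {
    // Picks the destination file from the message content (the emitter).
    // Both file names are hypothetical.
    static String targetFileFor(String emitter) {
        if ("John Doe".equals(emitter)) {
            return "john-doe-messages.txt";
        }
        return "other-messages.txt";
    }

    // Appends the event to the chosen file, creating the file if needed,
    // and terminates each event with a newline.
    static Path store(Path directory, String emitter, String eventJson) {
        Path target = directory.resolve(targetFileFor(emitter));
        try {
            Files.write(target, (eventJson + "\n").getBytes(StandardCharsets.UTF_8),
                    StandardOpenOption.CREATE, StandardOpenOption.APPEND);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return target;
    }

    public static void main(String[] args) throws IOException {
        Path directory = Files.createTempDirectory("router-sketch");
        store(directory, "John Doe", "{\"text\":\"Hello\"}");
        store(directory, "Jane Doe", "{\"text\":\"Hi\"}");
        System.out.println("Files written under " + directory);
    }
}
```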
We can see a list of “devices” in the event, and we want to log them one by one. How can we do that? Using the Splitter pattern, we can iterate through any list, either sequentially or in parallel. Let’s do it sequentially in this example.
We can split by any field that is an Iterable. As you can see, we are again using the Simple language to access the content of the event.
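As a rough illustration of what the Splitter produces, the plain-Java sketch below turns one event into one log line per device, in order. It is not Camel code; in the Java DSL the split is usually expressed as `split(simple("${body.devices}"))`, with `parallelProcessing()` available for the parallel case. The event shape and the log line format are assumptions.

```java
import java.util.ArrayList;
import java.util.List;

class SplitterSketch {
    // Hypothetical event shape: only the fields this sketch needs.
    static class UserMessage {
        final String emitter;
        final List<String> devices;

        UserMessage(String emitter, List<String> devices) {
            this.emitter = emitter;
            this.devices = devices;
        }
    }

    // Splits one event into one log line per device, preserving list order
    // (the sequential case; nothing here runs in parallel).
    static List<String> splitDevices(UserMessage message) {
        List<String> lines = new ArrayList<>();
        for (String device : message.devices) {
            lines.add(message.emitter + " device: " + device);
        }
        return lines;
    }

    public static void main(String[] args) {
        UserMessage message = new UserMessage("John Doe", List.of("phone", "laptop"));
        splitDevices(message).forEach(System.out::println);
    }
}
```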
Let’s try something harder. We are receiving text messages from various emitters, and we want to combine the messages from each emitter into a single new message. To do that, we can use the Aggregator pattern. The aggregator buffers events while it waits for related ones. When another event arrives, a custom aggregation is applied, based on our needs. A new event is sent when a completion condition is met. That condition can be based on the number of events received, a timeout, or any other custom condition.
In our case, we will create a new POJO that aggregates the text messages from an emitter. The new event will be sent 5 seconds after the first event for that emitter is received.
We are using in-memory aggregation, but we could use other data stores, such as Postgres or Redis. We use the Simple language to group messages by emitter, and we created a custom aggregation strategy, shown below.
In the custom aggregation strategy, for the first event (oldExchange==null), we create a new CombinedUserMessage with the text of the message. For all other events, we add the text of the message to the combined event.
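The two branches described above can be sketched without Camel. In a real route this logic lives in an implementation of Camel's `AggregationStrategy`, whose `aggregate(Exchange oldExchange, Exchange newExchange)` method receives a null `oldExchange` for the first event of a group. The sketch below replaces the exchanges with plain objects and models grouping by emitter with a map; it does not model the 5-second completion. The fields of `UserMessage` and `CombinedUserMessage` are assumptions.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class AggregationSketch {
    // Hypothetical shapes for the incoming and the combined message.
    static class UserMessage {
        final String emitter;
        final String text;

        UserMessage(String emitter, String text) {
            this.emitter = emitter;
            this.text = text;
        }
    }

    static class CombinedUserMessage {
        final String emitter;
        final List<String> texts = new ArrayList<>();

        CombinedUserMessage(String emitter) {
            this.emitter = emitter;
        }
    }

    // A null "old" value means this is the first event for the emitter,
    // so a new combined message is created; otherwise the text is appended.
    static CombinedUserMessage aggregate(CombinedUserMessage old, UserMessage incoming) {
        if (old == null) {
            CombinedUserMessage combined = new CombinedUserMessage(incoming.emitter);
            combined.texts.add(incoming.text);
            return combined;
        }
        old.texts.add(incoming.text);
        return old;
    }

    public static void main(String[] args) {
        List<UserMessage> incoming = List.of(
                new UserMessage("John Doe", "Hello"),
                new UserMessage("Jane Doe", "Hi"),
                new UserMessage("John Doe", "How are you?"));
        // The map key plays the role of the correlation expression (the emitter).
        Map<String, CombinedUserMessage> pending = new LinkedHashMap<>();
        for (UserMessage message : incoming) {
            pending.put(message.emitter, aggregate(pending.get(message.emitter), message));
        }
        for (CombinedUserMessage combined : pending.values()) {
            System.out.println(combined.emitter + " -> " + combined.texts);
        }
    }
}
```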
This is all great, but how do we apply transformations to a field? We now have a combined event, and it would be great if we could turn it into plain text by joining the individual text messages. We can do that using the Message Translator pattern.
We can call bean methods directly from a Camel route and perform all the transformations that we need, using plain Java code. Neat!
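Since the translation itself is plain Java, the bean can be sketched on its own. The class below is a hypothetical example of such a bean; a route would typically invoke it with a `bean(...)` step. The output format (emitter, colon, texts joined by spaces) and the `CombinedUserMessage` fields are assumptions chosen for illustration.

```java
import java.util.List;

class MessageTranslatorSketch {
    // Hypothetical shape of the combined event produced by the aggregator.
    static class CombinedUserMessage {
        final String emitter;
        final List<String> texts;

        CombinedUserMessage(String emitter, List<String> texts) {
            this.emitter = emitter;
            this.texts = texts;
        }
    }

    // Translates the combined event into a single line of plain text.
    static String toPlainText(CombinedUserMessage combined) {
        return combined.emitter + ": " + String.join(" ", combined.texts);
    }

    public static void main(String[] args) {
        CombinedUserMessage combined =
                new CombinedUserMessage("John Doe", List.of("Hello", "How are you?"));
        System.out.println(toPlainText(combined));
    }
}
```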
We can see that our Camel routes are getting bigger. What if we want, for example, to split them across files? Two in-memory components allow us to do that: Direct and SEDA.
Direct is a synchronous endpoint that works like a call from a route to another route. Let’s use it to separate the route that stores the event in a file.
Great! There is another in-memory component that will be useful for us: SEDA. SEDA works like Direct but is asynchronous, which means that it puts the message in a queue for another thread to process. Let’s use SEDA to decouple receiving the message from Kafka from the routes that consume it.
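The difference between the two components is mostly about which thread does the work. The plain-Java sketch below imitates that behavior: the Direct-style call runs on the caller's thread, while the SEDA-style call hands the message to a queue that a separate consumer thread drains. It is an analogy under simplifying assumptions (one message, one consumer thread), not how Camel implements `direct:` or `seda:` endpoints, and all names in it are hypothetical.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class DirectVsSedaSketch {
    // Stand-in for the consuming route: reports which thread did the work.
    static String handle(String message) {
        return message + " handled on " + Thread.currentThread().getName();
    }

    // Direct style: a plain synchronous call, so the caller's thread does the work.
    static String sendDirect(String message) {
        return handle(message);
    }

    // SEDA style: the producer only enqueues; a separate consumer thread
    // takes the message off the queue and handles it.
    static String sendViaQueue(String message) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        BlockingQueue<String> results = new LinkedBlockingQueue<>();
        Thread consumer = new Thread(() -> {
            try {
                results.put(handle(queue.take()));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "seda-consumer");
        consumer.start();
        try {
            queue.put(message);             // the producer's job ends here
            String result = results.take(); // waiting only so the sketch can show the outcome
            consumer.join();
            return result;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the consumer", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(sendDirect("event-1"));
        System.out.println(sendViaQueue("event-2"));
    }
}
```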
Now our routes are much simpler. Suppose we need to perform a periodic task, such as a cleanup. We can take advantage of the Timer endpoint. Let’s illustrate it by creating a route that runs every 5 seconds.
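For readers who want to see the scheduling idea outside Camel, here is a small sketch using the JDK's `ScheduledExecutorService`. A Camel route would instead start from a timer endpoint such as `timer:cleanup?period=5000` (the timer name `cleanup` is made up). The demo uses a short period so that it finishes quickly; the helper name and its parameters are assumptions.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class PeriodicTaskSketch {
    // Runs the task every periodMillis until it has fired at least `runs` times,
    // then stops the scheduler and returns how many times the task ran.
    static int runPeriodically(Runnable task, long periodMillis, int runs) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger executions = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(runs);
        scheduler.scheduleAtFixedRate(() -> {
            task.run();
            executions.incrementAndGet();
            done.countDown();
        }, 0, periodMillis, TimeUnit.MILLISECONDS);
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            scheduler.shutdownNow();
        }
        return executions.get();
    }

    public static void main(String[] args) {
        // The route in the article fires every 5 seconds; 200 ms keeps the demo short.
        int executions = runPeriodically(() -> System.out.println("Running cleanup"), 200, 3);
        System.out.println("Cleanup ran " + executions + " times");
    }
}
```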
Now that our application is almost ready for production, we have to improve fault tolerance. What happens if, for some reason, a message fails while in a route? Let’s implement the Dead Letter Channel pattern. When there is an error in the route, the message is sent to another Kafka topic, so that it can be reprocessed later.
And that’s it! The error handler configuration applies to all routes in the class. We send the original message to the topic (the one that was first received in the route). We could also configure retry policies, with timeouts and other common fault tolerance settings, but as we don’t need them, we will leave it as is.
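The point about sending the original message is easy to miss, so here is a plain-Java sketch of that behavior. It stands in for what Camel's `deadLetterChannel(...)` error handler with `useOriginalMessage()` does, without using Camel: when a step fails, the message as first received, not the partially transformed one, ends up in the dead letter store. The two lists replace the Kafka topics, and the transformation and failure condition are invented for the example.

```java
import java.util.ArrayList;
import java.util.List;

class DeadLetterSketch {
    final List<String> delivered = new ArrayList<>();   // stands in for the output topic
    final List<String> deadLetters = new ArrayList<>(); // stands in for the dead letter topic

    // Processes one message; on any failure the ORIGINAL input is dead-lettered.
    void handle(String original) {
        try {
            String transformed = original.trim().toUpperCase();
            if (transformed.isEmpty()) {
                throw new IllegalArgumentException("empty message");
            }
            delivered.add(transformed);
        } catch (RuntimeException e) {
            // Keep the message exactly as it was first received, so it can be reprocessed later.
            deadLetters.add(original);
        }
    }

    public static void main(String[] args) {
        DeadLetterSketch sketch = new DeadLetterSketch();
        sketch.handle(" hello ");
        sketch.handle("   ");
        System.out.println("Delivered: " + sketch.delivered);
        System.out.println("Dead letters: " + sketch.deadLetters.size());
    }
}
```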
Now that we are reaching the end of this article, I also wanted to show you something: it is possible to configure REST endpoints as Camel routes.
As simple as that! We just configured a GET for the URL /api/hello, to be answered with “Hello World!”.
As you can see, Apache Camel is a framework that simplifies the integration with other components, supporting the enterprise integration patterns and making it easier to create data pipelines.
I hope you have liked it! Thank you!