Apache Camel K is a lightweight integration framework built on Apache Camel that runs natively on Kubernetes and is specifically designed for serverless and microservice architectures. Often referred to as the Swiss army knife for solving integration problems, Camel can be used to integrate many heterogeneous systems and applications, allowing them to share data seamlessly. Camel K users can instantly run integration code written in Camel DSL on their preferred cloud (Kubernetes or OpenShift). To learn more about Camel K and Camel, visit the official Camel site.
Kafka can be easily integrated with the rest of your applications using Camel K. Kafka brokers support client authentication using SASL (Simple Authentication and Security Layer). SASL decouples authentication mechanisms from application protocols, so we can use any of the authentication mechanisms that SASL supports. We will use the SASL-supported OAUTHBEARER mechanism for authentication, and SSL will handle the data encryption.
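To make the decoupling concrete, here is a minimal sketch of the two standard Kafka client settings involved, `security.protocol` and `sasl.mechanism`. The class and method names are hypothetical and only for illustration; the Camel K integration in this post sets the equivalent values through Camel component properties instead.

```java
import java.util.Properties;

// Hypothetical helper, not part of Camel K or the Kafka client library.
class SaslClientSettings {

    // Builds the two Kafka client settings that pick the transport and the SASL mechanism.
    static Properties forMechanism(String mechanism) {
        Properties props = new Properties();
        // SASL_SSL means: authenticate with SASL, encrypt the connection with SSL/TLS.
        props.setProperty("security.protocol", "SASL_SSL");
        // The mechanism is just a value; the protocol setting stays the same.
        props.setProperty("sasl.mechanism", mechanism);
        return props;
    }

    public static void main(String[] args) {
        System.out.println(forMechanism("OAUTHBEARER"));
        // Another mechanism Kafka supports; it would need its own credentials configuration.
        System.out.println(forMechanism("SCRAM-SHA-512"));
    }
}
```

Switching mechanisms changes one value here, although each mechanism still needs its own credentials and login configuration.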
Getting started
This demonstration requires access to a Kubernetes or OpenShift cluster.
Installing Camel K and required CLI tools
- Camel K: Follow the Camel K installation guide for your specific cluster
- kamel: the Apache Camel K CLI tool. Can be downloaded from the Camel K releases page
- kubectl: the default Kubernetes CLI tool. Can be downloaded from the Kubernetes releases page
Setting up the Kafka instance
Using OpenShift Streams for Apache Kafka, we can easily create a Kafka instance, a service account and a Kafka topic. The SASL OAUTHBEARER authentication method should be selected. For this demonstration, we are going to name the Kafka topic "test". Save the Kafka broker URL, service account ID, service account secret and token endpoint URL. We will use them in a configuration file.
Using Kafka in a Camel K integration
We will create the following files:
- application.properties: Configuration file, holds configuration properties
- SaslSSLKafkaProducer.java: Producer, contains integration code and produces to the Kafka topic
- SaslSSLKafkaConsumer.java: Consumer, contains integration code and consumes from the Kafka topic
Kafka uses the Java Authentication and Authorization Service (JAAS) for SASL configuration, so we specify the JAAS configuration in the application.properties config file. We also have to specify a login callback handler that will retrieve the OAUTHBEARER token.
application.properties file:
# Kafka config
camel.component.kafka.brokers = <YOUR KAFKA BROKER URL>
camel.component.kafka.security-protocol = SASL_SSL
camel.component.kafka.sasl-mechanism = OAUTHBEARER
camel.component.kafka.sasl-jaas-config = org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
oauth.client.id='<YOUR SERVICE ACCOUNT ID>' \
oauth.client.secret='<YOUR SERVICE ACCOUNT SECRET>' \
oauth.token.endpoint.uri="<TOKEN ENDPOINT URL>" ;
camel.component.kafka.additional-properties[sasl.login.callback.handler.class] = io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler
consumer.topic=test
producer.topic=test
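Before bundling the file into a secret, it can help to check that no template placeholder such as `<YOUR KAFKA BROKER URL>` was left in place. The following is a minimal sketch of such a check, assuming the file can be parsed with `java.util.Properties` (which joins the backslash-continued JAAS lines into one value). The class name `KafkaPropsCheck` and its messages are hypothetical, and the parser used by the Camel K runtime may differ in details.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.regex.Pattern;

// Hypothetical pre-flight check, not part of Camel K.
class KafkaPropsCheck {

    // Keys taken from the application.properties file shown above.
    static final String[] REQUIRED_KEYS = {
        "camel.component.kafka.brokers",
        "camel.component.kafka.security-protocol",
        "camel.component.kafka.sasl-mechanism",
        "camel.component.kafka.sasl-jaas-config"
    };

    // Matches template placeholders written as <SOME TEXT>.
    static final Pattern PLACEHOLDER = Pattern.compile("<[^>]+>");

    // Returns one message per missing key or unreplaced placeholder, in key order.
    static List<String> findProblems(String propertiesText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        List<String> problems = new ArrayList<>();
        for (String key : REQUIRED_KEYS) {
            String value = props.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                problems.add("missing: " + key);
            } else if (PLACEHOLDER.matcher(value).find()) {
                problems.add("placeholder not replaced: " + key);
            }
        }
        return problems;
    }

    public static void main(String[] args) throws IOException {
        // Defaults to application.properties in the current directory.
        String path = args.length > 0 ? args[0] : "application.properties";
        String text = new String(Files.readAllBytes(Paths.get(path)), StandardCharsets.UTF_8);
        for (String problem : findProblems(text)) {
            System.out.println(problem);
        }
    }
}
```

An empty result only means the required keys are present and filled in; it does not verify that the credentials are valid.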
In the integration files, we will specify kafka-oauth-client as a Maven dependency because it provides the login callback handler class specified in the configuration file.
SaslSSLKafkaProducer.java:
// camel-k: language=java dependency=mvn:org.apache.camel.quarkus:camel-quarkus-kafka dependency=mvn:io.strimzi:kafka-oauth-client:0.10.0
import org.apache.camel.builder.RouteBuilder;

public class SaslSSLKafkaProducer extends RouteBuilder {
    @Override
    public void configure() throws Exception {
        log.info("About to start route: Timer -> Kafka ");
        from("timer:foo")
            .routeId("FromTimer2Kafka")
            .setBody()
                .simple("Message #${exchangeProperty.CamelTimerCounter}")
            .to("kafka:{{producer.topic}}")
            .log("Message correctly sent to the topic!");
    }
}
SaslSSLKafkaConsumer.java:
// camel-k: language=java dependency=mvn:org.apache.camel.quarkus:camel-quarkus-kafka dependency=mvn:io.strimzi:kafka-oauth-client:0.10.0
import org.apache.camel.builder.RouteBuilder;

public class SaslSSLKafkaConsumer extends RouteBuilder {
    @Override
    public void configure() throws Exception {
        log.info("About to start route: Kafka -> Log ");
        from("kafka:{{consumer.topic}}")
            .routeId("FromKafka2Log")
            .log("${body}");
    }
}
Bundle configuration properties into a secret:
kubectl create secret generic kafka-props --from-file application.properties
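Kubernetes stores each `--from-file` entry in the Secret's `data` field, keyed by file name and base64-encoded. If you ever need to inspect what was stored, the value can be decoded as in the sketch below. The class name `SecretValueCodec` is hypothetical; only `java.util.Base64` from the standard library is used.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical helper illustrating how Secret data values are encoded.
class SecretValueCodec {

    // Encodes file content the way it appears under the Secret's data field.
    static String encode(String fileContent) {
        return Base64.getEncoder()
            .encodeToString(fileContent.getBytes(StandardCharsets.UTF_8));
    }

    // Decodes a data value back into the original file content.
    static String decode(String dataValue) {
        return new String(Base64.getDecoder().decode(dataValue), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String encoded = encode("producer.topic=test\n");
        System.out.println(encoded);
        System.out.print(decode(encoded));
    }
}
```

Base64 is an encoding, not encryption, so anyone who can read the Secret can recover the service account credentials.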
Producing to the Kafka Topic
kamel run --config secret:kafka-props SaslSSLKafkaProducer.java --dev
...
[2] 2021-05-06 08:48:11,854 INFO [FromTimer2Kafka] (Camel (camel-1) thread #1 - KafkaProducer[test]) Message correctly sent to the topic!
[2] 2021-05-06 08:48:11,854 INFO [FromTimer2Kafka] (Camel (camel-1) thread #3 - KafkaProducer[test]) Message correctly sent to the topic!
[2] 2021-05-06 08:48:11,973 INFO [FromTimer2Kafka] (Camel (camel-1) thread #5 - KafkaProducer[test]) Message correctly sent to the topic!
Consuming from the Kafka Topic
kamel run --config secret:kafka-props SaslSSLKafkaConsumer.java --dev
...
[1] 2021-05-06 08:51:08,991 INFO [FromKafka2Log] (Camel (camel-1) thread #0 - KafkaConsumer[test]) Message #8
[1] 2021-05-06 08:51:10,065 INFO [FromKafka2Log] (Camel (camel-1) thread #0 - KafkaConsumer[test]) Message #9
[1] 2021-05-06 08:51:10,991 INFO [FromKafka2Log] (Camel (camel-1) thread #0 - KafkaConsumer[test]) Message #10
That's it! Hopefully someone finds this useful and if you have any questions or comments, feel free to post in the comments section. Happy Coding!
Top comments (1)
This is a really quick example, and well demonstrated!
Quick question: Is there a way to serialize the messages while publishing to the Kafka topic? Also, is there an option to validate the schema while producing messages to Kafka?
Let me know your thoughts on this.
Regards,
Guru