Kuthumi Pepple
Authenticating Camel K to Kafka using SASL

Apache Camel K is a lightweight integration framework built on Apache Camel that runs natively on Kubernetes and is specifically designed for serverless and microservice architectures. Often referred to as the Swiss army knife for solving integration problems, it can be used to easily integrate many heterogeneous systems and applications, allowing them to share data seamlessly. Camel K users can instantly run integration code written in the Camel DSL on their preferred cloud platform (Kubernetes or OpenShift). To learn more about Camel K and Camel, visit the official Camel site.

Kafka can be easily integrated with the rest of your applications using Camel K. Kafka brokers support client authentication via SASL (Simple Authentication and Security Layer). SASL decouples authentication mechanisms from application protocols, allowing us to use any of the authentication mechanisms SASL supports. In this demonstration we will use the SASL-supported OAUTHBEARER mechanism for authentication, while SSL handles data encryption.

Getting started

This demonstration requires access to a Kubernetes/OpenShift cluster.

Installing Camel K and required CLI tools
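The original post does not list the installation commands, so here is a sketch of the typical flow: download the kamel CLI from the Camel K GitHub releases page, put it on your PATH, then install the Camel K operator into your current namespace (the exact download step depends on your platform and the release you pick).

```shell
# Download the kamel binary for your platform from
# https://github.com/apache/camel-k/releases and make it executable
chmod +x kamel
sudo mv kamel /usr/local/bin/

# Verify the CLI works
kamel version

# Install the Camel K operator into the current namespace of your cluster
kamel install
```

`kamel install` needs cluster-admin-level permissions the first time, since it registers custom resource definitions.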

Setting up the Kafka instance

Using OpenShift Streams for Apache Kafka, we can easily create a Kafka instance, a service account, and a Kafka topic. The SASL/OAUTHBEARER authentication method should be selected. For this demonstration, we will name the Kafka topic "test". Save the Kafka broker URL, service account ID, service account secret, and token endpoint URL; we will use them in a configuration file.
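Besides the web console, these resources could also be created from the rhoas CLI at the time of writing. A sketch (instance and description names are illustrative, and flags may vary by CLI version):

```shell
# Authenticate against the managed service
rhoas login

# Create a Kafka instance (name is illustrative)
rhoas kafka create --name my-kafka

# Create a service account; note the client ID and secret it prints
rhoas service-account create --short-description camel-k-demo

# Create the topic used in this demonstration
rhoas kafka topic create --name test
```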

Using Kafka in a Camel K integration

We will create the following files:

  • application.properties: Configuration file, holds configuration properties
  • SaslSSLKafkaProducer.java: Producer, contains integration code and produces to the Kafka Topic
  • SaslSSLKafkaConsumer.java: Consumer, contains integration code and consumes from the Kafka Topic

Kafka uses the Java Authentication and Authorization Service (JAAS) for SASL configuration, hence we will specify the JAAS configuration in the application.properties config file. We also have to specify a login callback handler that will retrieve the oauthbearer token.

application.properties file:

# Kafka config
camel.component.kafka.brokers = <YOUR KAFKA BROKER URL>
camel.component.kafka.security-protocol = SASL_SSL

camel.component.kafka.sasl-mechanism = OAUTHBEARER
camel.component.kafka.sasl-jaas-config = org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
        oauth.client.id="<YOUR SERVICE ACCOUNT ID>" \
        oauth.client.secret="<YOUR SERVICE ACCOUNT SECRET>" \
        oauth.token.endpoint.uri="<TOKEN ENDPOINT URL>" ;
camel.component.kafka.additional-properties[sasl.login.callback.handler.class] = io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler

consumer.topic=test
producer.topic=test

In the integration files, we specify kafka-oauth-client as a Maven dependency because it provides the login callback handler class specified in the configuration file.

SaslSSLKafkaProducer.java:

// camel-k: language=java dependency=mvn:org.apache.camel.quarkus:camel-quarkus-kafka dependency=mvn:io.strimzi:kafka-oauth-client:0.10.0

import org.apache.camel.builder.RouteBuilder;

public class SaslSSLKafkaProducer extends RouteBuilder {
  @Override
  public void configure() throws Exception {
    log.info("About to start route: Timer -> Kafka ");
    from("timer:foo")
        .routeId("FromTimer2Kafka")
        .setBody()
            .simple("Message #${exchangeProperty.CamelTimerCounter}")
        .to("kafka:{{producer.topic}}")
        .log("Message correctly sent to the topic!");
  }
}

SaslSSLKafkaConsumer.java:

// camel-k: language=java dependency=mvn:org.apache.camel.quarkus:camel-quarkus-kafka dependency=mvn:io.strimzi:kafka-oauth-client:0.10.0

import org.apache.camel.builder.RouteBuilder;

public class SaslSSLKafkaConsumer extends RouteBuilder {
  @Override
  public void configure() throws Exception {
    log.info("About to start route: Kafka -> Log ");
    from("kafka:{{consumer.topic}}")
        .routeId("FromKafka2Log")
        .log("${body}");
  }
}

Bundle the configuration properties into a Kubernetes secret:

kubectl create secret generic kafka-props --from-file application.properties
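Before running the integrations, it is worth a quick sanity check that the secret exists and actually contains the application.properties key:

```shell
# List the keys stored in the secret (should show application.properties)
kubectl describe secret kafka-props

# Decode the stored file to confirm its contents
kubectl get secret kafka-props -o jsonpath='{.data.application\.properties}' | base64 -d
```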

Producing to the Kafka Topic

kamel run --config secret:kafka-props SaslSSLKafkaProducer.java --dev
...
[2] 2021-05-06 08:48:11,854 INFO  [FromTimer2Kafka] (Camel (camel-1) thread #1 - KafkaProducer[test]) Message correctly sent to the topic!
[2] 2021-05-06 08:48:11,854 INFO  [FromTimer2Kafka] (Camel (camel-1) thread #3 - KafkaProducer[test]) Message correctly sent to the topic!
[2] 2021-05-06 08:48:11,973 INFO  [FromTimer2Kafka] (Camel (camel-1) thread #5 - KafkaProducer[test]) Message correctly sent to the topic!

Consuming from the Kafka Topic

kamel run --config secret:kafka-props SaslSSLKafkaConsumer.java --dev
...
[1] 2021-05-06 08:51:08,991 INFO  [FromKafka2Log] (Camel (camel-1) thread #0 - KafkaConsumer[test]) Message #8
[1] 2021-05-06 08:51:10,065 INFO  [FromKafka2Log] (Camel (camel-1) thread #0 - KafkaConsumer[test]) Message #9
[1] 2021-05-06 08:51:10,991 INFO  [FromKafka2Log] (Camel (camel-1) thread #0 - KafkaConsumer[test]) Message #10
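When you are done, you can list and remove the running integrations. The names below are what Camel K typically derives from the source file names (kebab-cased); check the output of `kamel get` for the actual names in your cluster:

```shell
# List running integrations
kamel get

# Delete the producer and consumer integrations (names are illustrative)
kamel delete sasl-ssl-kafka-producer
kamel delete sasl-ssl-kafka-consumer
```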

That's it! Hopefully someone finds this useful. If you have any questions or comments, feel free to post them in the comments section. Happy coding!

Top comments (1)

gtata007

This is a real quick example... Well demonstrated!

Quick question: is there a way to serialize the messages while publishing to the Kafka topic? Also, do we have an option to validate the schema while producing messages to Kafka?
Let me know your thoughts on this.

Regards,
Guru