Brice LEPORINI

OpenID Connect authentication with Apache Kafka 3.1

Dear reader, this is not going to be fun because today we're talking about security. However, to make it less boring, we'll take advantage of the OpenID Connect (OIDC) support in Kafka 3.1, the foundation of Confluent Platform 7.1.

OpenID Connect

Let's start with a few words about OIDC. It's an open standard that builds on top of OAuth 2.0. The aim of this post is not to give a proper introduction to OIDC, but let's emphasize some key differences with OAuth 2.0.

First of all, in OAuth 2.0, the token is typically nothing more than an opaque string that has to be verified against the authorization server to be trusted. OIDC uses JSON Web Tokens (JWT). A JWT is a signed JSON document, base64url-encoded. The cool thing is that, because it's signed, applications can trust it without sending any request to the authorization server. Validation only costs local processing, which scales far better than calling the authorization server for every token. The only thing the application (here, a Kafka broker) needs is the public key to validate the token. The authorization server publishes it as a JSON Web Key Set (JWKS), another open specification, and it is easily cacheable.
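To make the token structure concrete, here is a minimal Python sketch, using only the standard library, that splits a compact JWT into its three dot-separated segments and decodes the header and the claims. The sample token, its `kid` and its `sub` value are made up for illustration, and the signature is a placeholder that is never verified.

```python
import base64
import json


def b64url_decode(segment: str) -> bytes:
    """Decode a base64url segment, restoring the padding that JWTs strip."""
    padding = "=" * (-len(segment) % 4)
    return base64.urlsafe_b64decode(segment + padding)


def b64url_encode(raw: bytes) -> str:
    """Encode bytes as unpadded base64url, the way JWT segments are written."""
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode("ascii")


def decode_unverified(token: str):
    """Split a compact JWT and return (header, claims) WITHOUT checking the signature."""
    header_b64, payload_b64, _signature_b64 = token.split(".")
    header = json.loads(b64url_decode(header_b64))
    claims = json.loads(b64url_decode(payload_b64))
    return header, claims


# Hypothetical token assembled locally; real tokens come from the authorization server.
sample_header = {"alg": "RS256", "typ": "JWT", "kid": "example-key-id"}
sample_claims = {"sub": "my-client@clients", "aud": "https://kafka.auth", "exp": 1650006000}
sample_token = ".".join([
    b64url_encode(json.dumps(sample_header).encode("utf-8")),
    b64url_encode(json.dumps(sample_claims).encode("utf-8")),
    b64url_encode(b"placeholder-signature"),
])

header, claims = decode_unverified(sample_token)
print(header["alg"], claims["aud"])
```

Decoding alone proves nothing about authenticity: anyone can read or forge the first two segments. Trust only comes from verifying the third segment against the published public key.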

Obviously, this is an extremely incomplete summary of OIDC. What I like about it is that it frees the application from authentication method complexity. With OIDC, the organization can opt for simple user/password authentication, MFA, biometrics, SSO, multiple authentication flows and other options: it has no impact on the application as long as it complies with the standard.

Putting it together with Kafka

Here, we're keeping it simple, as the use case is application-to-application authentication. So I'm using only a client id and a client secret. To make it as light as possible, the authorization server is Auth0, a fully managed service with a free tier. To set it up, I recommend reading the Backend/API section of the documentation. Kafka is not part of the listed backends, but the Configure Auth0 APIs paragraph of any backend fits this PoC. Feel free to opt for any other authentication provider: the standard is open and there are multiple implementations, self-managed as well as as-a-service.

OIDC support in Kafka 3.1 extends the existing SASL/OAUTHBEARER mechanism and is defined in KIP-768. The authentication flow is pretty simple:

At startup, the broker fetches the public key set (JWKS) from the authorization server. The client first authenticates against the authorization server, which issues a JWT. The client then presents this token during SASL/OAUTHBEARER authentication, and the broker validates it by verifying its signature and claims before accepting the connection.
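The broker-side checks can be sketched in a few lines of Python. This is a simplified illustration, not Kafka's actual validator: the helper names `select_key` and `check_claims` are hypothetical, only the `exp` and `aud` claims are checked, and the cryptographic signature verification is left out because it needs a crypto library.

```python
import time
from typing import Optional


def select_key(jwks: dict, kid: str) -> dict:
    """Return the key set entry whose key id matches the token header's `kid`."""
    for key in jwks.get("keys", []):
        if key.get("kid") == kid:
            return key
    raise KeyError(f"no key with kid={kid!r} in the key set")


def check_claims(claims: dict, expected_audience: str, now: Optional[float] = None) -> None:
    """Raise ValueError if the token is expired or was not issued for this audience."""
    now = time.time() if now is None else now
    if "exp" not in claims or claims["exp"] <= now:
        raise ValueError("token is expired or has no exp claim")
    aud = claims.get("aud")
    audiences = aud if isinstance(aud, list) else [aud]  # `aud` may be a string or a list
    if expected_audience not in audiences:
        raise ValueError("token audience does not match")


# Made-up key set and claims, with small timestamps to keep the example readable.
jwks = {"keys": [{"kid": "key-1", "kty": "RSA"}, {"kid": "key-2", "kty": "RSA"}]}
claims = {"sub": "my-client@clients", "aud": "https://kafka.auth", "exp": 2000}

selected = select_key(jwks, "key-2")                  # key used to verify the signature
check_claims(claims, "https://kafka.auth", now=1000)  # passes silently
```

Because the key set can be cached, none of these steps requires a round trip to the authorization server when a client connects.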

To make it more fun, I'm using Kafka in KRaft mode (so without ZooKeeper), based on a Docker example provided by Confluent.

The first step is to validate the Auth0 setup, and Kafka comes with a handy command line tool:

$ docker run -ti --rm confluentinc/cp-kafka:7.1.0 kafka-run-class org.apache.kafka.tools.OAuthCompatibilityTool \
  --clientId XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX \
  --clientSecret XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX \
  --sasl.oauthbearer.jwks.endpoint.url https://xxxx-xxxxx.us.auth0.com/.well-known/jwks.json \
  --sasl.oauthbearer.token.endpoint.url https://xxxx-xxxxx.us.auth0.com/oauth/token \
  --sasl.oauthbearer.expected.audience https://kafka.auth

PASSED 1/5: client configuration
PASSED 2/5: client JWT retrieval
PASSED 3/5: client JWT validation
PASSED 4/5: broker configuration
PASSED 5/5: broker JWT validation
SUCCESS

All the values come from the authentication provider. If the tool reports anything other than SUCCESS, have a look at the Auth0 configuration.
Now let's tweak the broker configuration:

version: '2'
services:

  broker:
    image: confluentinc/cp-kafka:7.1.0
    hostname: broker
    container_name: broker
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,OIDC:SASL_PLAINTEXT'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker:29092,OIDC://localhost:9092'
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_PROCESS_ROLES: 'broker,controller'
      KAFKA_NODE_ID: 1
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@broker:29093'
      KAFKA_LISTENERS: 'PLAINTEXT://broker:29092,CONTROLLER://broker:29093,OIDC://0.0.0.0:9092'
      KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
      KAFKA_SASL_ENABLED_MECHANISMS: OAUTHBEARER
      KAFKA_SASL_OAUTHBEARER_JWKS_ENDPOINT_URL: $JWKS_ENDPOINT_URL
      KAFKA_OPTS: -Djava.security.auth.login.config=/tmp/kafka_server_jaas.conf
      KAFKA_SASL_OAUTHBEARER_EXPECTED_AUDIENCE: $OIDC_AUD
      KAFKA_LISTENER_NAME_OIDC_OAUTHBEARER_SASL_SERVER_CALLBACK_HANDLER_CLASS: org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerValidatorCallbackHandler
    volumes:
      - ./update_run.sh:/tmp/update_run.sh
      - ./kafka_server_jaas.conf:/tmp/kafka_server_jaas.conf
      - ./client.properties:/tmp/client.properties
    command: "bash -c 'if [ ! -f /tmp/update_run.sh ]; then echo \"ERROR: Did you forget the update_run.sh file that came with this docker-compose.yml file?\" && exit 1 ; else /tmp/update_run.sh && /etc/confluent/docker/run ; fi'"

Here are the differences with the original example:

$ diff compose.yml compose.ori.yml
14,15c14,15
<       KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,OIDC:SASL_PLAINTEXT'
<       KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker:29092,OIDC://localhost:9092'
---
>       KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
>       KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092'
25c25
<       KAFKA_LISTENERS: 'PLAINTEXT://broker:29092,CONTROLLER://broker:29093,OIDC://0.0.0.0:9092'
---
>       KAFKA_LISTENERS: 'PLAINTEXT://broker:29092,CONTROLLER://broker:29093,PLAINTEXT_HOST://0.0.0.0:9092'
29,33d28
<       KAFKA_SASL_ENABLED_MECHANISMS: OAUTHBEARER
<       KAFKA_SASL_OAUTHBEARER_JWKS_ENDPOINT_URL: $JWKS_ENDPOINT_URL
<       KAFKA_OPTS: -Djava.security.auth.login.config=/tmp/kafka_server_jaas.conf
<       KAFKA_SASL_OAUTHBEARER_EXPECTED_AUDIENCE: $OIDC_AUD
<       KAFKA_LISTENER_NAME_OIDC_OAUTHBEARER_SASL_SERVER_CALLBACK_HANDLER_CLASS: org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerValidatorCallbackHandler
36,37d30
<       - ./kafka_server_jaas.conf:/tmp/kafka_server_jaas.conf
<       - ./client.properties:/tmp/client.properties

Long story short, the external listener has been renamed to OIDC and configured to use SASL_PLAINTEXT with the OAUTHBEARER mechanism. Notice that the JWKS endpoint URL and the expected audience are provided through environment variables in order to keep the file generic.
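The Confluent image turns `KAFKA_`-prefixed environment variables into broker properties, which is why the variable names look the way they do. As a rough Python sketch of the naming rule (an approximation inferred from the property names printed in the broker startup log further down, not the image's actual script; `env_to_property` is a hypothetical helper):

```python
def env_to_property(name: str, prefix: str = "KAFKA_") -> str:
    """Approximate the naming rule: drop the prefix, lowercase, replace '_' with '.'."""
    if not name.startswith(prefix):
        raise ValueError(f"{name} does not start with {prefix}")
    return name[len(prefix):].lower().replace("_", ".")


# Variables taken from the compose file above.
variables = [
    "KAFKA_SASL_ENABLED_MECHANISMS",
    "KAFKA_SASL_OAUTHBEARER_JWKS_ENDPOINT_URL",
    "KAFKA_SASL_OAUTHBEARER_EXPECTED_AUDIENCE",
    "KAFKA_LISTENER_NAME_OIDC_OAUTHBEARER_SASL_SERVER_CALLBACK_HANDLER_CLASS",
]
properties = {name: env_to_property(name) for name in variables}

for name, prop in properties.items():
    print(f"{name} -> {prop}")
```

The last variable shows how the callback handler is scoped to a single listener and mechanism through the `listener.name.<listener>.<mechanism>.` prefix. Some variables, such as `KAFKA_OPTS`, are read by the startup scripts as JVM options rather than translated into broker properties, so this rule does not apply to them.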

The JAAS configuration is pretty basic:

KafkaServer {
    org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
};

Now let’s start the broker:

$ JWKS_ENDPOINT_URL=https://xxxx-xxxxx.us.auth0.com/.well-known/jwks.json OIDC_AUD=https://kafka.auth docker compose up
[+] Running 1/1
 ⠿ Container broker  Created                       0.1s
Attaching to broker
broker  | ===> User
broker  | uid=1000(appuser) gid=1000(appuser) groups=1000(appuser)
broker  | ===> Configuring ...
broker  | ===> Running preflight checks ...
broker  | ===> Check if /var/lib/kafka/data is writable ...
broker  | ===> Check if Zookeeper is healthy ...
broker  | ignore zk-ready  40
broker  | Formatting /tmp/kraft-combined-logs
broker  | ===> Launching ...
broker  | ===> Launching kafka ...
[...]
broker  | [2022-04-15 06:35:13,095] INFO KafkaConfig values:
broker  |   advertised.listeners = PLAINTEXT://broker:29092,OIDC://localhost:9092
[...]
broker  |   listener.security.protocol.map = CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,OIDC:SASL_PLAINTEXT
broker  |   listeners = PLAINTEXT://broker:29092,CONTROLLER://broker:29093,OIDC://0.0.0.0:9092
[...]
broker  |   sasl.enabled.mechanisms = [OAUTHBEARER]
[...]
broker  |   sasl.oauthbearer.expected.audience = [https://kafka.auth]
[...]
broker  |   sasl.oauthbearer.jwks.endpoint.url = https://xxxx-xxxxx.us.auth0.com/.well-known/jwks.json
[...]
broker  | [2022-04-15 06:35:13,159] INFO [BrokerLifecycleManager id=1] The broker has been unfenced. Transitioning from RECOVERY to RUNNING. (kafka.server.BrokerLifecycleManager)

Good stuff! Next, let's configure the client:

security.protocol=SASL_PLAINTEXT
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler
sasl.login.connect.timeout.ms=15000
sasl.oauthbearer.token.endpoint.url=https://xxxx-xxxxx.us.auth0.com/oauth/token
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
clientId="XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX" \
clientSecret="XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX" ;
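With this configuration, the login callback handler obtains the token from the token endpoint using the client id and secret. The Python sketch below builds, without sending it, what a standard OAuth 2.0 client credentials request looks like. It assumes HTTP Basic client authentication and a form-encoded body; the exact request Kafka sends is not shown in this post, `build_token_request` is a hypothetical helper, and the credentials are placeholders.

```python
import base64
import urllib.parse
import urllib.request


def build_token_request(token_url: str, client_id: str, client_secret: str) -> urllib.request.Request:
    """Build (but do not send) an OAuth 2.0 client credentials token request."""
    credentials = f"{client_id}:{client_secret}".encode("utf-8")
    body = urllib.parse.urlencode({"grant_type": "client_credentials"}).encode("ascii")
    return urllib.request.Request(
        token_url,
        data=body,
        method="POST",
        headers={
            # Client id and secret travel in the Authorization header, not in the body.
            "Authorization": "Basic " + base64.b64encode(credentials).decode("ascii"),
            "Content-Type": "application/x-www-form-urlencoded",
            "Accept": "application/json",
        },
    )


request = build_token_request(
    "https://xxxx-xxxxx.us.auth0.com/oauth/token",  # placeholder URL, as in the config above
    "my-client-id",                                 # hypothetical client id
    "my-client-secret",                             # hypothetical client secret
)
print(request.get_method(), request.full_url)
print(request.data.decode("ascii"))
```

The JSON response to such a request carries the JWT in its `access_token` field, and that token is what the client presents to the broker.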

We're now good to go with some basic produce and consume tests. You may have noticed that I also mounted the client configuration file in the broker container. That's purely for convenience, so the clients can run in the same container:

$ docker exec -ti broker kafka-console-producer --producer.config /tmp/client.properties --bootstrap-server localhost:9092 --topic test
>Hello OIDC!
>%

And in a different terminal:

$ docker exec -ti broker kafka-console-consumer --consumer.config /tmp/client.properties --bootstrap-server localhost:9092 --topic test
Hello OIDC!
^CProcessed a total of 1 messages

That's it!

Running the client without the proper configuration fails, and the failure is logged on both sides (client first, then broker), confirming that the broker rejects it as expected:

$ docker exec -ti broker kafka-console-consumer --bootstrap-server localhost:9092 --topic test
[2022-04-15 06:57:28,564] WARN [Consumer clientId=console-consumer, groupId=console-consumer-9357] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)

broker  | [2022-04-15 06:57:28,247] INFO [SocketServer listenerType=BROKER, nodeId=1] Failed authentication with /127.0.0.1 (Unexpected Kafka request of type METADATA during SASL handshake.) (org.apache.kafka.common.network.Selector)

If you set the org.apache.kafka.common.security logger to a more verbose level (DEBUG, for instance), you'll be able to see the token being parsed.

For reference, I also recommend reading the SASL/OAUTHBEARER documentation.
