Over the course of the first two parts of this blog series, we set up a single-node Kafka cluster on Kubernetes, secured it using TLS encryption, and accessed the broker using both internal and external clients. Let's keep iterating! In this post, we will continue the Kafka on Kubernetes journey with Strimzi and cover:
- How to apply different authentication types: TLS and SASL SCRAM-SHA-512
- How to use the Strimzi Entity Operator to manage Kafka users and topics
- How to configure Kafka CLI and Go client applications to securely connect to the Kafka cluster
The code is available on GitHub - https://github.com/abhirockzz/kafka-kubernetes-strimzi/
What do I need to go through this tutorial?
kubectl - https://kubernetes.io/docs/tasks/tools/install-kubectl/
I will be using Azure Kubernetes Service (AKS) to demonstrate the concepts, but by and large it is independent of the Kubernetes provider (e.g. feel free to use a local setup such as minikube). If you want to use AKS, all you need is a Microsoft Azure account, which you can get for FREE if you don't have one already.
I will not be repeating some of the common sections (such as Installation/Setup (Helm, Strimzi, Azure Kubernetes Service) and the Strimzi overview) in this or subsequent parts of this series, so please refer to part one.
Create a Kafka cluster with TLS authentication
To enforce 2-way mutual TLS auth, all we need to do is tweak the Strimzi Kafka resource. I am highlighting the key part below. The other parts remain the same (here is the manifest from part 2), i.e. single-node Kafka and Zookeeper, ephemeral storage, along with TLS encryption:
external:
  type: loadbalancer
  tls: true
  authentication:
    type: tls
All we did was add the tls authentication type as a property of the external listener. In addition to this, we also include the entityOperator configuration as such:
entityOperator:
  userOperator: {}
  topicOperator: {}
This activates the Strimzi Entity Operator, which in turn comprises the Topic Operator and the User Operator. Just as the Kafka CRD allows you to control Kafka clusters on Kubernetes, the Topic Operator allows you to manage topics in a Kafka cluster through a custom resource called KafkaTopic, i.e. you can create, delete and update topics in your Kafka cluster.
The interesting part is that it's a two-way sync, i.e. you can still create topics by accessing the Kafka cluster directly, and the change is reflected in the KafkaTopic resources being created/updated/deleted.
The goal of the User Operator is to make Kafka user management easier with the help of a KafkaUser CRD. All you do is create KafkaUser resources and Strimzi takes care of the Kafka-specific user management parts.
Unlike the Topic Operator, this is not a two-way sync.
Read more about Entity Operator here https://strimzi.io/docs/operators/master/using.html#assembly-kafka-entity-operator-deployment-configuration-kafka
We will dive into the practical bit of these two operators in upcoming sections.
To create the Kafka cluster:
kubectl apply -f https://raw.githubusercontent.com/abhirockzz/kafka-kubernetes-strimzi/master/part-3/kafka-tls-auth.yaml
What did the Strimzi Operator do for us in this case?
We covered most of these in part 1 - StatefulSet (and Pods), LoadBalancer Service, ConfigMap, Secret etc. How is the TLS auth config enforced? To figure that out, let's introspect the Kafka server configuration.
As explained in part 1, this is stored in a ConfigMap:
export CLUSTER_NAME=my-kafka-cluster
kubectl get configmap/${CLUSTER_NAME}-kafka-config -o yaml
Look at the External listener section in server.config:
listener.name.external-9094.ssl.client.auth=required
listener.name.external-9094.ssl.truststore.location=/tmp/kafka/clients.truststore.p12
listener.name.external-9094.ssl.truststore.password=${CERTS_STORE_PASSWORD}
listener.name.external-9094.ssl.truststore.type=PKCS12
The snippet above is the part which was added - notice that listener.name.external-9094.ssl.client.auth=required was added along with the truststore details.
Let's not forget the Entity Operator
The Entity Operator runs a separate Deployment
export CLUSTER_NAME=my-kafka-cluster
kubectl get deployment $CLUSTER_NAME-entity-operator
kubectl get pod -l=app.kubernetes.io/name=entity-operator
NAME READY STATUS
my-kafka-cluster-entity-operator-666f8758f6-gj54h 3/3 Running
The Entity Operator Pod runs three containers - topic-operator, user-operator and tls-sidecar.
We have configured our cluster to authenticate client connections, but what about the user credentials which will be used by client apps?
Time to use the User Operator!
The User Operator allows us to create KafkaUsers to represent client authentication credentials. As mentioned at the beginning of the blog post, supported authentication types include TLS and SCRAM-SHA-512. Behind the scenes, a Kubernetes Secret is created by Strimzi to store the credentials.
OAuth 2.0 is also supported, but it's not handled by the User Operator.
Let's create a KafkaUser to store client credentials for TLS auth. Here is what the user info looks like:
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaUser
metadata:
  name: kafka-tls-client-credentials
  labels:
    strimzi.io/cluster: my-kafka-cluster
spec:
  authentication:
    type: tls
We name the user kafka-tls-client-credentials, associate it with the Kafka cluster we created earlier (using the label strimzi.io/cluster: my-kafka-cluster) and specify the tls authentication type.
You can also define authorization rules (not covered in this blog) within a KafkaUser definition - see https://strimzi.io/docs/operators/master/using.html#type-KafkaUser-reference
kubectl apply -f https://raw.githubusercontent.com/abhirockzz/kafka-kubernetes-strimzi/master/part-3/user-tls-auth.yaml
Introspect the Secret (it has the same name as the KafkaUser):
kubectl get secret/kafka-tls-client-credentials -o yaml
TLS client authentication
That's it! Now it's up to the client to use the credentials. We will use the Kafka CLI and a Go client application to try this out. First things first:
Extract and configure the user credentials
export KAFKA_USER_NAME=kafka-tls-client-credentials
kubectl get secret $KAFKA_USER_NAME -o jsonpath='{.data.user\.crt}' | base64 --decode > user.crt
kubectl get secret $KAFKA_USER_NAME -o jsonpath='{.data.user\.key}' | base64 --decode > user.key
kubectl get secret $KAFKA_USER_NAME -o jsonpath='{.data.user\.p12}' | base64 --decode > user.p12
kubectl get secret $KAFKA_USER_NAME -o jsonpath='{.data.user\.password}' | base64 --decode > user.password
Import the entry in user.p12 into another keystore
export USER_P12_FILE_PATH=user.p12
export USER_KEY_PASSWORD_FILE_PATH=user.password
export KEYSTORE_NAME=kafka-auth-keystore.jks
export KEYSTORE_PASSWORD=foobar
export PASSWORD=`cat $USER_KEY_PASSWORD_FILE_PATH`
sudo keytool -importkeystore -deststorepass $KEYSTORE_PASSWORD -destkeystore $KEYSTORE_NAME -srckeystore $USER_P12_FILE_PATH -srcstorepass $PASSWORD -srcstoretype PKCS12
sudo keytool -list -alias $KAFKA_USER_NAME -keystore $KEYSTORE_NAME
Just like we did in part 2, TLS encryption config requires importing the cluster CA cert in the client truststore
Extract and configure server CA cert
Extract the cluster CA certificate and password
export CLUSTER_NAME=my-kafka-cluster
kubectl get secret $CLUSTER_NAME-cluster-ca-cert -o jsonpath='{.data.ca\.crt}' | base64 --decode > ca.crt
kubectl get secret $CLUSTER_NAME-cluster-ca-cert -o jsonpath='{.data.ca\.password}' | base64 --decode > ca.password
Import it into the truststore - I am using the built-in truststore which comes with a JDK (Java) installation, but this is just for convenience and you're free to use another truststore
export CERT_FILE_PATH=ca.crt
export CERT_PASSWORD_FILE_PATH=ca.password
# replace this with the path to your truststore
export KEYSTORE_LOCATION=/Library/Java/JavaVirtualMachines/jdk1.8.0_221.jdk/Contents/Home/jre/lib/security/cacerts
export PASSWORD=`cat $CERT_PASSWORD_FILE_PATH`
# you will be prompted for the truststore password. for the JDK truststore, the default password is "changeit"
# Type yes in response to the 'Trust this certificate? [no]:' prompt
sudo keytool -importcert -alias strimzi-kafka-cert -file $CERT_FILE_PATH -keystore $KEYSTORE_LOCATION -keypass $PASSWORD
sudo keytool -list -alias strimzi-kafka-cert -keystore $KEYSTORE_LOCATION
You should now be able to authenticate to the Kafka cluster using the Kafka CLI client
Please note that the configuration steps for the Kafka CLI as detailed below will also work for Java clients - feel free to try that out too
Create properties file for Kafka CLI clients
Extract the LoadBalancer public IP for the Kafka cluster
export KAFKA_CLUSTER_NAME=my-kafka-cluster
kubectl get service/${KAFKA_CLUSTER_NAME}-kafka-external-bootstrap --output=jsonpath='{.status.loadBalancer.ingress[0].ip}'
Create a file called client-ssl-auth.properties with the following contents:
bootstrap.servers=[LOADBALANCER_PUBLIC_IP]:9094
security.protocol=SSL
ssl.truststore.location=[TRUSTSTORE_LOCATION]
ssl.truststore.password=changeit
ssl.keystore.location=kafka-auth-keystore.jks
ssl.keystore.password=foobar
ssl.key.password=[contents of user.password file]
changeit is the default truststore password. Please use a different one if needed.
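If you rebuild the cluster often, the properties file above can be generated rather than edited by hand. Here is a minimal sketch in Go using only the standard library. The sslClientProps struct, the renderSSLProperties function and the sample values in main are hypothetical names chosen for illustration (they are not part of Strimzi or the Kafka clients), and values are written verbatim with no escaping.

```go
package main

import (
	"fmt"
	"strings"
)

// sslClientProps holds the values that go into client-ssl-auth.properties.
// The struct and its field names are illustrative, not part of any Kafka API.
type sslClientProps struct {
	BootstrapServers   string // LoadBalancer public IP plus port 9094
	TruststoreLocation string
	TruststorePassword string
	KeystoreLocation   string
	KeystorePassword   string
	KeyPassword        string // contents of the user.password file
}

// renderSSLProperties returns the file contents, one key=value pair per line,
// using the same property names as the file shown above.
// Values are written as-is; this sketch does not escape special characters.
func renderSSLProperties(p sslClientProps) string {
	lines := []string{
		"bootstrap.servers=" + p.BootstrapServers,
		"security.protocol=SSL",
		"ssl.truststore.location=" + p.TruststoreLocation,
		"ssl.truststore.password=" + p.TruststorePassword,
		"ssl.keystore.location=" + p.KeystoreLocation,
		"ssl.keystore.password=" + p.KeystorePassword,
		"ssl.key.password=" + p.KeyPassword,
	}
	return strings.Join(lines, "\n") + "\n"
}

func main() {
	// Placeholder values; substitute your own before using the output.
	fmt.Print(renderSSLProperties(sslClientProps{
		BootstrapServers:   "LOADBALANCER_PUBLIC_IP:9094",
		TruststoreLocation: "TRUSTSTORE_LOCATION",
		TruststorePassword: "changeit",
		KeystoreLocation:   "kafka-auth-keystore.jks",
		KeystorePassword:   "foobar",
		KeyPassword:        "CONTENTS_OF_USER_PASSWORD_FILE",
	}))
}
```

Redirecting the program's output to client-ssl-auth.properties gives the same file as the manual steps.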
Download Kafka if you don't have it already - https://kafka.apache.org/downloads
One last thing before you proceed
Create a KafkaTopic
As I mentioned earlier, the Topic Operator makes it possible to embed topic info in the form of a KafkaTopic manifest as such:
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaTopic
metadata:
  name: strimzi-test-topic
  labels:
    strimzi.io/cluster: my-kafka-cluster
spec:
  partitions: 3
  replicas: 1
To create the topic:
kubectl apply -f https://raw.githubusercontent.com/abhirockzz/kafka-kubernetes-strimzi/master/part-3/topic.yaml
Here is the reference for the KafkaTopic CRD: https://strimzi.io/docs/operators/master/using.html#type-KafkaTopic-reference
All you need to do is use the kafka-console-producer and kafka-console-consumer by pointing them to the client-ssl-auth.properties file you just created:
export KAFKA_HOME=[replace with kafka installation] e.g. /Users/foobar/kafka_2.12-2.3.0
export LOADBALANCER_PUBLIC_IP=[replace with public-ip]
export TOPIC_NAME=strimzi-test-topic
# on a terminal, start producer and send a few messages
$KAFKA_HOME/bin/kafka-console-producer.sh --broker-list $LOADBALANCER_PUBLIC_IP:9094 --topic $TOPIC_NAME --producer.config client-ssl-auth.properties
# on another terminal, start consumer
$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server $LOADBALANCER_PUBLIC_IP:9094 --topic $TOPIC_NAME --consumer.config client-ssl-auth.properties --from-beginning
You should see producer and consumer working in tandem. Great!
If you face SSL handshake errors, please check whether the keys and certificates have been correctly imported and that you're using the correct password. If the Kafka cluster is not reachable, ensure you are using the right value for the public IP.
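When troubleshooting, it can help to separate TLS problems from Kafka problems. The following is a rough sketch in Go (standard library only) that attempts a TLS connection to the external listener using ca.crt, user.crt and user.key extracted earlier. The function names are made up for this example, and it assumes user.key is an unencrypted PEM file and that the three files sit in the current directory.

```go
package main

import (
	"crypto/tls"
	"crypto/x509"
	"errors"
	"fmt"
	"os"
)

// buildClientTLSConfig combines the cluster CA certificate with the user's
// certificate and key (all PEM-encoded) into a client-side TLS config.
func buildClientTLSConfig(caPEM, certPEM, keyPEM []byte) (*tls.Config, error) {
	pool := x509.NewCertPool()
	if !pool.AppendCertsFromPEM(caPEM) {
		return nil, errors.New("no CA certificate found in the supplied PEM data")
	}
	cert, err := tls.X509KeyPair(certPEM, keyPEM)
	if err != nil {
		return nil, fmt.Errorf("loading user certificate and key: %w", err)
	}
	return &tls.Config{RootCAs: pool, Certificates: []tls.Certificate{cert}}, nil
}

// checkHandshake reads the three files and attempts a TLS connection to addr.
func checkHandshake(addr, caFile, certFile, keyFile string) error {
	caPEM, err := os.ReadFile(caFile)
	if err != nil {
		return err
	}
	certPEM, err := os.ReadFile(certFile)
	if err != nil {
		return err
	}
	keyPEM, err := os.ReadFile(keyFile)
	if err != nil {
		return err
	}
	cfg, err := buildClientTLSConfig(caPEM, certPEM, keyPEM)
	if err != nil {
		return err
	}
	conn, err := tls.Dial("tcp", addr, cfg)
	if err != nil {
		return fmt.Errorf("TLS connection to %s failed: %w", addr, err)
	}
	return conn.Close()
}

func main() {
	addr := os.Getenv("KAFKA_BOOTSTRAP_SERVERS") // e.g. public-ip:9094
	if addr == "" {
		fmt.Println("set KAFKA_BOOTSTRAP_SERVERS (host:port) to run the check")
		return
	}
	if err := checkHandshake(addr, "ca.crt", "user.crt", "user.key"); err != nil {
		fmt.Println("check failed:", err)
		return
	}
	fmt.Println("TLS connection established")
}
```

A failure here points at reachability, the CA certificate or the user key pair rather than at Kafka itself. A success is weaker evidence: with TLS 1.3, a rejected client certificate may only surface on the first read or write after the handshake, so treat this as a first-pass check only.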
Now, let's try a programmatic client. Since the Java client behavior (required config properties) is the same as the CLI, I am using a Go client to try something different. Don't worry, if you are not a Go programmer, it should be easy to follow along.
I will not walk through the entire program, just the part where we create the connection related configuration. Here is the snippet:
bootstrapServers = os.Getenv("KAFKA_BOOTSTRAP_SERVERS")
caLocation = os.Getenv("CA_CERT_LOCATION")
topic = os.Getenv("KAFKA_TOPIC")
userCertLocation = os.Getenv("USER_CERT_LOCATION")
userKeyLocation = os.Getenv("USER_KEY_LOCATION")
userKeyPassword = os.Getenv("USER_KEY_PASSWORD")
producerConfig := &kafka.ConfigMap{"bootstrap.servers": bootstrapServers, "security.protocol": "SSL", "ssl.ca.location": caLocation, "ssl.certificate.location": userCertLocation, "ssl.key.location": userKeyLocation, "ssl.key.password": userKeyPassword}
Notice that bootstrap.servers and security.protocol are the same as the ones you used in the Kafka CLI client (same for Java as well).
- For TLS encryption: ssl.ca.location is used to point to the CA certificate directly as opposed to a truststore
- For client authentication: ssl.certificate.location, ssl.key.location and ssl.key.password refer to the user certificate, user key and password respectively
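Since all of these settings come from environment variables, an unset variable silently becomes an empty string in the config. A small pre-flight check can catch that early. This is a sketch only: missingEnvVars and requiredTLSEnvVars are hypothetical helpers written for this post and are not part of the sample app or the Kafka Go client.

```go
package main

import (
	"fmt"
	"os"
	"strings"
)

// requiredTLSEnvVars lists the environment variables read by the snippet above.
var requiredTLSEnvVars = []string{
	"KAFKA_BOOTSTRAP_SERVERS",
	"CA_CERT_LOCATION",
	"KAFKA_TOPIC",
	"USER_CERT_LOCATION",
	"USER_KEY_LOCATION",
	"USER_KEY_PASSWORD",
}

// missingEnvVars returns the names in required for which getenv yields "".
// getenv is passed in so the check can be exercised without touching the
// real process environment.
func missingEnvVars(required []string, getenv func(string) string) []string {
	var missing []string
	for _, name := range required {
		if getenv(name) == "" {
			missing = append(missing, name)
		}
	}
	return missing
}

func main() {
	if missing := missingEnvVars(requiredTLSEnvVars, os.Getenv); len(missing) > 0 {
		fmt.Println("missing environment variables:", strings.Join(missing, ", "))
		return
	}
	fmt.Println("all TLS client settings are present")
}
```

Running a check like this before building the producer config turns a confusing connection error into a plain message naming the missing variable.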
If you have Go installed, you can try it out. Clone the Git repo
git clone https://github.com/abhirockzz/kafka-kubernetes-strimzi
cd kafka-kubernetes-strimzi/part-3/go-client-app
.. and run the program:
export KAFKA_BOOTSTRAP_SERVERS=[replace with public-ip:9094] e.g. 20.43.176.7:9094
export CA_CERT_LOCATION=[replace with location of ca.crt file] e.g. /Users/code/kafka-kubernetes-strimzi/part-3/ca.crt
export KAFKA_TOPIC=strimzi-test-topic
export USER_CERT_LOCATION=[path to user.crt file] e.g. /Users/code/kafka-kubernetes-strimzi/part-3/user.crt
export USER_KEY_LOCATION=[path to user.key file] e.g. /Users/code/kafka-kubernetes-strimzi/part-3/user.key
export USER_KEY_PASSWORD=[contents of user.password file]
go run kafka-tls-auth-client.go
The logs should confirm whether messages are being produced and consumed
Enforce SCRAM-SHA-512 auth
SCRAM stands for "Salted Challenge Response Authentication Mechanism". I will not pretend to be a security or SCRAM expert, but do want to highlight that it is one of the supported and commonly used authentication mechanisms in Kafka (in addition to others such as PLAIN).
Please note that Strimzi does not support SASL PLAIN auth at the time of writing.
Update the Kafka cluster
To apply the SCRAM authentication scheme, all you need is to set the authentication.type to scram-sha-512:
external:
  type: loadbalancer
  tls: true
  authentication:
    type: scram-sha-512
Update the Kafka cluster to use SCRAM-SHA authentication:
kubectl apply -f https://raw.githubusercontent.com/abhirockzz/kafka-kubernetes-strimzi/master/part-3/kafka-tls-auth.yaml
Let's take a look at what the Kafka server config looks like in this case:
export CLUSTER_NAME=my-kafka-cluster
kubectl get configmap/${CLUSTER_NAME}-kafka-config -o yaml
Introspect the External listener section in server.config and notice how the config has been updated to reflect the change:
listener.name.external-9094.scram-sha-512.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required;
listener.name.external-9094.sasl.enabled.mechanisms=SCRAM-SHA-512
Create SCRAM credentials (KafkaUser)
Just like we did with TLS auth, we need to create client credentials for SCRAM as well. It only differs from its TLS equivalent in terms of the name and the type (of course!)
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaUser
metadata:
  name: kafka-scram-client-credentials
  labels:
    strimzi.io/cluster: my-kafka-cluster
spec:
  authentication:
    type: scram-sha-512
Notice that authentication.type is scram-sha-512
Create the KafkaUser
kubectl apply -f https://raw.githubusercontent.com/abhirockzz/kafka-kubernetes-strimzi/master/part-3/user-scram-auth.yaml
Introspect the Secret (it has the same name as the KafkaUser):
kubectl get secret/kafka-scram-client-credentials -o yaml
The Secret contains the password in base64-encoded form:
apiVersion: v1
kind: Secret
name: kafka-scram-client-credentials
data:
  password: SnpteEQwek1DNkdi
...
The username is the same as the KafkaUser/Secret name, which is kafka-scram-client-credentials in this example
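Values under data in a Secret are plain base64, so they can be decoded with any standard library. As a small illustration in Go, the decodeSecretValue helper below (a name I made up for this example) decodes the sample password shown above.

```go
package main

import (
	"encoding/base64"
	"fmt"
)

// decodeSecretValue decodes a single base64-encoded value taken from the
// data section of a Kubernetes Secret.
func decodeSecretValue(encoded string) (string, error) {
	raw, err := base64.StdEncoding.DecodeString(encoded)
	if err != nil {
		return "", fmt.Errorf("decoding secret value: %w", err)
	}
	return string(raw), nil
}

func main() {
	// Sample value copied from the Secret output above.
	password, err := decodeSecretValue("SnpteEQwek1DNkdi")
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(password)
}
```

This is the same decoding step that base64 --decode performs in the shell commands used throughout this post.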
Run client applications
In order to run the client examples, download the password:
export USER_NAME=kafka-scram-client-credentials
kubectl get secret $USER_NAME -o jsonpath='{.data.password}' | base64 --decode > user-scram.password
To test the Kafka CLI client, create a file client-scram-auth.properties with the following contents:
bootstrap.servers=[replace with public-ip:9094]
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
ssl.truststore.location=[replace with path to truststore with kafka CA cert]
# "changeit" is the default password for JDK truststore, please use the one applicable to yours
ssl.truststore.password=changeit
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka-scram-client-credentials" password="[replace with contents of user-scram.password file]";
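The sasl.jaas.config line is the easiest one to get wrong, since it needs the quotes and the trailing semicolon. Below is a minimal Go sketch that assembles it from a username and password. scramJAASConfig is a hypothetical helper for illustration. As a conservative simplification, it rejects values containing double quotes, backslashes or newlines instead of trying to escape them.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// scramJAASConfig builds the value of sasl.jaas.config for SCRAM auth,
// in the same shape as the properties file above.
// Simplification: values with quotes, backslashes or newlines are rejected
// rather than escaped.
func scramJAASConfig(username, password string) (string, error) {
	for _, v := range []string{username, password} {
		if strings.ContainsAny(v, "\"\\\n") {
			return "", errors.New("username and password must not contain quotes, backslashes or newlines")
		}
	}
	return fmt.Sprintf(
		`org.apache.kafka.common.security.scram.ScramLoginModule required username="%s" password="%s";`,
		username, password,
	), nil
}

func main() {
	// The password here is a placeholder; use the contents of user-scram.password.
	line, err := scramJAASConfig("kafka-scram-client-credentials", "REPLACE_WITH_PASSWORD")
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println("sasl.jaas.config=" + line)
}
```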
Refer to the instructions above to run the console producer and consumer. Please make sure you use the client-scram-auth.properties file and not the client-ssl-auth.properties file.
Before wrapping up, let's look at the Go client and see how it handles SCRAM authentication. As always, I will only highlight the part which showcases the configuration:
bootstrapServers = os.Getenv("KAFKA_BOOTSTRAP_SERVERS")
caLocation = os.Getenv("CA_CERT_LOCATION")
topic = os.Getenv("KAFKA_TOPIC")
kafkaScramUsername = os.Getenv("SCRAM_USERNAME")
kafkaScramPassword = os.Getenv("SCRAM_PASSWORD")
producerConfig := &kafka.ConfigMap{"bootstrap.servers": bootstrapServers, "security.protocol": "SASL_SSL", "ssl.ca.location": caLocation, "sasl.mechanism": "SCRAM-SHA-512", "sasl.username": kafkaScramUsername, "sasl.password": kafkaScramPassword}
The security.protocol and sasl.mechanism have been updated to SASL_SSL and SCRAM-SHA-512 respectively. Along with that, we use sasl.username and sasl.password to specify the client credentials.
To run the Go client app:
export KAFKA_BOOTSTRAP_SERVERS=[replace with public-ip:9094]
export CA_CERT_LOCATION=[path to ca.crt file] e.g. /Users/code/kafka-kubernetes-strimzi/part-3/ca.crt
export KAFKA_TOPIC=strimzi-test-topic
export SCRAM_USERNAME=kafka-scram-client-credentials
export SCRAM_PASSWORD=[contents of user-scram.password file]
go run kafka-scram-auth-client.go
Wrap up.. for now
This post covered a decent amount of ground! We learnt how to apply different authentication types, how to use the Entity Operator to manage Kafka users and topics and, more importantly, how client applications need to be configured to connect securely using a combination of TLS encryption and the chosen authentication scheme.
We're far from done! All this while, we've been creating ephemeral clusters with no persistence - we will fix that in upcoming posts.