DEV Community

Aditya Kanekar


Connecting to Kafka cluster using SSL with Python

This article specifically covers how to connect Python clients to a Kafka cluster secured with SSL, starting with the producer; the consumer will follow in the next part. I won't be getting into how to generate client certificates in this article; that topic is reserved for another article :).

Prerequisites

  • Kafka Cluster with SSL
  • Client certificate (KeyStore) in JKS format
  • Linux environment with keytool and openssl installed
  • Python 3.6

Step 1 - Converting JKS to PEM file

Why do I need this step?

Unlike Java clients, which can use JKS keystores directly, Python and C# clients typically use .pem files to connect to Kafka. For this purpose, we have to convert the JKS files to PEM with the help of the keytool and openssl commands. If you are working on Windows 10, you can refer to my article on how to run WSL on Windows here.
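The sketch below shows why the conversion matters. It is a minimal example of what a Python client has to do with these files, built on Python's standard ssl module. That module takes paths to PEM-encoded certificates and keys and has no reader for JKS. kafka-python makes similar load_verify_locations and load_cert_chain calls internally; the traceback in the comments below shows one of them. The function name build_ssl_context is hypothetical and only for illustration.

```python
import ssl
from typing import Optional


def build_ssl_context(cafile: str, certfile: str, keyfile: str,
                      password: Optional[str] = None) -> ssl.SSLContext:
    """Build a client-side TLS context from PEM files (hypothetical helper)."""
    # Trust the CA that signed the brokers' certificates (e.g. CARoot.pem).
    # A missing file raises FileNotFoundError here.
    context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH, cafile=cafile)
    # Present the client certificate and private key (e.g. certificate.pem, key.pem).
    # The password only matters if the private key is encrypted.
    context.load_cert_chain(certfile=certfile, keyfile=keyfile, password=password)
    return context
```

A missing or unreadable file raises an OSError (for example FileNotFoundError) at this point. This makes it a quick way to check your paths before involving Kafka at all.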

To make your life easier, I have created a shell script to quickly convert JKS to PEM.

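#!/bin/bash
# Usage: ./jkstopem.sh <source_path_to_jks> <keystore_file_name> <keystore_password> <alias> <output_folder> [ca_alias]
set -e

srcFolder=$1
keyStore="$srcFolder/$2"
password=$3
alias=$4
outputFolder=$5
# Alias of the CA certificate entry in the keystore. "CARoot" is a common
# convention, not a guarantee: check with keytool -list and pass it explicitly.
caAlias=${6:-CARoot}

mkdir -p "$outputFolder"
echo "$keyStore"

echo "Generating certificate.pem"
keytool -exportcert -alias "$alias" -keystore "$keyStore" -rfc -file "$outputFolder/certificate.pem" -storepass "$password"

echo "Generating key.pem"
# Copy the private key entry into a PKCS12 file, which openssl can read
keytool -v -importkeystore -srckeystore "$keyStore" -srcalias "$alias" -destkeystore "$outputFolder/cert_and_key.p12" -deststoretype PKCS12 -deststorepass "$password" -srcstorepass "$password" -noprompt
# Extract only the private key (-nocerts), unencrypted (-nodes)
openssl pkcs12 -in "$outputFolder/cert_and_key.p12" -nodes -nocerts -out "$outputFolder/key.pem" -passin "pass:$password"

echo "Generating CARoot.pem"
# Export the CA certificate (not the client certificate) so CARoot.pem differs from certificate.pem
keytool -exportcert -alias "$caAlias" -keystore "$keyStore" -rfc -file "$outputFolder/CARoot.pem" -storepass "$password"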

The script generates the following files from the keystore file:

  • key.pem
  • certificate.pem
  • CARoot.pem

How to run this script?

Save the script to a file, e.g. jkstopem.sh, and make it executable:

chmod +x jkstopem.sh

To generate the PEM files, run the shell script as shown in the example below:

./jkstopem.sh <source_path_to_jks> <keystore_file_name> <keystore_password> <alias> <output_folder>
How do I find the alias?

If you don't know which alias your certificate has, run the following command in the folder that contains the keystore file:

keytool -list -v -keystore kafka.client.keystore.jks

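You will be prompted for the password. Enter the keystore password, and keytool will list the contents of the keystore file, including the "Alias name" of each entry. If there is more than one entry, use the alias whose "Entry type" is PrivateKeyEntry, because that entry holds the client certificate and its private key.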
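You may prefer to pick the alias programmatically, for example when a keystore holds several entries. The following sketch parses the text printed by keytool -list -v and reports the entry type of each alias. It assumes English-locale keytool output with "Alias name:" and "Entry type:" lines. The helper names are hypothetical.

```python
import re
from typing import Dict, List, Optional

ALIAS_RE = re.compile(r"^Alias name:\s*(.+)$")
ENTRY_TYPE_RE = re.compile(r"^Entry type:\s*(.+)$")


def parse_keytool_listing(listing: str) -> Dict[str, str]:
    """Map each alias in `keytool -list -v` output to its entry type.

    Assumes English-locale output, where each entry has lines such as
    "Alias name: client-alias" and "Entry type: PrivateKeyEntry".
    """
    entries: Dict[str, str] = {}
    current: Optional[str] = None
    for raw_line in listing.splitlines():
        line = raw_line.strip()
        alias_match = ALIAS_RE.match(line)
        if alias_match:
            current = alias_match.group(1).strip()
            entries[current] = "unknown"  # until an "Entry type:" line is seen
            continue
        type_match = ENTRY_TYPE_RE.match(line)
        if type_match and current is not None:
            entries[current] = type_match.group(1).strip()
    return entries


def private_key_aliases(entries: Dict[str, str]) -> List[str]:
    """Aliases that hold a private key, i.e. candidates for the script's <alias> argument."""
    return [alias for alias, kind in entries.items() if kind == "PrivateKeyEntry"]
```

To get the listing text, you could capture keytool's output with subprocess.run(["keytool", "-list", "-v", "-keystore", "kafka.client.keystore.jks", "-storepass", "welcome123"], stdout=subprocess.PIPE, universal_newlines=True).stdout. That form also works on Python 3.6.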

Here is an example of running the shell script:

./jkstopem.sh ~/client-cert kafka.client.keystore.jks welcome123 client-alias ~/client-cert/pem

You should now see the following files in the output folder:

  • key.pem
  • certificate.pem
  • CARoot.pem
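Several readers (see the comments below) ended up with an empty key.pem, or with a CARoot.pem identical to certificate.pem. Either problem is worth catching before you connect. The following sketch runs a quick sanity check on the output folder. It only looks for PEM markers and compares file contents, so it is a heuristic and not a full certificate validation. The function name check_pem_folder is hypothetical.

```python
from pathlib import Path
from typing import Dict, List

# Text each generated file is expected to contain (heuristic, not a full parse).
EXPECTED_MARKERS = {
    "key.pem": "PRIVATE KEY-----",  # also matches "RSA PRIVATE KEY" headers
    "certificate.pem": "-----BEGIN CERTIFICATE-----",
    "CARoot.pem": "-----BEGIN CERTIFICATE-----",
}


def check_pem_folder(folder: str) -> List[str]:
    """Return human-readable problems found in the output folder (empty list if none)."""
    problems: List[str] = []
    contents: Dict[str, str] = {}
    for name, marker in EXPECTED_MARKERS.items():
        path = Path(folder) / name
        if not path.is_file():
            problems.append(f"{name} is missing")
            continue
        contents[name] = path.read_text()
        if marker not in contents[name]:
            # An empty (zero-byte) key.pem ends up here.
            problems.append(f"{name} does not contain '{marker}'")
    cert = contents.get("certificate.pem")
    if cert and cert == contents.get("CARoot.pem"):
        # Usually means both files were exported from the same alias.
        problems.append("certificate.pem and CARoot.pem are identical")
    return problems
```

For a closer look, openssl x509 -in certificate.pem -noout -subject -issuer prints the subject and issuer of a certificate. For CARoot.pem, you would expect to see the CA's own name rather than your client's.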

Now that we have all the PEM files, let's get cracking.

Step 2 - Writing Kafka Producer in Python

We will be using the kafka-python package to connect to Kafka. You can install it using pip:

pip install kafka-python

Now, let's write our producer.

# Producer.py
from kafka import KafkaProducer

# Comma-separated list of brokers (host:port)
kafkaBrokers = 'kafka1.xyz.com:443,kafka2.xyz.com:443,kafka3.xyz.com:443'
caRootLocation = 'CARoot.pem'     # CA certificate used to verify the brokers
certLocation = 'certificate.pem'  # client certificate presented to the brokers
keyLocation = 'key.pem'           # client private key
topic = 'test-topic'
password = 'welcome123'           # only used if key.pem is encrypted

producer = KafkaProducer(bootstrap_servers=kafkaBrokers,
                         security_protocol='SSL',
                         ssl_check_hostname=True,
                         ssl_cafile=caRootLocation,
                         ssl_certfile=certLocation,
                         ssl_keyfile=keyLocation,
                         ssl_password=password)

# Let the producer's partitioner choose the partition
producer.send(topic, bytes('Hello Kafka!', 'utf-8'))

# Send to a particular partition
producer.send(topic, bytes('Hello Kafka!', 'utf-8'), partition=0)

# Block until all buffered messages are sent, then release the connections
producer.flush()
producer.close()

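In the above example, we pass the PEM files generated in the previous step. ssl_cafile is the CA certificate used to verify the brokers, while ssl_certfile and ssl_keyfile are the client certificate and private key presented to the brokers. ssl_password is only needed when the private key is encrypted. Because the script extracts the key with -nodes, key.pem is unencrypted and the password is simply ignored.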
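Producers and consumers need the same SSL settings, so it can help to build them in one place. This sketch assumes the file names used above. It passes ssl_password only when key.pem actually looks encrypted, meaning it contains an "ENCRYPTED PRIVATE KEY" block or a "Proc-Type: 4,ENCRYPTED" header. The helper names are hypothetical; the keyword names are the kafka-python ones used in the producer.

```python
from pathlib import Path
from typing import Dict, Optional


def is_encrypted_key(key_pem: str) -> bool:
    """Heuristic: True if a PEM private key looks password-protected."""
    # Encrypted PKCS#8 keys use "-----BEGIN ENCRYPTED PRIVATE KEY-----";
    # traditional OpenSSL encrypted keys carry a "Proc-Type: 4,ENCRYPTED" header.
    return "ENCRYPTED PRIVATE KEY" in key_pem or "Proc-Type: 4,ENCRYPTED" in key_pem


def kafka_ssl_kwargs(ca_file: str, cert_file: str, key_file: str,
                     password: Optional[str] = None) -> Dict[str, object]:
    """SSL keyword arguments accepted by kafka-python's KafkaProducer and KafkaConsumer."""
    kwargs: Dict[str, object] = {
        "security_protocol": "SSL",
        "ssl_check_hostname": True,
        "ssl_cafile": ca_file,      # CA used to verify the brokers
        "ssl_certfile": cert_file,  # client certificate
        "ssl_keyfile": key_file,    # client private key
    }
    # Only pass the password when the key actually needs it.
    if password is not None and is_encrypted_key(Path(key_file).read_text()):
        kwargs["ssl_password"] = password
    return kwargs
```

You would then create the producer with KafkaProducer(bootstrap_servers=kafkaBrokers, **kafka_ssl_kwargs(caRootLocation, certLocation, keyLocation, password)).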

kafkaBrokers='kafka1.xyz.com:443,kafka2.xyz.com:443,kafka3.xyz.com:443'
caRootLocation='CARoot.pem'
certLocation='certificate.pem'
keyLocation='key.pem'
password='welcome123'
producer = KafkaProducer(bootstrap_servers=kafkaBrokers,
                          security_protocol='SSL',
                          ssl_check_hostname=True,
                          ssl_cafile=caRootLocation,
                          ssl_certfile=certLocation,
                          ssl_keyfile=keyLocation,
                          ssl_password=password)

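Sending data to a random topic partition

The code snippet below sends data without a key or partition, so the producer's default partitioner chooses the partition. In kafka-python, that is a random available partition.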

producer.send(topic, bytes('Hello Kafka!','utf-8'))
producer.flush()

Sending data to a specific topic partition

To send data to a specific partition, just pass the partition argument as shown in the code snippet below:

producer.send(topic, bytes('Hello Kafka - Partition 0!','utf-8'),partition=0)
producer.flush()
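In kafka-python, producer.send() does not block; it returns a future. You may want to see which partition a message actually landed in. For example, you might confirm that the default partitioner picked one for you, or that partition=0 was honored. To do that, wait on the future and read the returned record metadata. A minimal sketch follows, where send_and_report is a hypothetical helper:

```python
from typing import Optional, Tuple


def send_and_report(producer, topic: str, value: bytes,
                    partition: Optional[int] = None,
                    timeout: float = 10.0) -> Tuple[int, int]:
    """Send one message and return (partition, offset) once the broker acknowledges it.

    `producer` is assumed to be a kafka-python KafkaProducer. Its send() returns a
    future; get() blocks until the write succeeds and raises if it fails or times out.
    """
    # partition=None leaves the choice to the producer's partitioner.
    future = producer.send(topic, value, partition=partition)
    metadata = future.get(timeout=timeout)  # kafka-python RecordMetadata
    return metadata.partition, metadata.offset
```

For example, send_and_report(producer, topic, b'Hello Kafka - Partition 0!', partition=0) should return partition 0 together with the offset assigned by the broker. Waiting on every future trades throughput for certainty, so this pattern suits debugging more than high-volume sending.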

So we have built our Python producer for Kafka. In the next part, we will write a consumer to consume messages from the topic.
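Until that part is written, here is a hedged sketch of what the consumer could look like. It reuses the same PEM files and SSL settings as the producer; the author notes in the comments that it would be similar. The group id, the timeout, and the helper names are assumptions for illustration. The KafkaConsumer keyword arguments are standard kafka-python options.

```python
from typing import Dict, List


def consumer_ssl_config(brokers: List[str], ca_file: str, cert_file: str,
                        key_file: str, group_id: str) -> Dict[str, object]:
    """Keyword arguments for kafka-python's KafkaConsumer, mirroring the producer's SSL settings."""
    return {
        "bootstrap_servers": brokers,
        "security_protocol": "SSL",
        "ssl_check_hostname": True,
        "ssl_cafile": ca_file,
        "ssl_certfile": cert_file,
        "ssl_keyfile": key_file,
        "group_id": group_id,
        # Start from the oldest available message when the group has no committed offset.
        "auto_offset_reset": "earliest",
        # Stop iterating after 10 seconds without new messages instead of blocking forever.
        "consumer_timeout_ms": 10000,
    }


def consume(topic: str, config: Dict[str, object]) -> None:
    """Print partition, offset and value of each message received on `topic`."""
    # Imported here so the config helper above can be used without kafka-python installed.
    from kafka import KafkaConsumer

    consumer = KafkaConsumer(topic, **config)
    try:
        for message in consumer:
            print(message.partition, message.offset, message.value.decode("utf-8"))
    finally:
        consumer.close()
```

For example: consume('test-topic', consumer_ssl_config(['kafka1.xyz.com:443'], 'CARoot.pem', 'certificate.pem', 'key.pem', 'test-group')). Because consumer_timeout_ms is set, the loop ends after 10 seconds without new messages rather than waiting forever.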

Top comments (13)

bcmdevtl

Hi Aditya,
Great tutorial. Did you do up the consumer part of this as well? Tried to look but couldn't find it.

Aditya Kanekar

Glad you liked it. No, I have not added the consumer part yet, but it would be similar to the producer. I will try to add it.

Giuseppe • Edited

Hi Aditya, I'm trying to use your code for my SSL Kafka cluster, but when I use the producer I receive this error on the KafkaProducer call:
self._wrap_ssl()
  File "/home/kafka/anaconda3/envs/kafka/lib/python3.8/site-packages/kafka/conn.py", line 473, in _wrap_ssl
    self._ssl_context.load_verify_locations(self.config['ssl_cafile'])

It seems it doesn't find the CARoot file, but it is in that folder.
Can you help me? Thank you.

Aditya Kanekar

I think I made a typo in the code: instead of CARoot.pem, it was CARoote.pem. Can you please check?

Giuseppe

Great, it works... but self-signed certificates don't seem to work with the producer, because I receive this error:

ssl.SSLCertVerificationError: [SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: self signed certificate in certificate chain (_ssl.c:1131)

Any suggestions?

Sachin Tyagi

Hello,
I also have the same issue. Does anyone have a solution for it?

Thanks

Stefan Stojanovic

Please double-check the client.keystore.jks file before executing jkstopem.sh.
It is possible that the file contains two aliases (one for CARoot and one for the certificate).
In that case, you might have to hardcode the certificate or CA alias in the jkstopem script or add an extra argument for it. Hope it helps :)

__98067b65

These lines look exactly the same

echo "Generating certificate.pem"
keytool -exportcert -alias $alias -keystore $keyStore -rfc -file $outputFolder/certificate.pem -storepass $password

echo "Generating CARoot.pem"
keytool -exportcert -alias $alias -keystore $keyStore -rfc -file $outputFolder/CARoot.pem -storepass $password

The only difference is the .pem file name (certificate.pem vs CARoot.pem).

As a result, after running the script, I got exactly the same certificate.pem and CARoot.pem files.
Is this normal behavior?

shubhendumadhukar

I have a truststore.jks and password. But I don't have a keystore.jks. I am able to use the truststore and password to connect to the cluster using java code. Will this method work for truststore.jks also or is it only for keystore? I tried to generate those files and got certificate.pem but then I got an error stating "java.lang.Exception: Alias does not exist" and other files were not generated.

Aditya Kanekar

The method works for keystore.jks. It appears to me that you are using SSL-enabled Kafka without any ACLs enabled. If that's true, you might not be providing ssl_keyfile and ssl_certfile when connecting to Kafka through your Java code. In my view, you should only need to provide CARoot.pem to connect to Kafka; the other properties might not be required. Please try it and let me know if that works.

Shailendra Jain • Edited

Hi Aditya,
While running this, I get an empty key.pem file. What should I do to get a populated key.pem file? Also, I have multiple aliases in the JKS file. Should I combine all the CARoot and certificate files for the different aliases and use them while connecting?

Rameshwar Gupta

Hi Aditya,

Thanks for the detailed explanation of the Python version.
I'm also getting an empty key.pem file. Can you please suggest something?

code4purpose

Hi Aditya, first of all, thanks for the detailed steps; they have been very helpful. I followed the exact steps, but I am getting a zero-byte private key. My environment is CentOS, and openssl version reports OpenSSL 1.0.2k-fips 26 Jan 2017. Any pointers will be highly appreciated. Thank you.