DEV Community

Cover image for Using Oracle Streaming service as a managed Apache Kafka
Mirek Sedzinski
Mirek Sedzinski

Posted on

Using Oracle Streaming service as a managed Apache Kafka

The Oracle Cloud Infrastructure Streaming is a fully managed and scalable service for processing high-volume streams of data in real time.
We can think of it as an equivalent of Apache Kafka and actually - they are API-compatibile, which means we can use applications written for Kafka with Streaming without having to rewrite a code.

Problem statement

Why we would even consider using Streaming service? Well, for the same reasons we would consider using Kafka.

The additional bonus is that because it's a PaaS service, it's extremely easy to set up and there's almost no effort related to maintenance (forget about patching, scaling, configuring HA or running out of disc space).

How to use it

Configuration can be done in a standard way - manually in the cloud console or automatically with REST API/SDKs/Terraform/CLI.

Things are getting more interesting when it comes to publishing/consuming messages. There are two approaches:

  2. Kafka-compliant API

Usually, I tend to go with the latter, for one reason: long-lived TCP connections used by Kafka protocol. Using this mechanism, makes the process of reading the data much more interactive - one can listen certain amount of time on a topic and when a message arrives it is consumed at once. In case of OCI REST API, the only way is to call GetMessages operation in a loop and it exits immediately when there is nothing to consume.

In the next section I will show how easy is to configure Streaming and connect to it using Kafka client.


Stream Pool

First, let's create stream pool which groups streams (and the easiest way to think about a stream is to treat it as a Kafka topic):

Stream Pool

The only required parameter that has to be provided is actually a Stream Pool Name. For other settings we can use default values.

However, there are two interesting configuration parameters to mention:

  1. Endpoint type - we can choose whether our streams are public (ingress traffic allowed from the internet) or private (accessible only within Oracle Cloud region).
  2. Encryption settings - all messages are encrypted by default (at rest and in-transit). We can specify details of the configuration.


Now we can create a Stream (aka topic):

Alt Text

First we need to assign a Stream to a Stream Pool (the one we created in the previous section). Then we need to provide the name of the Stream.

We can control read/write throughput using mechanism of partitions (works the same as in Kafka).

Kafka configuration

When we open a page with details of any given Stream Pool, at the top of the screen we will find the following button:
Alt Text

After clicking it, the page with connection details pops up:
Alt Text

Having all that data we are almost ready to connect to Streaming service using Kafka client. But first we need to touch on one very important topic - security.

A few important words about security


Connection between client and server is encrypted in transit using TSL (successor of SSL). And there is a 1-way authentication enabled by default, which means that client authenticates the server certificate.

To make that authentication work, client has to trust the certificate presented by the server. Usually it requires additional configuration.

There is a setting that can be used on client side: "". It can be used to point to the file with proper certificate chain of trust. The file itself should look like this:

# Server certificate

# Trust chain intermediate certificate
----------BEGIN CERTIFICATE-----
----------END CERTIFICATE-----

# Trust chain root certificate
----------BEGIN CERTIFICATE-----
----------END CERTIFICATE-----
Enter fullscreen mode Exit fullscreen mode

How to build the chain of trust? We start at the bottom of the chain ("Server certificate" in the example above, but please mind that direction in the file is reversed).

To get the server certificate we can run the following command:

echo -n | openssl s_client -connect <endpoint taken from Stream details page>
Enter fullscreen mode Exit fullscreen mode

It will return two very important pieces of information:

  • "Certificate chain" section which lists certificates. All of them should be placed in our file. In my case it looks like this (there are 3 certificates in the chain):
Certificate chain
 0 s:/C=US/ST=California/L=Redwood City/O=Oracle Corporation/OU=Oracle OCI-PROD FRANKFURT/
   i:/C=US/O=DigiCert Inc/CN=DigiCert SHA2 Secure Server CA
 1 s:/C=US/O=DigiCert Inc/CN=DigiCert SHA2 Secure Server CA
   i:/C=US/O=DigiCert Inc/ Global Root CA
 2 s:/C=US/O=DigiCert Inc/ Global Root CA
   i:/C=US/O=DigiCert Inc/ Global Root CA
Enter fullscreen mode Exit fullscreen mode
  • "Server certificate" - section between BEGIN/END CERTIFICATE lines. This is a certificate number "0" in a "Certificate chain" and should be placed at the top of our file.

The last step is to collect remaining certificates ("1" and "2") and place them in the file. The root certificate goes at the bottom.

For testing purposes we can use quick and dirty workaround. Setting "enable.ssl.certificate.verification" to "false" will disable server authentication altogether.

Authentication and authorisation

It's highly recommended to create a dedicated user for each Stream Pool (or set of Stream Pools, depending on the requirements).
After creating such a user, we have to:

  • generate for him Auth Token, which will be used as a password.
  • grant him appropriate level of access using OCI Policies

Kafka client

Finally, let's try to connect to our Streaming service using Kafka Client. In my case I will use Go and Confluent package.
Example of producer and consumer code that we can use is available here:

Consumer configuration

consumer, err := kafka.NewConsumer(&kafka.ConfigMap{
        //taken from Stream Pool details page
        "bootstrap.servers":                   "",
        //arbitrary value, when using consumer groups
        "":                            "foo",
        //taken from Stream Pool details page
        "sasl.mechanisms":                     "PLAIN",
        //user authorized to read from the  stream(s) 
        "sasl.username":                       "[tenancy]/[user name]/[stream pool id]",
        //Auth Token for the user
        "sasl.password":                       "[token]",
        "enable.ssl.certificate.verification": "true",
        //Full path to the file with certificates that make up chain of trust
        "":                     "/dir/ca.pem",
        //taken from Stream Pool details page
        "security.protocol":                   "SASL_SSL",
        "auto.offset.reset":                   "earliest"})  

//subscribe to given Stream (aka topic)
err = consumer.SubscribeTopics([]string{"testStream"}, nil)
Enter fullscreen mode Exit fullscreen mode

Producer configuration

p, err := kafka.NewProducer(&kafka.ConfigMap{
        //taken from Stream Pool details page
        "bootstrap.servers":                   "",
        //taken from Stream Pool details page
        "sasl.mechanisms":                     "PLAIN",
        //user authorized to write to the  stream(s) 
        "sasl.username":                       "[tenancy]/[user name]/[stream pool id]",
        //Auth Token for the user
        "sasl.password":                       "[token]",
        //example of how to disable server certificate validation
        //don't do it in production!
        "enable.ssl.certificate.verification": "false",
        //taken from Stream Pool details page
        "security.protocol":                   "SASL_SSL"})
Enter fullscreen mode Exit fullscreen mode

Top comments (0)