Ceyhun Kerti

Streaming Analytics With Confluent Platform and Debezium

This article is for those who want to implement a real-time analytics platform but have their data initially stored in a database. There are many documents on the web describing how to build a streaming platform, but most of them assume the data is already being ingested into Kafka in the first place. Unfortunately, that's not the case for some of us.
Our example architectural scenario looks like this:

Example Scenario

Our data is initially stored in an Oracle database; in other words, the application layer persists user transactions directly to the database instead of sending them to a queue like Kafka. So how do we turn this static data into a stream? Luckily for us, there are methods that use CDC (change data capture) logic. Since the database in our example scenario is Oracle, we will only cover the options for Oracle; fortunately, there are even more options for other databases when it comes to capturing real-time data.

Currently there are two options for capturing data from an Oracle database, and a third one is on the way, which we will talk about briefly later in this article.

GoldenGate

Oracle acquired GoldenGate back in 2009 and positioned the software in its Fusion Middleware tier. Basically, GoldenGate is a CDC tool that reads the Oracle redo logs and replicates the data. Since it reads from the redo logs, it puts no extra load on the database. Before GoldenGate, some developers placed triggers on the source systems and replicated the data over a dblink to another Oracle database; as expected, this method had serious drawbacks, including performance, security, and stability issues. A couple of years ago, Oracle released the Oracle GoldenGate Big Data Adapter, with which you can configure a Kafka adapter and integrate GoldenGate with Kafka. That being said, this adapter is tightly coupled with their Big Data Appliance platform.

XStream API

The XStream API was released as a new feature with Oracle Database 11g Release 2 (11.2) and has been included in every successor version since. XStream basically consists of Oracle database components, such as inbound and outbound servers, and an application programming interface (API). This interface enables client applications to receive data changes from an Oracle database and to send data changes to an Oracle database. Received changes can be shared with other systems, including non-Oracle products, file systems, or third-party software applications of any kind.

So which one should we use? The answer depends on your current architecture. If you are using Oracle BDA, you may consider the GoldenGate Big Data Adapter. Otherwise, you may check out the XStream API and learn how it works. One important thing should be noted here: sadly, both approaches require an Oracle GoldenGate licence. Yes, you read that correctly; even if you only use XStream, you still need a GoldenGate licence.

Debezium

Alright, let's get back to our sample scenario and ask ourselves: I don't have BDA and don't want to install GoldenGate, so which one should I use? Actually, we do not use either of them directly. Coming to our rescue is Debezium, Red Hat's open source change data capture platform, which supports various databases including Oracle. Debezium has a connector for Oracle that uses the XStream API and simplifies many things for us. IMHO it is much simpler than setting up GoldenGate and integrating it with Kafka. But remember that, since it uses the XStream API, you still need a GoldenGate licence in any case. As we said before, there is an upcoming third option, aside from GoldenGate and the XStream API, inside Debezium which uses Oracle LogMiner and does not require a GoldenGate licence. This feature is still being implemented as of writing this article; you may check the GitHub repo and the Gitter channel for more information. I especially recommend the Gitter channel for any kind of issue or question about Debezium; you can contact the product owners and developers directly, and they are very friendly and helpful.

Integration

So let's get started integrating Oracle, Debezium, and Confluent. First of all, we need to prepare our database so that we can receive change records.

Enable GoldenGate replication and archive log mode. You may need to change the paths below according to your installation.

mkdir -p /u01/oracle/oradata/recovery_area

sqlplus sys as sysdba

-- size and location of the recovery area
alter system set db_recovery_file_dest_size = 5G;
alter system set db_recovery_file_dest = '/u01/oracle/oradata/recovery_area' scope=spfile;
alter system set ENABLE_GOLDENGATE_REPLICATION=true;

-- enabling archive log mode requires a restart
shutdown immediate
startup mount
alter database archivelog;
alter database open;

-- should show "Database log mode: Archive Mode"
archive log list
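Before moving on, it can be handy to verify these settings from a script rather than from sqlplus. Below is a minimal sketch in Python, assuming the cx_Oracle driver; the connection details are placeholders, adjust them to your installation.

# Minimal sketch: verify archive log mode and the GoldenGate replication flag.
# Assumes cx_Oracle is installed; user, password and DSN are placeholders.
import cx_Oracle

conn = cx_Oracle.connect("sys", "your_password", "localhost:1521/ORCL",
                         mode=cx_Oracle.SYSDBA)
cur = conn.cursor()

cur.execute("select log_mode from v$database")
print("log_mode:", cur.fetchone()[0])  # expect ARCHIVELOG

cur.execute("select value from v$parameter where name = 'enable_goldengate_replication'")
print("enable_goldengate_replication:", cur.fetchone()[0])  # expect TRUE

cur.close()
conn.close()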

Create the xstrmadmin and xstrm users that will manage the XStream connection to Oracle.

CREATE TABLESPACE xstream_adm_tbs DATAFILE '/u01/oracle/oradata/xstream_adm_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;

CREATE USER xstrmadmin IDENTIFIED BY xsa
DEFAULT TABLESPACE xstream_adm_tbs
QUOTA UNLIMITED ON xstream_adm_tbs;

GRANT CREATE SESSION TO xstrmadmin;

CREATE TABLESPACE xstream_tbs DATAFILE '/u01/oracle/oradata/xstream_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;

CREATE USER xstrm IDENTIFIED BY xs DEFAULT TABLESPACE xstream_tbs QUOTA UNLIMITED ON xstream_tbs;


GRANT CREATE SESSION TO xstrm;
GRANT SELECT ON V_$DATABASE to xstrm;
GRANT FLASHBACK ANY TABLE TO xstrm;


BEGIN
   DBMS_XSTREAM_AUTH.GRANT_ADMIN_PRIVILEGE(
      grantee                 => 'xstrmadmin',
      privilege_type          => 'CAPTURE',
      grant_select_privileges => TRUE
   );
END;
/

Create the Oracle XStream outbound server with the tables you want to track.

DECLARE
  tables  DBMS_UTILITY.UNCL_ARRAY;
  schemas DBMS_UTILITY.UNCL_ARRAY;
BEGIN
  schemas(1) := 'my_schema';
  tables(1)  := 'ABC_CUSTOMER_PROPOSAL';
  tables(2)  := 'ABC_INPUT_PARSE';
  tables(3)  := 'ABC_PAR_VALUE';

  DBMS_XSTREAM_ADM.CREATE_OUTBOUND(
    server_name     =>  'dbzxout',
    table_names     =>  tables,
    schema_names    =>  schemas);
END;
/
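If you want to double check that the outbound server exists, you can query the DBA_XSTREAM_OUTBOUND dictionary view. Here is a minimal sketch, again assuming cx_Oracle and placeholder connection details.

# Minimal sketch: confirm that the 'dbzxout' outbound server was created.
# Connection details are placeholders; adjust them to your environment.
import cx_Oracle

conn = cx_Oracle.connect("sys", "your_password", "localhost:1521/ORCL",
                         mode=cx_Oracle.SYSDBA)
cur = conn.cursor()
cur.execute("select server_name from dba_xstream_outbound")
for (server_name,) in cur:
    print(server_name)  # expect DBZXOUT
conn.close()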

Alright, that's it for the Oracle part. Now let's download the required software: the JDK, the Confluent Platform, the Oracle Instant Client, and the Debezium Oracle connector jars.

Now create a directory that will contain all the work we'll do here. By the way, you can extract the downloaded files anywhere you like, but for simplicity we will put them all under a stream-demo folder.

mkdir stream-demo

  • Extract JDK under stream-demo/jdk 
  • Extract Confluent to stream-demo/confluent
  • Extract Instant Client to stream-demo/instant-client
  • Create a directory stream-demo/confluent/share/java/kafka-connect-debezium
  • Place the following jars under stream-demo/confluent/share/java/kafka-connect-debezium

    • 4 Runtime
    • Debezium Connector For Oracle
    • Debezium Core
    • Debezium ANTLR DDL Parsers
    • Oracle JDBC Driver
  • Also copy xstreams.jar from stream-demo/instant-client to stream-demo/confluent/share/java/kafka-connect-debezium

  • Add LD_LIBRARY_PATH and JAVA_HOME to your .profile like this;

export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/full/path/to/stream-demo/instant-client
export JAVA_HOME=/full/path/to/stream-demo/jdk
  • Save and quit, then execute source ~/.profile

  • Go to stream-demo/confluent/bin and execute ./confluent start
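Once the services are up, the Kafka Connect REST interface should answer on port 8083. Here is a quick sanity check as a minimal sketch, assuming the default port and the Python requests package.

# Minimal sketch: check that the Kafka Connect REST API is reachable.
# Assumes the default Connect port 8083 and the requests package.
import requests

resp = requests.get("http://localhost:8083/connectors")
resp.raise_for_status()
print("Registered connectors:", resp.json())  # expect [] on a fresh install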

OK, we are almost done.
Open the Insomnia REST client and create a workspace.
Add a new POST request with the following payload:

{
 "name": "gds",
 "config": {
  "connector.class" : "io.debezium.connector.oracle.OracleConnector",
  "tasks.max" : "1",
  "database.server.name" : "ORCL",
  "database.hostname" : "localhost",
  "database.port" : "1530",
  "database.user" : "xstrmadm",
  "database.password" : "oracle",
  "database.dbname" : "ORCL",
  "database.tablename.case.insensitive": false,
  "database.oracle.version": "11", "table.whitelist":"ORCL.MY_SCHEMA.ABC_INPUT_PARSE,ORCL.MY_SCHEMA.ABC_PAR_VALUE,ORCL.MY_SCHEMA.ABC_CUSTOMER_PROPOSAL",
  "database.out.server.name" : "dbzxout",
  "database.history.kafka.bootstrap.servers" : "localhost:9092",
  "database.history.kafka.topic": "schema-changes.inventory",
  "decimal.handling.mode": "precise"
 }
}

Change the above parameters according to your installation; in particular, database.user, database.password, database.port, and table.whitelist must match the XStream users, listener port, and tables you set up earlier.

The URL for the POST request is:
http://localhost:8083/connectors

  • Send the POST request.

  • Send a GET request to http://localhost:8083/connectors/gds/status
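If you prefer scripting over Insomnia, the same two calls can be made with the Python requests package. This is only a minimal sketch; it assumes the payload above is saved next to the script as connector.json.

# Minimal sketch: register the Debezium connector and check its status.
# Assumes the JSON payload above is stored in a local file named connector.json.
import json
import requests

CONNECT_URL = "http://localhost:8083"

with open("connector.json") as f:
    payload = json.load(f)

# Register the connector (POST /connectors).
resp = requests.post(f"{CONNECT_URL}/connectors", json=payload)
resp.raise_for_status()

# Check its status (GET /connectors/<name>/status).
status = requests.get(f"{CONNECT_URL}/connectors/{payload['name']}/status").json()
print(json.dumps(status, indent=2))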

Either way, you should see a status response like this:

{
  "name": "gds",
  "connector": {
    "state": "RUNNING",
    "worker_id": "127.0.1.1:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "127.0.1.1:8083"
    }
  ],
  "type": "source"
}

OK, now open the KSQL client:

cd stream-demo/confluent/bin
./ksql

If your tables are already receiving some DML, you should see the topics by executing:

LIST TOPICS;

If you do not see anything, try inserting, updating, or deleting some records in your tables and then run LIST TOPICS; again.
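You can also check the topics outside of KSQL. Here is a minimal sketch using the AdminClient from the Confluent Python client, assuming the default broker address.

# Minimal sketch: list Kafka topics with the Confluent Python client.
# Assumes the confluent-kafka package and a broker on localhost:9092.
from confluent_kafka.admin import AdminClient

admin = AdminClient({"bootstrap.servers": "localhost:9092"})
metadata = admin.list_topics(timeout=10)
for topic in sorted(metadata.topics):
    print(topic)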

We have integrated Oracle and Kafka; the rest of the work is relatively easy. If we look at our example architecture, there is a Python layer in which we read the Kafka topics with the Confluent Python client and create streams from the topics. An example stream is created like this:

import logging

from streams.ksql_client import KSQLClient
from admin import ConfluentAdmin

class CustomerMaxSysId(KSQLClient, ConfluentAdmin):
    def __init__(self, *args, **kwargs):
        self.logger = logging.getLogger('streams.gr_large_none')
        super().__init__(*args, **kwargs)

    def create(self):
        name = 'S_CUSTOMER_MAX_SYS_ID'
        self.logger.info(f"Creating stream {name}")

        if self.is_stream_exists(name):
            self.logger.info(f"Stream already exists: {name}")
            return

        q = f"""
            create stream {name} (
                process_time    bigint,
                customer_no     varchar,
                sys_id          varchar,
                proposal_no     varchar,
                customer_type   varchar,
                flow_id         integer
            )
            WITH (kafka_topic='CUSTOMER_MAX_SYS_ID', value_format='json', timestamp='process_time')
        """
        self.ksql_client.ksql(q)
        self.logger.info(f"Created stream {name}")
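The stream above is defined over a topic that Debezium keeps populated. For completeness, reading such a topic directly with the Confluent Python client looks roughly like this; the group id and topic name are placeholders, adjust them to your setup.

# Minimal sketch: consume a topic with the Confluent Python client.
# The group id and topic name are placeholders.
import json
from confluent_kafka import Consumer

consumer = Consumer({
    "bootstrap.servers": "localhost:9092",
    "group.id": "stream-demo-reader",
    "auto.offset.reset": "earliest",
})
consumer.subscribe(["CUSTOMER_MAX_SYS_ID"])

try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None:
            continue
        if msg.error():
            print("Consumer error:", msg.error())
            continue
        print(json.loads(msg.value()))
finally:
    consumer.close()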

We also use the Python KSQL client to interact with the KSQL REST API. You could call the REST API directly, but using the client library makes the code more expressive.
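If you want to experiment outside of our wrapper classes, the ksql-python package provides a thin client for the same REST API. The snippet below is a minimal sketch assuming that package and the default KSQL server port 8088.

# Minimal sketch: talk to the KSQL REST API with the ksql-python package.
# Assumes the KSQL server is running on the default port 8088.
from ksql import KSQLAPI

client = KSQLAPI("http://localhost:8088")
print(client.ksql("SHOW STREAMS;"))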

Without going into full detail: in this layer we create streams both from Kafka topics and by joining existing streams. An example stream-stream join looks like this:

import logging

from streams.ksql_client import KSQLClient
from admin import ConfluentAdmin

class GeneralLimitOutZero(KSQLClient, ConfluentAdmin):
    def __init__(self, *args, **kwargs):
        self.logger = logging.getLogger('streams.gr_large_none')
        super().__init__(*args, **kwargs)

    def create(self):
        name = 'S_GENERAL_LIMIT_OUT_ZERO'
        self.logger.info(f"Creating stream {name}")

        if self.is_stream_exists(name):
            self.logger.info(f"Stream already exists: {name}")
            return

        q = f"""
            create stream {name}
            WITH (kafka_topic='{name}', value_format='JSON')
            as
            select
                s.customer_no,
                s.sys_id,
                s.proposal_id,
                o.out_value
            from
                S_GR_LARGE_MODEL_ACTION s
            inner join S_OUTPUT_PARSE o within 15 days
                on s.sys_id = o.ref_no
            where
                s.out_value  = '0' and
                o.out_param  = 'LIMIT'
        """

        self.ksql_client.ksql(q)
        self.logger.info(f"Created stream {name}")

This layer is like an ETL process in a standard DWH except we use data streams instead of static data.

Next, after creating the streams, we can listen for data and take the necessary actions, or simply report them in a reporting layer.

In our scenario we store the received data in Redis using the Redis Python client, and we also publish the received events to a Redis channel:

        key = f"gds:alert:gr_large_none:{customer_no}:{sys_id}:{proposal_id}"
        self.__set_ttl(key)
        self.logger.debug(f'Setting key {key}')
        self.connection.hmset(key, record)
        channel = 'channel:gds:alert:gr_large_none'
        self.logger.debug(f'Publishing key to {channel}')
        self.connection.publish(channel, key)


You can publish the events to a Kafka topic instead of a Redis channel. We have chosen a Redis channel here so that the next layer only interacts with Redis instead of both Redis and Kafka.
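If you go the Kafka route instead, publishing the same alert key with the Confluent Python client would look roughly like this; the topic name below is a placeholder and not part of the original setup.

# Minimal sketch: publish an alert key to a Kafka topic instead of Redis.
# The topic name 'gds.alerts' is a placeholder.
from confluent_kafka import Producer

producer = Producer({"bootstrap.servers": "localhost:9092"})

def publish_alert(key: str) -> None:
    # key is the same Redis key built in the snippet above
    producer.produce("gds.alerts", key=key.encode(), value=key.encode())
    producer.flush()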

The next and final part is listening to the Redis channels and producing actions. In this layer we used Go; before you ask, there is no special reason for choosing Go over Python, we just wanted to demonstrate a heterogeneous, flexible environment with different technologies.

func Run() {

    logrus.Debug("Attaching redis channels ...")
    defer client.Close()
    pubsub := client.Subscribe(channelNames()...)
    defer pubsub.Close()
    _, err := pubsub.Receive()
    if err != nil {
        panic(err)
    }
    ch := pubsub.Channel()

    var anomalies = collectAnomalies()

    for channel, keys := range anomalies {
        if len(keys) >= anomalyLimit(channel) {
            notify(channel, keys)
            anomalies[channel] = []string{}
        }
    }

    for message := range ch {
        ca := channelAlias(message.Channel)
        key := message.Payload
        logrus.Debug(ca, " : Received message ", key)
        // anomalies is keyed by channel name
        keys, ok := anomalies[message.Channel]
        if !ok {
            keys = []string{key}
            anomalies[message.Channel] = keys
        }

        anomalies[message.Channel] = util.AppendUniqueStr(anomalies[message.Channel], key)
        logrus.Debug(ca, " : Number of anomalies ", len(anomalies[message.Channel]))
        if len(anomalies[message.Channel]) >= anomalyLimit(message.Channel) {
            notify(message.Channel, anomalies[message.Channel])
            anomalies[message.Channel] = []string{}
        }
    }
}


Conclusion

The essential part of this scenario is integrating Oracle and Kafka using Debezium. The later steps can be implemented in many different ways depending on the case.

Thanks for reading.
