
Robin Moffatt

Originally published at rmoff.net

📼 ksqlDB HOWTO - A mini video series 📼

Some people learn through doing - and for that there’s a bunch of good ksqlDB tutorials here and here. Others may prefer to watch and listen first, before getting hands on. And for that, I humbly offer you this little series of videos all about ksqlDB. They’re all based on a set of demo scripts that you can run for yourself and try out.

🚨 Make sure you subscribe to my YouTube channel so that you don’t miss more videos like these!

S01E01 : Filtering

Using ksqlDB you can filter streams of data in Apache Kafka and write new Kafka topics populated with a subset of the data from another topic. For example:

CREATE STREAM ORDERS_NY AS
  SELECT *
    FROM ORDERS
   WHERE ADDRESS_STATE='New York';

S01E02 : Schema Manipulation

There are lots of transformations that you can do on streams in ksqlDB including:

  • Remove/drop fields

  • Rename fields

  • CAST datatypes

  • Reformat timestamps from BIGINT epoch to human-readable strings

  • Flatten nested objects (STRUCT)

For example:

CREATE STREAM ORDERS_PROCESSED AS
  SELECT TIMESTAMPTOSTRING(ROWTIME,'yyyy-MM-dd HH:mm:ss','Europe/London') AS ORDER_TS,
         CAST(ORDER_VALUE AS DECIMAL(9,2)) AS ORDER_VALUE,
         ORDER->ADDRESS AS ADDRESS
    FROM ORDERS;

S01E03 : Joins

Using ksqlDB you can enrich messages on a Kafka topic with reference data held in another topic. This could come from a database, message queue, producer API, etc. With the JOIN clause you can define relationships between streams and/or tables in ksqlDB (which are built on topics in Kafka). For example:

CREATE STREAM ORDERS_ENRICHED AS
SELECT O.ORDERTIME AS ORDER_TIMESTAMP,
       O.ORDERID,
       I.MAKE,
       O.ORDERUNITS,
       O.ORDERUNITS * I.UNIT_COST AS TOTAL_ORDER_VALUE
  FROM ORDERS O
       INNER JOIN ITEM_REFERENCE_01 I
       ON O.ITEMID = I.ITEM_ID
  PARTITION BY ORDERID;

S01E04 : Integration with other systems

ksqlDB can pull data in from other systems (e.g. databases, JMS message queues, and so on), and push data down to other systems (NoSQL stores, Elasticsearch, databases, Neo4j, and so on). This is done using Kafka Connect, which can be run embedded within ksqlDB or as a separate cluster of workers. ksqlDB can be used to create and control the connectors. For example, a sink connector streaming data to Elasticsearch:

CREATE SINK CONNECTOR SINK_ELASTIC_ORDERS_01 WITH (
  'connector.class' = 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector',
  'topics' = 'ORDERS_ENRICHED',
  'connection.url' = 'http://elasticsearch:9200',
  'type.name' = '_doc'
  );
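
The same pattern works in the other direction with a source connector. Here's a rough sketch using Confluent's JDBC source connector; the connection details, credentials, table, and column names are all placeholder values for illustration:

-- Pull rows from a database table into a Kafka topic
-- (connection details and table/column names are placeholders)
CREATE SOURCE CONNECTOR SOURCE_JDBC_ORDERS_01 WITH (
  'connector.class'          = 'io.confluent.connect.jdbc.JdbcSourceConnector',
  'connection.url'           = 'jdbc:postgresql://postgres:5432/demo',
  'connection.user'          = 'connect_user',
  'connection.password'      = 'asgard',
  'mode'                     = 'incrementing',
  'incrementing.column.name' = 'order_id',
  'topic.prefix'             = 'db-',
  'table.whitelist'          = 'orders'
  );

-- ksqlDB can also inspect and manage the connectors it creates
SHOW CONNECTORS;
DESCRIBE CONNECTOR SOURCE_JDBC_ORDERS_01;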

S01E05 : Reserialising Streams

Using ksqlDB you can reserialise data in Apache Kafka topics. For example, you can take a stream of CSV data and write it to a new topic in Avro. ksqlDB supports many serialisation formats including Avro, Protobuf, JSON Schema, JSON, and Delimited (CSV, TSV, etc). For example:

CREATE STREAM ORDERS_CSV WITH (VALUE_FORMAT='DELIMITED',
                                KAFKA_TOPIC='orders_csv') AS
  SELECT * FROM ORDERS;
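
Going in the direction described above — CSV in, Avro out — the source topic's columns have to be declared explicitly, because delimited data carries no schema. A minimal sketch, with illustrative topic and column names:

-- Declare a stream over an existing delimited (CSV) topic;
-- column names and types here are illustrative
CREATE STREAM ORDERS_FROM_CSV (ORDER_ID INT, ORDER_TS VARCHAR, ORDER_VALUE DOUBLE)
  WITH (KAFKA_TOPIC='orders_csv', VALUE_FORMAT='DELIMITED');

-- Write the same data to a new topic, serialised as Avro
CREATE STREAM ORDERS_AVRO WITH (VALUE_FORMAT='AVRO',
                                KAFKA_TOPIC='orders_avro') AS
  SELECT * FROM ORDERS_FROM_CSV;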

S01E06 : Split and Merge Kafka Topics

You can split streams of data in Apache Kafka based on values in a field using ksqlDB. You can also merge separate streams of data together into one.

  • Splitting a stream:

    CREATE STREAM ORDERS_UK
      AS SELECT * FROM ORDERS WHERE COUNTRY='UK';
    
    CREATE STREAM ORDERS_OTHER
      AS SELECT * FROM ORDERS WHERE COUNTRY!='UK';
    
  • Merging streams:

    CREATE STREAM INVENTORY_COMBINED
      AS SELECT 'WH1' AS SOURCE, * FROM INVENTORY_WH1;
    
    -- Add the second warehouse's events into the existing stream
    INSERT INTO INVENTORY_COMBINED
      SELECT 'WH2' AS SOURCE, * FROM INVENTORY_WH2;
    

S01E07 : Aggregates

Using ksqlDB you can build stateful aggregations on events in Apache Kafka topics. These are persisted as Kafka topics and held in a state store within ksqlDB that you can query directly (with pull queries) or from an external application using the Java client or REST API.

ksqlDB uses SQL to describe the stream processing that you want to do. For example:

CREATE TABLE ORDERS_BY_MAKE AS
  SELECT MAKE,
         COUNT(*) AS ORDER_COUNT,
         SUM(TOTAL_ORDER_VALUE) AS TOTAL_ORDER_VALUE
    FROM ORDERS_ENRICHED
   GROUP BY MAKE;
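
Because the aggregate table is materialised, you can fetch the current values for a given key with a pull query. A quick sketch (the key value 'Ford' is just an illustration):

-- Pull query: return the current state for a single key and exit
SELECT MAKE, ORDER_COUNT, TOTAL_ORDER_VALUE
  FROM ORDERS_BY_MAKE
 WHERE MAKE = 'Ford';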

S01E08 : Time Handling

When you do processing in ksqlDB that is based on time (such as windowed aggregations, or stream-stream joins) it is important that you correctly define the timestamp by which you want your data to be processed. This could be the timestamp that’s part of the Kafka message metadata, or it could be a field in the value of the Kafka message itself.

By default ksqlDB will use the timestamp of the Kafka message. You can change this by specifying WITH (TIMESTAMP='…') in your CREATE STREAM statement, and instead identify a field in the message value to use as the timestamp.

Use the ROWTIME system field to view the timestamp of the ksqlDB row.
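
As a sketch of how the two fit together, assuming the ORDERS stream carries a BIGINT epoch field called ORDER_PLACED_TS (a made-up name for illustration):

-- Base the stream's timestamp on a field in the message value
-- (ORDER_PLACED_TS is assumed to be a BIGINT epoch; for a string field
--  you would also set TIMESTAMP_FORMAT)
CREATE STREAM ORDERS_BY_EVENT_TIME
  WITH (TIMESTAMP='ORDER_PLACED_TS') AS
  SELECT * FROM ORDERS;

-- ROWTIME now reflects the event time rather than the Kafka message timestamp
SELECT TIMESTAMPTOSTRING(ROWTIME,'yyyy-MM-dd HH:mm:ss') AS ROW_TS, *
  FROM ORDERS_BY_EVENT_TIME
  EMIT CHANGES;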

Questions?

👉 Head over to the Confluent Community forum or Slack group.
