Hans-Peter Grahsl

How to build a streaming 🀩 emojis 😍 tracker app with πŸš€ ksqlDB πŸš€

I recently created and published a tiny but handy project which provides a few custom user-defined functions (UDFs) for working with emojis in ksqlDB. While there is a minimum viable amount of project documentation, it's often much easier to understand how to make use of these functions, and what you can build with them, based on a small example application.

The goal is to build a simple, yet fault-tolerant and scalable stream processing backend to do near real-time tracking of emojis based on public tweets. Such an application can be used to feed a live web dashboard which, for instance, might show a continuously updated ranking of the most popular emojis derived from extracting and aggregating emoji occurrences found in tweets. It could look like the illustration below:

Streaming Emoji Tracker Dashboard

Thanks to ksqlDB, we can build all vital components for such an application in one coherent technology stack. Its unified streaming architecture:

  • allows configuration-based data ingress and egress via Apache Kafka Connect
  • enables transformations, aggregations, and enrichments of data with a convenient SQL-like language
  • and supports both streaming queries and point-in-time lookups against the managed state based on materialized views

Let's get started, step by step...

Step 1: Data Ingestion

Directly from within ksqlDB, we can manage Apache Kafka Connect source and sink connectors. For this example, we leverage a turn-key ready Twitter source connector from the community. Before you can deploy this connector, you have to get your access credentials for the Twitter API, which need to be configured for the source connector. The following ksql snippet - with the source connector configuration in the WITH clause - does the job and brings in public live tweets that match certain keywords:

CREATE SOURCE CONNECTOR `my-twitter-src-01` WITH (
    'connector.class'='com.github.jcustenborder.kafka.connect.twitter.TwitterSourceConnector',
    'value.converter'='org.apache.kafka.connect.json.JsonConverter',
    'value.converter.schemas.enable'=false,
    'key.converter'='org.apache.kafka.connect.json.JsonConverter',
    'key.converter.schemas.enable'=false,
    'twitter.oauth.accessToken'='...',
    'twitter.oauth.consumerSecret'='...',
    'twitter.oauth.consumerKey'='...',
    'twitter.oauth.accessTokenSecret'='...',
    'kafka.status.topic'='tweets',
    'process.deletes'=false,
    'filter.keywords'='coronavirus,2019nCoV,SARSCoV2,covid19,cov19'
);

NOTE: You have to replace all 4 twitter.oauth.* settings with your access credentials for this to work properly.
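
Once deployed, ksqlDB's connector management statements should let us confirm that the connector is actually up and running:

-- list all managed connectors and inspect the state of the Twitter source
SHOW CONNECTORS;
DESCRIBE CONNECTOR `my-twitter-src-01`;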

Shortly after the source connector starts, it will produce JSON records into the configured Apache Kafka topic named tweets. To quickly inspect the data ingestion, we define our base stream, reduced to the essential fields of the original payload, like so:

-- 'create stream from tweets topic only using a few relevant payload fields'
CREATE STREAM tweets
    (ID BIGINT, CREATEDAT BIGINT, TEXT VARCHAR, LANG VARCHAR, USER STRUCT<SCREENNAME VARCHAR>)
WITH (kafka_topic='tweets',value_format='JSON');

To inspect the output of this stream for a single ingested tweet we SELECT just one record:

ksql> SELECT ID,TEXT FROM tweets EMIT CHANGES LIMIT 1;
+-------------------------------------+---------------------------------------------+
|ID                                   |TEXT                                         |
+-------------------------------------+---------------------------------------------+
|1244971755531354112                  |RT @Faytuks: #BREAKING🚨 - Sweden has confirm|
|                                     |ed 34 new #coronavirus deaths, raising the co|
|                                     |untry's death toll to 180 with 4 028 confirme|
|                                     |d cas…                                       |
Limit Reached
Query terminated

Step 2: Data Processing

Now, for the actual stream processing on top of the ingested data, we first create a derived stream tweets_emojis by applying our custom UDF EMOJIS_EXTRACT. You can find a short description of how to install these custom emoji handling functions into your ksql-server installation here.
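
In a nutshell, installing them follows the usual ksqlDB UDF procedure; a minimal sketch, assuming a hypothetical jar name and an extension directory as configured via ksql.extension.dir:

# copy the UDF jar (file name is hypothetical) into the configured extension directory
cp emoji-udfs.jar /opt/ksqldb/ext/
# restart the ksqlDB server afterwards so the new functions are picked up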

-- 'create derived stream using the EMOJIS_EXTRACT function on the TEXT column'
CREATE STREAM tweets_emojis AS 
    SELECT EMOJIS_EXTRACT(TEXT,false) AS EMOJIS FROM tweets EMIT CHANGES;

This UDF extracts an array of all emojis potentially contained in the raw TEXT column. The 2nd parameter of type boolean allows us to choose between list (unique=false) or set semantics (unique=true), which decides whether or not the extracted array may contain duplicate emoji entries.
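
To make the difference tangible, here is a small, hypothetical illustration based on a made-up tweet text:

-- assume TEXT = 'stay safe πŸ™πŸ™ 😷'
-- EMOJIS_EXTRACT(TEXT,false) yields [πŸ™, πŸ™, 😷] i.e. list semantics, duplicates kept
-- EMOJIS_EXTRACT(TEXT,true) yields [πŸ™, 😷] i.e. set semantics, duplicates removed
SELECT EMOJIS_EXTRACT(TEXT,true) AS UNIQUE_EMOJIS FROM tweets EMIT CHANGES LIMIT 1;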

The next step is to flatten the emojis array, which we can easily achieve by applying ksqlDB's built-in UDTF called EXPLODE. Doing so will automatically skip any tweet which didn't contain emojis at all - those are represented with an empty array in the above tweets_emojis stream.

-- 'create flattened stream to get each contained emoji of a tweet in a separate record'
CREATE STREAM tweets_emojis_flattened AS 
    SELECT EXPLODE(EMOJIS) AS EMOJI FROM tweets_emojis EMIT CHANGES;

The resulting stream contains all occurring emojis as separate records:

ksql> SELECT ROWKEY,EMOJI FROM tweets_emojis_flattened EMIT CHANGES LIMIT 10;
+--------------------------------+--------------------------------+
|ROWKEY                          |EMOJI                           |
+--------------------------------+--------------------------------+
|{"Id":1244971755531354112}      |🚨                              |
|{"Id":1244971755996880897}      |😒                              |
|{"Id":1244971756122550272}      |πŸ’‘                              |
|{"Id":1244971757011902469}      |πŸ‘‡πŸ»                              |
|{"Id":1244971760203845632}      |πŸ“½                              |
|{"Id":1244971760203845632}      |☣                              |
|{"Id":1244971760203845632}      |πŸ‡·πŸ‡Ί                              |
|{"Id":1244971760203845632}      |πŸ›‘                              |
|{"Id":1244971760639827968}      |❀                               |
|{"Id":1244971760287731718}      |🀣                              |
Limit Reached
Query terminated

This stream serves as input to create a table by doing a simple aggregation to group and count the emojis:

-- 'create a table which continuously calculates the total count of each specific emoji'
CREATE TABLE tweets_emoji_counts AS 
   SELECT EMOJI,COUNT(*) AS TOTAL_COUNT FROM tweets_emojis_flattened
      GROUP BY EMOJI EMIT CHANGES;

Step 3: Queries

ksqlDB provides two different types of queries against tables:

Pull Queries

Pull queries can be used to do point-in-time lookups against the continuously updated state which is represented by the table. We can retrieve the current total count of any specific emoji by specifying it as the ROWKEY in the WHERE clause:

-- 'pull query to retrieve single point-in-time emoji count'
SELECT EMOJI,TOTAL_COUNT FROM tweets_emoji_counts WHERE ROWKEY = 'πŸ™';
ksql> SELECT EMOJI,TOTAL_COUNT FROM
tweets_emoji_counts WHERE ROWKEY = 'πŸ™';
+--------------------+--------------------+
|EMOJI               |TOTAL_COUNT         |
+--------------------+--------------------+
|πŸ™                  |20                  |
Query terminated

Push Queries

Push queries are meant to run indefinitely and emit results every time there are changes in the underlying data stream or updates to the table state respectively. We can subscribe to a continuously updated stream of total counts for a single emoji, several specific emojis, or all emojis:

-- 'PUSH query: get change stream for single emoji counter'
SELECT EMOJI,TOTAL_COUNT FROM tweets_emoji_counts
   WHERE emoji = 'πŸ™' EMIT CHANGES;
ksql> SELECT EMOJI,TOTAL_COUNT FROM tweets_emoji_counts
WHERE emoji = 'πŸ™' EMIT CHANGES;
+---------------------+---------------------+
|EMOJI                |TOTAL_COUNT          |
+---------------------+---------------------+
|πŸ™                   |55                   |
|πŸ™                   |56                   |
|πŸ™                   |59                   |
|πŸ™                   |61                   |
^CQuery terminated
-- 'PUSH query: change stream for several emoji counters - there is no IN(...) yet :('
SELECT EMOJI,TOTAL_COUNT FROM tweets_emoji_counts
   WHERE emoji = 'πŸ™' OR emoji = '🚨' OR emoji = '😭' EMIT CHANGES;
ksql> SELECT EMOJI,TOTAL_COUNT FROM tweets_emoji_counts 
WHERE emoji = 'πŸ™' OR emoji = '🚨' OR emoji = '😭' EMIT CHANGES;
+---------------------+---------------------+
|EMOJI                |TOTAL_COUNT          |
+---------------------+---------------------+
|😭                   |48                   |
|πŸ™                   |66                   |
|🚨                   |109                  |
|😭                   |49                   |
|🚨                   |111                  |
|😭                   |50                   |
|🚨                   |112                  |
|πŸ™                   |67                   |
^CQuery terminated
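
And to subscribe to counter updates for all emojis at once, we simply drop the WHERE clause:

-- 'PUSH query: change stream for all emoji counters'
SELECT EMOJI,TOTAL_COUNT FROM tweets_emoji_counts EMIT CHANGES;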

Step 4: Serving Data to Clients

Thanks to its HTTP / REST API, ksqlDB lets us directly expose both push and pull queries to consuming clients. For instance, we can run the queries needed to build a live dashboard, in which we visualize the data streams and offer end-users a simple form-based query mechanism. Below is a cURL snippet which, when run against the REST API, shows the output of a PULL query to retrieve the current counter for the πŸ™ emoji:

curl -s -X "POST" "http://localhost:8088/query" \
  -H "Content-Type: application/vnd.ksql.v1+json; charset=utf-8" \
  -d $'{
   "ksql": "SELECT EMOJI,TOTAL_COUNT FROM tweets_emoji_counts WHERE ROWKEY = \'πŸ™\';",
    "streamsProperties": {}
  }' | jq '.[1] | {emoji: .row.columns[0],count: .row.columns[1]}'

which for my particular data results in:

{
  "emoji": "πŸ™",
  "count": 2198
}
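
Push queries can be served over the very same endpoint; a minimal sketch, assuming the same local ksqlDB server, which keeps streaming counter updates as chunked responses until the connection is closed:

curl -s -X "POST" "http://localhost:8088/query" \
  -H "Content-Type: application/vnd.ksql.v1+json; charset=utf-8" \
  -d $'{
    "ksql": "SELECT EMOJI,TOTAL_COUNT FROM tweets_emoji_counts WHERE emoji = \'πŸ™\' EMIT CHANGES;",
    "streamsProperties": {}
  }'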

Have fun handling Emojis in πŸš€ ksqlDB πŸš€
