Akmal Chaudhri for SingleStore

AdTech using SingleStoreDB, Kafka and Metabase

Abstract

SingleStoreDB can ingest and process large quantities of streaming data with ease. In this article, we'll build an AdTech application that simulates millions of Ad Campaign events streaming from a Kafka broker into SingleStoreDB. We'll also use Metabase to build a dashboard to visualise the data in SingleStoreDB.

Introduction

In a previous article series, we've seen how to use SingleStoreDB with Kafka hosted on the Confluent Cloud. The example simulated global sensors sending temperature readings that could be stored and analysed in SingleStoreDB. In this article, we'll use a more localised Kafka broker and create a visual dashboard to provide insights into and analysis of the streamed data.

The complete SQL code is listed in the Appendix.

Create a SingleStoreDB Cloud account

A previous article showed the steps required to create a free SingleStoreDB Cloud account. We'll use AdTech Demo Group as our Workspace Group Name and adtech-demo as our Workspace Name.

Once we've created our database in the following steps, we'll make a note of our password and host name.

Create a Database and Tables

In our SingleStoreDB Cloud account, we'll use the SQL Editor to create a new database, as follows:

CREATE DATABASE IF NOT EXISTS adtech;

USE adtech;

We'll also create two tables. First, the events table:

CREATE TABLE events (
    user_id INT,
    event_name VARCHAR(128),
    advertiser VARCHAR(128),
    campaign INT(11),
    gender VARCHAR(128),
    income VARCHAR(128),
    page_url VARCHAR(128),
    region VARCHAR(128),
    country VARCHAR(128),
    SORT KEY adtmidx (user_id, event_name, advertiser, campaign),
    SHARD KEY user_id (user_id)
);

This table stores details of the advertiser, campaign and various demographic information about the user, such as gender and income.

Second, the campaigns table:

CREATE REFERENCE TABLE campaigns (
    campaign_id SMALLINT(6) NOT NULL DEFAULT '0',
    campaign_name VARCHAR(255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,
    PRIMARY KEY (campaign_id)
);

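We have only a small number of campaigns, so we declare the table with REFERENCE. A reference table is replicated in full to every node in the cluster, which provides very fast access when other tables are joined against it.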

Next, we'll insert the details of the campaigns:

INSERT INTO campaigns VALUES
(1,'demand great'),
(2,'blackout'),
(3,'flame broiled'),
(4,'take it from a fish'),
(5,'thank you'),
(6,'designed by you'),
(7,'virtual launch'),
(8,'ultra light'),
(9,'warmth'),
(10,'run healthy'),
(11,'virtual city'),
(12,'online lifestyle'),
(13,'dream burger'),
(14,'super bowl tweet');

Create a Pipeline

Pipelines allow us to create streaming ingest feeds from various sources, such as Kafka, S3 and HDFS, using a single command. With pipelines, we can also perform ETL operations.

For our use case, we can create a simple pipeline in SingleStoreDB as follows:

CREATE PIPELINE events
AS LOAD DATA KAFKA 'public-kafka.memcompute.com:9092/ad_events'
BATCH_INTERVAL 2500
INTO TABLE events
FIELDS TERMINATED BY '\t' ENCLOSED BY '' ESCAPED BY '\\'
LINES STARTING BY ''
(user_id,event_name,advertiser,campaign,gender,income,page_url,region,country);

We can control the rate of data ingestion using BATCH_INTERVAL, which sets how long the pipeline waits, in milliseconds, before checking the source for new data. Initially, we'll set this to 2500 milliseconds.

We can configure the pipeline to start from the earliest offset, as follows:

ALTER PIPELINE events SET OFFSETS EARLIEST;

and we can test the pipeline before we start running it, as follows:

TEST PIPELINE events LIMIT 1;

The pipeline test should result in an error, as the value for the campaign column is out of range:

ERROR 1264 ER_WARN_DATA_OUT_OF_RANGE: ... Out of range value for column 'campaign'

Let's look at the data in the ad_events topic from the Kafka broker and see if we can identify the problem. We'll install kcat (formerly kafkacat):

sudo apt install kafkacat

and then run it as follows:

kafkacat -C -b public-kafka.memcompute.com:9092 -t ad_events

Here are 10 rows of example output:

...
607982731 Downstream Conversion Verizon Wireless 5 9 Male 50k - 75k / Utah US
607982732 Impression AT&T Wireless 4 3 Female unknown /2014/04/make-spring-centerpiece-with-tulips.html Florida US
607982732 Click AT&T Wireless 4 3 Female unknown /2014/04/make-spring-centerpiece-with-tulips.html Florida US
607982732 Impression AT&T Wireless 5 9 Female unknown /category/31-day-drawing-challenge/ Florida US
607982732 Impression AT&T Wireless 5 7 Female unknown /2016/05/lars-book-club-may.html/ Florida US
607982732 Impression AT&T Wireless 5 7 Female unknown /2013/01/polka-dot-your-wrapping.html/ Florida US
607982732 Impression AT&T Wireless 13 3 Female unknown /2016/03/12-best-paper-flowers-volume-3.html/ Florida US
607982733 Impression Dominos Pizza 5 6 Female 25k and below /2014/07/make-the-midsummer-floral-crown.html/ Pennsylvania US
607982734 Impression Lowes Companies 5 13 Female unknown /2016/01/eyeball-drink-stirrers.html/ New York US
607982735 Impression Lowes Companies 4 6 Male unknown /2016/02/review-of-the-5-best-planners-of-2016.html/2/ California US
...

If we take the first row above and check for hidden characters, we'll see that the fields are separated by the tab character (\t):

607982731<tab>Downstream<space>Conversion<tab>Verizon<space>Wireless<tab>5<space>9<tab>Male<tab>50k<space>-<space>75k<tab>/<tab>Utah<tab>US
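
One way to produce a view like this is to print the row with its whitespace made visible. The following is a minimal Python sketch, not part of the original walkthrough. It assumes the row has been captured with its tabs intact, and the helper name show_whitespace is hypothetical.

```python
# Hypothetical helper: make tabs and spaces visible in one Kafka message.
def show_whitespace(line: str) -> str:
    """Replace each tab and each space with a visible placeholder."""
    return line.replace("\t", "<tab>").replace(" ", "<space>")


# The first sample row, with tab-separated fields as sent by the broker.
row = (
    "607982731\tDownstream Conversion\tVerizon Wireless\t5 9\t"
    "Male\t50k - 75k\t/\tUtah\tUS"
)

print(show_whitespace(row))
```

Spaces inside a field, such as the one in "Verizon Wireless", show up as <space>, so they are easy to tell apart from the <tab> field separators.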

Mapping the fields back to the table schema, we'll see that the campaign column is meant to be an integer, but the stream contains two integers separated by a space:

... <tab>integer<space>integer<tab> ...
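
The same mapping can be sketched in Python: split a row on tabs, pair each value with its column name in the order used by the pipeline, and report any integer column whose value does not parse as an integer. This is an illustrative sketch only. The function name non_integer_fields is hypothetical, and Python's int() is used here as a stand-in for the database's own type checking.

```python
# Column order taken from the pipeline definition.
COLUMNS = [
    "user_id", "event_name", "advertiser", "campaign", "gender",
    "income", "page_url", "region", "country",
]

# Columns declared as integers in the events table.
INT_COLUMNS = {"user_id", "campaign"}


def non_integer_fields(line: str) -> dict:
    """Return the integer columns whose raw value is not a valid integer."""
    record = dict(zip(COLUMNS, line.rstrip("\n").split("\t")))
    bad = {}
    for name in INT_COLUMNS:
        try:
            int(record[name])
        except ValueError:
            bad[name] = record[name]
    return bad


row = (
    "607982731\tDownstream Conversion\tVerizon Wireless\t5 9\t"
    "Male\t50k - 75k\t/\tUtah\tUS"
)

print(non_integer_fields(row))  # {'campaign': '5 9'}
```

Only the campaign field is reported, which matches the column named in the pipeline error.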

We can change the pipeline slightly to deal with this problem. First, we'll drop the pipeline:

DROP PIPELINE events;

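Then we'll recreate it with the keyword IGNORE added to line 4. With IGNORE, a row that contains errors is still loaded, with the offending values adjusted to fit the table schema, instead of raising an error: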

CREATE PIPELINE events
AS LOAD DATA KAFKA 'public-kafka.memcompute.com:9092/ad_events'
BATCH_INTERVAL 2500
IGNORE INTO TABLE events
FIELDS TERMINATED BY '\t' ENCLOSED BY '' ESCAPED BY '\\'
LINES STARTING BY ''
(user_id,event_name,advertiser,campaign,gender,income,page_url,region,country);

We can configure the pipeline again to start from the earliest offset, as follows:

ALTER PIPELINE events SET OFFSETS EARLIEST;

and we can test the pipeline again before we start running it, as follows:

TEST PIPELINE events LIMIT 1;

This time the pipeline test should be successful and we should see some output, similar to the following:

+-----------+------------+------------+----------+---------+---------+----------------------------------+--------+---------+
| user_id   | event_name | advertiser | campaign | gender  | income  | page_url                         | region | country |
+-----------+------------+------------+----------+---------+---------+----------------------------------+--------+---------+
| 605804911 | Impression | Aldi       |       13 | unknown | unknown | /2016/05/day-9-draw-a-rose.html/ | Nevada | US      |
+-----------+------------+------------+----------+---------+---------+----------------------------------+--------+---------+

We can now start the pipeline:

START PIPELINE events;

Install Metabase

To use Metabase, we'll follow the JAR file installation instructions and run the JAR file, as follows:

java -jar metabase.jar

Next, we'll open the following webpage:

http://localhost:3000/setup

We should see the following, shown in Figure 1.

Figure 1. Metabase setup.

We'll work through the various wizards to complete the language and user settings. To add our data, we'll select MySQL, since SingleStoreDB is compatible with the MySQL wire protocol, and fill in the details for our connection, as follows:

  • Display name: S2
  • Host: <host>
  • Port: 3306
  • Database name: adtech
  • Username: admin
  • Password: <password>

We'll replace the <host> and <password> with the values from our SingleStoreDB Cloud account.

We'll click the Connect database button. The connection should be successful.

Finally, we'll click Finish and then Take me to Metabase.

Next, we should see the following, as shown in Figure 2.

Figure 2. Metabase Home.

Create Dashboard

We are now ready to create our dashboard.

1. Total Number of Events

From the top right, we'll select + New and then SQL Query as shown in Figure 3.

Figure 3. New SQL Query.

After selecting the S2 database, we'll enter the following into the SQL Editor in Metabase:

SELECT COUNT(*) FROM events;

We can run the query using the right arrow button (▶) in the bottom right and use the Save link in the top right to save this query. We'll call the query Total Number of Events. We'll create a new dashboard called AdTech and add the query to the dashboard, as shown in Figure 4.

Figure 4. AdTech Dashboard.

2. Events by Region

We'll repeat the previous process, creating an SQL Query and adding it to the AdTech dashboard, for the following query (Events by Region). Despite its name, the query counts events per country:

SELECT events.country AS `events.country`, COUNT(events.country) AS `events.countofevents`
FROM adtech.events AS events
GROUP BY 1;

With this query, we have the option of creating a Visualization. We'll create a pie chart.

3. Events by Top 5 Advertisers

We'll repeat the previous process of creating an SQL Query and adding it to the AdTech dashboard for the following query (Events by Top 5 Advertisers):

SELECT events.advertiser AS `events.advertiser`, COUNT(*) AS `events.count`
FROM adtech.events AS events
WHERE (events.advertiser LIKE '%Subway%' OR events.advertiser LIKE '%McDonals%' OR events.advertiser LIKE '%Starbucks%' OR events.advertiser LIKE '%Dollar General%' OR events.advertiser LIKE '%YUM! Brands%' OR events.advertiser LIKE '%Dunkin Brands Group%')
GROUP BY 1
ORDER BY `events.count` DESC;

With this query, we have the option of creating a Visualization. We'll create a bar chart.
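
To see what this query computes, its filter, grouping and ordering can be mimicked in plain Python on a handful of rows. This is only an illustrative sketch: the sample advertiser values and the function name count_by_advertiser are made up for the example, only some of the patterns are included, and LIKE '%...%' is approximated with a case-sensitive substring test.

```python
from collections import Counter

# Substrings corresponding to some of the LIKE '%...%' patterns in the query.
PATTERNS = ["Subway", "Starbucks", "Dollar General"]


def count_by_advertiser(advertisers):
    """Count events per advertiser that matches a pattern, largest first."""
    counts = Counter(
        name for name in advertisers
        if any(pattern in name for pattern in PATTERNS)
    )
    return counts.most_common()  # sorted by count, descending


# Hypothetical advertiser column values, one per event.
sample = [
    "Subway", "Starbucks", "Subway", "Aldi",
    "Dollar General", "Subway", "Starbucks",
]

print(count_by_advertiser(sample))
# [('Subway', 3), ('Starbucks', 2), ('Dollar General', 1)]
```

The advertiser that matches no pattern ("Aldi" in this sample) is dropped by the filter, just as the WHERE clause drops it before GROUP BY runs.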

4. Ad Visitors by Gender and Income

We'll repeat the previous process of creating an SQL Query and adding it to the AdTech dashboard for the following query (Ad Visitors by Gender and Income):

SELECT *
FROM (SELECT *, DENSE_RANK() OVER (ORDER BY xx.z___min_rank) AS z___pivot_row_rank, RANK() OVER (PARTITION BY xx.z__pivot_col_rank ORDER BY xx.z___min_rank) AS z__pivot_col_ordering, CASE
        WHEN xx.z___min_rank = xx.z___rank THEN 1
        ELSE 0
      END AS z__is_highest_ranked_cell
    FROM (SELECT *, Min(aa.z___rank) OVER (PARTITION BY aa.`events.income`) AS z___min_rank
        FROM (SELECT *, RANK() OVER (ORDER BY CASE
                WHEN bb.z__pivot_col_rank = 1 THEN (CASE
                    WHEN bb.`events.count` IS NOT NULL THEN 0
                    ELSE 1
                  END)
                ELSE 2
              END, CASE
                WHEN bb.z__pivot_col_rank = 1 THEN bb.`events.count`
                ELSE NULL
              END DESC, bb.`events.count` DESC, bb.z__pivot_col_rank, bb.`events.income`) AS z___rank
            FROM (SELECT *, DENSE_RANK() OVER (ORDER BY CASE
                    WHEN ww.`events.gender` IS NULL THEN 1
                    ELSE 0
                  END, ww.`events.gender`) AS z__pivot_col_rank
                FROM (SELECT events.gender AS `events.gender`, events.income AS `events.income`, COUNT(*) AS `events.count`
                    FROM adtech.events AS events
                    WHERE (events.income <> 'unknown' OR events.income IS NULL)
                    GROUP BY 1, 2) ww) bb
            WHERE bb.z__pivot_col_rank <= 16384) aa) xx) zz
WHERE (zz.z__pivot_col_rank <= 50 OR zz.z__is_highest_ranked_cell = 1) AND (zz.z___pivot_row_rank <= 500 OR zz.z__pivot_col_ordering = 1)
ORDER BY zz.z___pivot_row_rank;

With this query, we have the option of creating a Visualization. We'll create a table.
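
The query is long, but the innermost subquery does the substantive work: it counts events for each combination of gender and income, leaving out rows whose income is 'unknown'. The surrounding layers add ranking columns and cap the number of rows and columns so that the result can be laid out as a pivot. A minimal Python sketch of that core aggregation follows. The sample events and the function names are hypothetical.

```python
from collections import Counter


def count_by_gender_and_income(events):
    """Count events per (gender, income), skipping income == 'unknown'."""
    return Counter(
        (gender, income)
        for gender, income in events
        if income != "unknown"  # None (SQL NULL) is kept, as in the query
    )


def pivot(counts):
    """Arrange the counts as {income: {gender: count}}, one row per income."""
    table = {}
    for (gender, income), n in counts.items():
        table.setdefault(income, {})[gender] = n
    return table


# Hypothetical (gender, income) pairs, one per event.
events = [
    ("Male", "50k - 75k"),
    ("Female", "25k and below"),
    ("Female", "unknown"),
    ("Male", "50k - 75k"),
    ("Female", "50k - 75k"),
]

print(pivot(count_by_gender_and_income(events)))
# {'50k - 75k': {'Male': 2, 'Female': 1}, '25k and below': {'Female': 1}}
```

Each income band becomes a row and each gender a column, which is the shape the table visualization displays.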

Final Dashboard

Figure 5 shows an example of the charts being sized and positioned on the AdTech dashboard. We'll set the auto-refresh option to 1 minute.

Figure 5. Final Dashboard.

Summary

In this article, we have created a pipeline in SingleStoreDB to connect to a Kafka broker. We have identified a problem with the incoming data and modified our pipeline to handle the issue. We have also used Metabase to create a quick visual dashboard to help us gain insights into our Ad Campaign. For additional details, please see the SingleStore documentation on how to Load and Analyze AdTech Data.

Appendix

CREATE DATABASE IF NOT EXISTS adtech;

USE adtech;

CREATE TABLE events (
    user_id INT,
    event_name VARCHAR(128),
    advertiser VARCHAR(128),
    campaign INT(11),
    gender VARCHAR(128),
    income VARCHAR(128),
    page_url VARCHAR(128),
    region VARCHAR(128),
    country VARCHAR(128),
    SORT KEY adtmidx (user_id, event_name, advertiser, campaign),
    SHARD KEY user_id (user_id)
);

CREATE REFERENCE TABLE campaigns (
    campaign_id SMALLINT(6) NOT NULL DEFAULT '0',
    campaign_name VARCHAR(255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,
    PRIMARY KEY (campaign_id)
);

INSERT INTO campaigns VALUES
(1,'demand great'),
(2,'blackout'),
(3,'flame broiled'),
(4,'take it from a fish'),
(5,'thank you'),
(6,'designed by you'),
(7,'virtual launch'),
(8,'ultra light'),
(9,'warmth'),
(10,'run healthy'),
(11,'virtual city'),
(12,'online lifestyle'),
(13,'dream burger'),
(14,'super bowl tweet');

CREATE PIPELINE events
AS LOAD DATA KAFKA 'public-kafka.memcompute.com:9092/ad_events'
BATCH_INTERVAL 2500
INTO TABLE events
FIELDS TERMINATED BY '\t' ENCLOSED BY '' ESCAPED BY '\\'
LINES STARTING BY ''
(user_id,event_name,advertiser,campaign,gender,income,page_url,region,country);

ALTER PIPELINE events SET OFFSETS EARLIEST;

TEST PIPELINE events LIMIT 1;

DROP PIPELINE events;

CREATE PIPELINE events
AS LOAD DATA KAFKA 'public-kafka.memcompute.com:9092/ad_events'
BATCH_INTERVAL 2500
IGNORE INTO TABLE events
FIELDS TERMINATED BY '\t' ENCLOSED BY '' ESCAPED BY '\\'
LINES STARTING BY ''
(user_id,event_name,advertiser,campaign,gender,income,page_url,region,country);

ALTER PIPELINE events SET OFFSETS EARLIEST;

TEST PIPELINE events LIMIT 1;

START PIPELINE events;

-- Total Number of Events
SELECT COUNT(*) FROM events;

-- Events by Region
SELECT events.country AS `events.country`, COUNT(events.country) AS `events.countofevents`
FROM adtech.events AS events
GROUP BY 1;

-- Events by Top 5 Advertisers
SELECT events.advertiser AS `events.advertiser`, COUNT(*) AS `events.count`
FROM adtech.events AS events
WHERE (events.advertiser LIKE '%Subway%' OR events.advertiser LIKE '%McDonals%' OR events.advertiser LIKE '%Starbucks%' OR events.advertiser LIKE '%Dollar General%' OR events.advertiser LIKE '%YUM! Brands%' OR events.advertiser LIKE '%Dunkin Brands Group%')
GROUP BY 1
ORDER BY `events.count` DESC;

-- Ad Visitors by Gender and Income
SELECT *
FROM (SELECT *, DENSE_RANK() OVER (ORDER BY xx.z___min_rank) AS z___pivot_row_rank, RANK() OVER (PARTITION BY xx.z__pivot_col_rank ORDER BY xx.z___min_rank) AS z__pivot_col_ordering, CASE
        WHEN xx.z___min_rank = xx.z___rank THEN 1
        ELSE 0
      END AS z__is_highest_ranked_cell
    FROM (SELECT *, Min(aa.z___rank) OVER (PARTITION BY aa.`events.income`) AS z___min_rank
        FROM (SELECT *, RANK() OVER (ORDER BY CASE
                WHEN bb.z__pivot_col_rank = 1 THEN (CASE
                    WHEN bb.`events.count` IS NOT NULL THEN 0
                    ELSE 1
                  END)
                ELSE 2
              END, CASE
                WHEN bb.z__pivot_col_rank = 1 THEN bb.`events.count`
                ELSE NULL
              END DESC, bb.`events.count` DESC, bb.z__pivot_col_rank, bb.`events.income`) AS z___rank
            FROM (SELECT *, DENSE_RANK() OVER (ORDER BY CASE
                    WHEN ww.`events.gender` IS NULL THEN 1
                    ELSE 0
                  END, ww.`events.gender`) AS z__pivot_col_rank
                FROM (SELECT events.gender AS `events.gender`, events.income AS `events.income`, COUNT(*) AS `events.count`
                    FROM adtech.events AS events
                    WHERE (events.income <> 'unknown' OR events.income IS NULL)
                    GROUP BY 1, 2) ww) bb
            WHERE bb.z__pivot_col_rank <= 16384) aa) xx) zz
WHERE (zz.z__pivot_col_rank <= 50 OR zz.z__is_highest_ranked_cell = 1) AND (zz.z___pivot_row_rank <= 500 OR zz.z__pivot_col_ordering = 1)
ORDER BY zz.z___pivot_row_rank;
