Abstract
SingleStoreDB can ingest and process large quantities of streaming data with ease. In this article, we'll build an AdTech application that simulates millions of Ad Campaign events streaming from a Kafka broker into SingleStoreDB. We'll also use Metabase to build a dashboard to visualise the data in SingleStoreDB.
Introduction
In a previous article series, we saw how to use SingleStoreDB with Kafka hosted on the Confluent Cloud. The example simulated global sensors sending temperature readings that could be stored and analysed in SingleStoreDB. In this article, we'll use a more localised Kafka broker and create a visual dashboard to provide insights into and analysis of the streamed data.
The complete SQL code is listed in the Appendix.
Create a SingleStoreDB Cloud account
A previous article showed the steps required to create a free SingleStoreDB Cloud account. We'll use AdTech Demo Group as our Workspace Group Name and adtech-demo as our Workspace Name.
Once we've created our database in the following steps, we'll make a note of our password and host name.
Create a Database and Tables
In our SingleStoreDB Cloud account, we'll use the SQL Editor to create a new database, as follows:
CREATE DATABASE IF NOT EXISTS adtech;
We'll also create two tables. First, the events table:
USE adtech;
CREATE TABLE events (
    user_id INT,
    event_name VARCHAR(128),
    advertiser VARCHAR(128),
    campaign INT(11),
    gender VARCHAR(128),
    income VARCHAR(128),
    page_url VARCHAR(128),
    region VARCHAR(128),
    country VARCHAR(128),
    SORT KEY adtmidx (user_id, event_name, advertiser, campaign),
    SHARD KEY user_id (user_id)
);
This table stores details of the advertiser, the campaign and various demographic information about the user, such as gender and income.
Second, the campaigns table:
CREATE REFERENCE TABLE campaigns (
    campaign_id SMALLINT(6) NOT NULL DEFAULT '0',
    campaign_name VARCHAR(255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,
    PRIMARY KEY (campaign_id)
);
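We have a small number of campaigns, so we declare the table with REFERENCE. A reference table is replicated in full to every node in the cluster, so lookups and joins against it do not need to move data between nodes, which provides very fast access.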
Next, we'll insert the details of the campaigns:
INSERT INTO campaigns VALUES
(1,'demand great'),
(2,'blackout'),
(3,'flame broiled'),
(4,'take it from a fish'),
(5,'thank you'),
(6,'designed by you'),
(7,'virtual launch'),
(8,'ultra light'),
(9,'warmth'),
(10,'run healthy'),
(11,'virtual city'),
(12,'online lifestyle'),
(13,'dream burger'),
(14,'super bowl tweet');
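To illustrate how a small lookup table like this can be used, here is a minimal Python sketch that runs locally against an in-memory SQLite database rather than SingleStoreDB. It assumes that the campaign value in an event corresponds to campaign_id in the campaigns table, which the article implies but does not state, and the event rows are made up for the example.

```python
import sqlite3

# In-memory SQLite stands in for SingleStoreDB; the types are simplified.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE campaigns (campaign_id INTEGER PRIMARY KEY, campaign_name TEXT)"
)
conn.executemany(
    "INSERT INTO campaigns VALUES (?, ?)",
    [(4, "take it from a fish"), (5, "thank you"), (13, "dream burger")],
)

# Hypothetical events: (user_id, campaign). Assumption: campaign = campaign_id.
conn.execute("CREATE TABLE events (user_id INTEGER, campaign INTEGER)")
conn.executemany(
    "INSERT INTO events VALUES (?, ?)",
    [(1, 5), (2, 4), (3, 5), (4, 13), (5, 5)],
)

# Translate the numeric campaign into its name and count events per campaign.
rows = conn.execute(
    "SELECT c.campaign_name, COUNT(*) AS n "
    "FROM events AS e JOIN campaigns AS c ON e.campaign = c.campaign_id "
    "GROUP BY c.campaign_name ORDER BY n DESC, c.campaign_name"
).fetchall()
print(rows)  # [('thank you', 3), ('dream burger', 1), ('take it from a fish', 1)]
```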
Create a Pipeline
Pipelines allow us to create streaming ingest feeds from various sources, such as Kafka, S3 and HDFS, using a single command. With pipelines, we can also perform ETL operations.
For our use case, we can create a simple pipeline in SingleStoreDB as follows:
CREATE PIPELINE events
AS LOAD DATA KAFKA 'public-kafka.memcompute.com:9092/ad_events'
BATCH_INTERVAL 2500
INTO TABLE events
FIELDS TERMINATED BY '\t' ENCLOSED BY '' ESCAPED BY '\\'
LINES STARTING BY ''
(user_id,event_name,advertiser,campaign,gender,income,page_url,region,country);
We can control the rate of data ingestion using BATCH_INTERVAL, which sets how long the pipeline waits, in milliseconds, before checking the data source for new data. Initially, we'll set this to 2500 milliseconds.
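As a rough feel for what this setting implies, the Python sketch below computes an upper bound on how many times per minute the pipeline would check the broker for a given interval. It assumes the pipeline has caught up with the topic and that loading a batch takes no time, which a real pipeline will not achieve; the function name is hypothetical.

```python
def max_checks_per_minute(batch_interval_ms: int) -> float:
    """Upper bound on checks per minute, assuming batches load instantly."""
    if batch_interval_ms <= 0:
        raise ValueError("batch_interval_ms must be positive")
    return 60_000 / batch_interval_ms


print(max_checks_per_minute(2500))  # 24.0
```

A smaller interval means more frequent checks and fresher data on the dashboard, at the cost of more work for the cluster.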
We can configure the pipeline to start from the earliest offset, as follows:
ALTER PIPELINE events SET OFFSETS EARLIEST;
and we can test the pipeline before we start running it, as follows:
TEST PIPELINE events LIMIT 1;
The pipeline test should result in an error, as the value for the campaign column is out of range:
ERROR 1264 ER_WARN_DATA_OUT_OF_RANGE: ... Out of range value for column 'campaign'
Let's look at the data in the ad_events topic from the Kafka broker and see if we can identify the problem. We'll install kcat (formerly kafkacat):
sudo apt install kafkacat
and then run it as follows:
kafkacat -C -b public-kafka.memcompute.com:9092 -t ad_events
Here are 10 rows of example output:
...
607982731 Downstream Conversion Verizon Wireless 5 9 Male 50k - 75k / Utah US
607982732 Impression AT&T Wireless 4 3 Female unknown /2014/04/make-spring-centerpiece-with-tulips.html Florida US
607982732 Click AT&T Wireless 4 3 Female unknown /2014/04/make-spring-centerpiece-with-tulips.html Florida US
607982732 Impression AT&T Wireless 5 9 Female unknown /category/31-day-drawing-challenge/ Florida US
607982732 Impression AT&T Wireless 5 7 Female unknown /2016/05/lars-book-club-may.html/ Florida US
607982732 Impression AT&T Wireless 5 7 Female unknown /2013/01/polka-dot-your-wrapping.html/ Florida US
607982732 Impression AT&T Wireless 13 3 Female unknown /2016/03/12-best-paper-flowers-volume-3.html/ Florida US
607982733 Impression Dominos Pizza 5 6 Female 25k and below /2014/07/make-the-midsummer-floral-crown.html/ Pennsylvania US
607982734 Impression Lowes Companies 5 13 Female unknown /2016/01/eyeball-drink-stirrers.html/ New York US
607982735 Impression Lowes Companies 4 6 Male unknown /2016/02/review-of-the-5-best-planners-of-2016.html/2/ California US
...
If we take the first row above, and check for hidden characters, we'll see that we have the tab character (\t) separating the fields:
607982731<tab>Downstream<space>Conversion<tab>Verizon<space>Wireless<tab>5<space>9<tab>Male<tab>50k<space>-<space>75k<tab>/<tab>Utah<tab>US
Mapping the fields back to the table schema, we'll see that the campaign column is meant to be an integer, but the stream contains two integers separated by a space:
... <tab>integer<space>integer<tab> ...
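The same check can be reproduced offline. The following Python sketch splits the first sample row on tabs, labels each field with its column name from the pipeline definition, and tests whether the campaign field is a single integer. The helper names are hypothetical and the row is typed in by hand with explicit tabs.

```python
COLUMNS = [
    "user_id", "event_name", "advertiser", "campaign", "gender",
    "income", "page_url", "region", "country",
]

# First sample row from the topic, with the tabs written explicitly.
ROW = (
    "607982731\tDownstream Conversion\tVerizon Wireless\t5 9\t"
    "Male\t50k - 75k\t/\tUtah\tUS"
)


def parse_event(line: str) -> dict:
    """Split a tab-separated event line and label each field with its column."""
    fields = line.rstrip("\n").split("\t")
    if len(fields) != len(COLUMNS):
        raise ValueError(f"expected {len(COLUMNS)} fields, got {len(fields)}")
    return dict(zip(COLUMNS, fields))


def is_strict_int(text: str) -> bool:
    """True only if the whole field parses as one integer."""
    try:
        int(text)
        return True
    except ValueError:
        return False


event = parse_event(ROW)
print(repr(event["campaign"]))           # '5 9'
print(is_strict_int(event["user_id"]))   # True
print(is_strict_int(event["campaign"]))  # False
```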
We can change the pipeline slightly to deal with this problem. First, we'll drop the pipeline:
DROP PIPELINE events;
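Then we'll recreate it with the keyword IGNORE added to line 4. With IGNORE, a row that contains an invalid value is still loaded, with the offending value adjusted to fit the column, rather than causing an error: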
CREATE PIPELINE events
AS LOAD DATA KAFKA 'public-kafka.memcompute.com:9092/ad_events'
BATCH_INTERVAL 2500
IGNORE INTO TABLE events
FIELDS TERMINATED BY '\t' ENCLOSED BY '' ESCAPED BY '\\'
LINES STARTING BY ''
(user_id,event_name,advertiser,campaign,gender,income,page_url,region,country);
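The article does not state which value ends up in the campaign column once IGNORE is in place. One plausible behaviour, assumed here by analogy with MySQL-style lenient conversion, is that the leading digits are kept and the rest of the field is discarded. The Python sketch below models that assumption only; lenient_int is a hypothetical helper, not part of SingleStoreDB.

```python
import re


def lenient_int(text: str, default: int = 0) -> int:
    """Keep the leading integer of a field and drop what follows (assumed behaviour)."""
    match = re.match(r"\s*([+-]?\d+)", text)
    return int(match.group(1)) if match else default


print(lenient_int("5 9"))   # 5
print(lenient_int("13 3"))  # 13
print(lenient_int("n/a"))   # 0
```

Running a query such as a count per campaign after the pipeline has started is the reliable way to confirm what was actually stored.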
We can configure the pipeline again to start from the earliest offset, as follows:
ALTER PIPELINE events SET OFFSETS EARLIEST;
and we can test the pipeline again before we start running it, as follows:
TEST PIPELINE events LIMIT 1;
This time the pipeline test should be successful and we should see some output, similar to the following:
+-----------+------------+------------+----------+---------+---------+----------------------------------+--------+---------+
| user_id | event_name | advertiser | campaign | gender | income | page_url | region | country |
+-----------+------------+------------+----------+---------+---------+----------------------------------+--------+---------+
| 605804911 | Impression | Aldi | 13 | unknown | unknown | /2016/05/day-9-draw-a-rose.html/ | Nevada | US |
+-----------+------------+------------+----------+---------+---------+----------------------------------+--------+---------+
We can now start the pipeline:
START PIPELINE events;
Install Metabase
To use Metabase, we'll follow the JAR file installation instructions and run the JAR file, as follows:
java -jar metabase.jar
Next, we'll open the following webpage:
http://localhost:3000/setup
We should see the page shown in Figure 1.
We'll work through the various wizards to complete the language and user settings. To add our data, we'll select MySQL and we'll need to fill in the details for our connection, as follows:
- Display name: S2
- Host: <host>
- Port: 3306
- Database name: adtech
- Username: admin
- Password: <password>
We'll replace the <host> and <password> with the values from our SingleStoreDB Cloud account.
We'll click the Connect database button. The connection should be successful.
Finally, we'll click Finish and then Take me to Metabase.
Next, we should see the page shown in Figure 2.
Create Dashboard
We are now ready to create our dashboard.
1. Total Number of Events
From the top right, we'll select + New and then SQL Query as shown in Figure 3.
After selecting the S2 database, we'll enter the following into the SQL Editor in Metabase:
SELECT COUNT(*) FROM events;
We can run the query using the right arrow button (▶) in the bottom right and use the Save link in the top right to save this query. We'll call the query Total Number of Events. We'll create a new dashboard called AdTech and add the query to the dashboard, as shown in Figure 4.
2. Events by Region
We'll repeat the previous process of creating an SQL Query and adding it to the AdTech dashboard for the following query (Events by Region). Note that the query groups events by the country column:
SELECT events.country AS `events.country`, COUNT(events.country) AS `events.countofevents`
FROM adtech.events AS events
GROUP BY 1;
With this query, we have the option of creating a Visualization. We'll create a pie chart.
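To see what this aggregation returns without a running cluster, here is a minimal Python sketch that executes a query of the same shape against an in-memory SQLite database. The rows are made up for illustration, including one non-US row and one row with no country. Because the query uses COUNT(country) rather than COUNT(*), rows with a NULL country form a group whose count is 0.

```python
import sqlite3

# Illustrative rows only: (user_id, country). None represents a NULL country.
SAMPLE = [
    (607982731, "US"),
    (607982732, "US"),
    (607982733, "US"),
    (1, "CA"),   # hypothetical non-US event
    (2, None),   # hypothetical event with a missing country
]

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (user_id INTEGER, country TEXT)")
conn.executemany("INSERT INTO events VALUES (?, ?)", SAMPLE)

# COUNT(country) skips NULLs, so the NULL group is reported with a count of 0.
rows = conn.execute(
    "SELECT country, COUNT(country) AS countofevents "
    "FROM events GROUP BY country ORDER BY country"
).fetchall()
print(rows)  # [(None, 0), ('CA', 1), ('US', 3)]
```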
3. Events by Top 5 Advertisers
We'll repeat the previous process of creating an SQL Query and adding it to the AdTech dashboard for the following query (Events by Top 5 Advertisers):
SELECT events.advertiser AS `events.advertiser`, COUNT(*) AS `events.count`
FROM adtech.events AS events
WHERE (events.advertiser LIKE '%Subway%'
    OR events.advertiser LIKE '%McDonald%'
    OR events.advertiser LIKE '%Starbucks%'
    OR events.advertiser LIKE '%Dollar General%'
    OR events.advertiser LIKE '%YUM! Brands%'
    OR events.advertiser LIKE '%Dunkin Brands Group%')
GROUP BY 1
ORDER BY `events.count` DESC;
With this query, we have the option of creating a Visualization. We'll create a bar chart.
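The query above selects advertisers by name rather than computing a ranking. The Python sketch below, run against an in-memory SQLite database with made-up rows, shows that filter-based approach next to an alternative that ranks all advertisers by event count and keeps the first N. The alternative is a suggestion, not the article's method, and the sample data is hypothetical.

```python
import sqlite3

# Hypothetical advertiser values, one per event.
ADVERTISERS = [
    "Subway", "Starbucks", "Subway", "AT&T Wireless",
    "Dollar General", "Subway", "Starbucks",
]
PATTERNS = ["%Subway%", "%Starbucks%", "%Dollar General%"]

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (advertiser TEXT)")
conn.executemany("INSERT INTO events VALUES (?)", [(a,) for a in ADVERTISERS])

# Approach used in the article: keep only advertisers matching fixed patterns.
where = " OR ".join("advertiser LIKE ?" for _ in PATTERNS)
filtered = conn.execute(
    f"SELECT advertiser, COUNT(*) AS n FROM events WHERE {where} "
    "GROUP BY advertiser ORDER BY n DESC, advertiser",
    PATTERNS,
).fetchall()
print(filtered)  # [('Subway', 3), ('Starbucks', 2), ('Dollar General', 1)]

# Alternative: rank every advertiser and keep the top N (here N = 2).
top_two = conn.execute(
    "SELECT advertiser, COUNT(*) AS n FROM events "
    "GROUP BY advertiser ORDER BY n DESC, advertiser LIMIT 2"
).fetchall()
print(top_two)  # [('Subway', 3), ('Starbucks', 2)]
```

The fixed list keeps the chart stable over time, whereas the ranked version follows whichever advertisers are currently busiest.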
4. Ad Visitors by Gender and Income
We'll repeat the previous process of creating an SQL Query and adding it to the AdTech dashboard for the following query (Ad Visitors by Gender and Income):
SELECT *
FROM (SELECT *, DENSE_RANK() OVER (ORDER BY xx.z___min_rank) AS z___pivot_row_rank, RANK() OVER (PARTITION BY xx.z__pivot_col_rank ORDER BY xx.z___min_rank) AS z__pivot_col_ordering, CASE
WHEN xx.z___min_rank = xx.z___rank THEN 1
ELSE 0
END AS z__is_highest_ranked_cell
FROM (SELECT *, Min(aa.z___rank) OVER (PARTITION BY aa.`events.income`) AS z___min_rank
FROM (SELECT *, RANK() OVER (ORDER BY CASE
WHEN bb.z__pivot_col_rank = 1 THEN (CASE
WHEN bb.`events.count` IS NOT NULL THEN 0
ELSE 1
END)
ELSE 2
END, CASE
WHEN bb.z__pivot_col_rank = 1 THEN bb.`events.count`
ELSE NULL
END DESC, bb.`events.count` DESC, bb.z__pivot_col_rank, bb.`events.income`) AS z___rank
FROM (SELECT *, DENSE_RANK() OVER (ORDER BY CASE
WHEN ww.`events.gender` IS NULL THEN 1
ELSE 0
END, ww.`events.gender`) AS z__pivot_col_rank
FROM (SELECT events.gender AS `events.gender`, events.income AS `events.income`, COUNT(*) AS `events.count`
FROM adtech.events AS events
WHERE (events.income <> 'unknown' OR events.income IS NULL)
GROUP BY 1, 2) ww) bb
WHERE bb.z__pivot_col_rank <= 16384) aa) xx) zz
WHERE (zz.z__pivot_col_rank <= 50 OR zz.z__is_highest_ranked_cell = 1) AND (zz.z___pivot_row_rank <= 500 OR zz.z__pivot_col_ordering = 1)
ORDER BY zz.z___pivot_row_rank;
With this query, we have the option of creating a Visualization. We'll create a table.
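Most of this query appears to be ranking scaffolding for a pivot table (the columns prefixed with z__). The core is the innermost SELECT, which counts events per gender and income while excluding rows whose income is 'unknown'. The Python sketch below reproduces only that core and arranges the counts with income as rows and gender as columns. The sample pairs are made up and pivot_counts is a hypothetical helper.

```python
from collections import Counter

# Hypothetical (gender, income) pairs, using values seen in the sample stream.
EVENTS = [
    ("Male", "50k - 75k"),
    ("Female", "unknown"),
    ("Female", "25k and below"),
    ("Female", "25k and below"),
    ("Male", "25k and below"),
    ("Male", "unknown"),
]


def pivot_counts(events):
    """Count events per (income, gender), skipping rows with income 'unknown'."""
    counts = Counter(
        (income, gender) for gender, income in events if income != "unknown"
    )
    genders = sorted({gender for _, gender in counts})
    table = {}
    for (income, gender), n in counts.items():
        # Each income row starts with a zero for every gender column.
        row = table.setdefault(income, dict.fromkeys(genders, 0))
        row[gender] = n
    return table


table = pivot_counts(EVENTS)
for income, row in table.items():
    print(income, row)
```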
Final Dashboard
Figure 5 shows an example of the charts being sized and positioned on the AdTech dashboard. We'll set the auto-refresh option to 1 minute.
Summary
In this article, we have created a pipeline in SingleStoreDB to connect to a Kafka broker. We have identified a problem with the incoming data and modified our pipeline to handle the issue. We have also used Metabase to create a quick visual dashboard to help us gain insights into our Ad Campaign. For additional details, please see the SingleStore documentation on how to Load and Analyze AdTech Data.
Appendix
CREATE DATABASE IF NOT EXISTS adtech;
USE adtech;
CREATE TABLE events (
    user_id INT,
    event_name VARCHAR(128),
    advertiser VARCHAR(128),
    campaign INT(11),
    gender VARCHAR(128),
    income VARCHAR(128),
    page_url VARCHAR(128),
    region VARCHAR(128),
    country VARCHAR(128),
    SORT KEY adtmidx (user_id, event_name, advertiser, campaign),
    SHARD KEY user_id (user_id)
);
CREATE REFERENCE TABLE campaigns (
    campaign_id SMALLINT(6) NOT NULL DEFAULT '0',
    campaign_name VARCHAR(255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,
    PRIMARY KEY (campaign_id)
);
INSERT INTO campaigns VALUES
(1,'demand great'),
(2,'blackout'),
(3,'flame broiled'),
(4,'take it from a fish'),
(5,'thank you'),
(6,'designed by you'),
(7,'virtual launch'),
(8,'ultra light'),
(9,'warmth'),
(10,'run healthy'),
(11,'virtual city'),
(12,'online lifestyle'),
(13,'dream burger'),
(14,'super bowl tweet');
CREATE PIPELINE events
AS LOAD DATA KAFKA 'public-kafka.memcompute.com:9092/ad_events'
BATCH_INTERVAL 2500
INTO TABLE events
FIELDS TERMINATED BY '\t' ENCLOSED BY '' ESCAPED BY '\\'
LINES STARTING BY ''
(user_id,event_name,advertiser,campaign,gender,income,page_url,region,country);
ALTER PIPELINE events SET OFFSETS EARLIEST;
TEST PIPELINE events LIMIT 1;
DROP PIPELINE events;
CREATE PIPELINE events
AS LOAD DATA KAFKA 'public-kafka.memcompute.com:9092/ad_events'
BATCH_INTERVAL 2500
IGNORE INTO TABLE events
FIELDS TERMINATED BY '\t' ENCLOSED BY '' ESCAPED BY '\\'
LINES STARTING BY ''
(user_id,event_name,advertiser,campaign,gender,income,page_url,region,country);
ALTER PIPELINE events SET OFFSETS EARLIEST;
TEST PIPELINE events LIMIT 1;
START PIPELINE events;
-- Total Number of Events
SELECT COUNT(*) FROM events;
-- Events by Region
SELECT events.country AS `events.country`, COUNT(events.country) AS `events.countofevents`
FROM adtech.events AS events
GROUP BY 1;
-- Events by Top 5 Advertisers
SELECT events.advertiser AS `events.advertiser`, COUNT(*) AS `events.count`
FROM adtech.events AS events
WHERE (events.advertiser LIKE '%Subway%'
    OR events.advertiser LIKE '%McDonald%'
    OR events.advertiser LIKE '%Starbucks%'
    OR events.advertiser LIKE '%Dollar General%'
    OR events.advertiser LIKE '%YUM! Brands%'
    OR events.advertiser LIKE '%Dunkin Brands Group%')
GROUP BY 1
ORDER BY `events.count` DESC;
-- Ad Visitors by Gender and Income
SELECT *
FROM (SELECT *, DENSE_RANK() OVER (ORDER BY xx.z___min_rank) AS z___pivot_row_rank, RANK() OVER (PARTITION BY xx.z__pivot_col_rank ORDER BY xx.z___min_rank) AS z__pivot_col_ordering, CASE
WHEN xx.z___min_rank = xx.z___rank THEN 1
ELSE 0
END AS z__is_highest_ranked_cell
FROM (SELECT *, Min(aa.z___rank) OVER (PARTITION BY aa.`events.income`) AS z___min_rank
FROM (SELECT *, RANK() OVER (ORDER BY CASE
WHEN bb.z__pivot_col_rank = 1 THEN (CASE
WHEN bb.`events.count` IS NOT NULL THEN 0
ELSE 1
END)
ELSE 2
END, CASE
WHEN bb.z__pivot_col_rank = 1 THEN bb.`events.count`
ELSE NULL
END DESC, bb.`events.count` DESC, bb.z__pivot_col_rank, bb.`events.income`) AS z___rank
FROM (SELECT *, DENSE_RANK() OVER (ORDER BY CASE
WHEN ww.`events.gender` IS NULL THEN 1
ELSE 0
END, ww.`events.gender`) AS z__pivot_col_rank
FROM (SELECT events.gender AS `events.gender`, events.income AS `events.income`, COUNT(*) AS `events.count`
FROM adtech.events AS events
WHERE (events.income <> 'unknown' OR events.income IS NULL)
GROUP BY 1, 2) ww) bb
WHERE bb.z__pivot_col_rank <= 16384) aa) xx) zz
WHERE (zz.z__pivot_col_rank <= 50 OR zz.z__is_highest_ranked_cell = 1) AND (zz.z___pivot_row_rank <= 500 OR zz.z__pivot_col_ordering = 1)
ORDER BY zz.z___pivot_row_rank;