Abstract
Apache Airflow manages workflows for data engineering pipelines. It can easily be used with SingleStoreDB and, in this article, we'll walk through the installation process and create a simple workflow that uses several tables in a SingleStoreDB database.
The code files used in this article are available on GitHub.
Introduction
SingleStoreDB is MySQL wire-compatible, and we'll adapt a simple Apache Airflow MySQL example and show it working with SingleStoreDB.
Create a SingleStoreDB Cloud account
A previous article showed the steps required to create a free SingleStoreDB Cloud account. We'll use Airflow Demo Group as our Workspace Group Name and airflow-demo as our Workspace Name. We'll make a note of our password and host name.
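Since SingleStoreDB speaks the MySQL wire protocol, we can optionally sanity-check connectivity with the stock mysql client, if one is installed locally. This is just a quick check; replace <host> with the value from the SingleStoreDB Cloud account and enter the password when prompted:
# Connect to SingleStoreDB Cloud using the standard MySQL client
mysql -h <host> -u admin -P 3306 -p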
Create the database and tables
Our example is derived from the one described in the article Scheduling a SQL script, using Apache Airflow, with an example.
Using the SQL Editor in SingleStoreDB, we'll create a database and several tables:
CREATE DATABASE IF NOT EXISTS airflow_demo;
USE airflow_demo;
DROP TABLE IF EXISTS event;
CREATE TABLE IF NOT EXISTS event (
    event_id INT,
    spend_amt FLOAT,
    user_id INT,
    date DATE
);
DROP TABLE IF EXISTS event_stats;
CREATE TABLE IF NOT EXISTS event_stats (
    date DATE,
    user_id INT,
    total_spend_amt FLOAT,
    PRIMARY KEY (date, user_id)
);
We'll populate the event table, as follows:
INSERT INTO event VALUES
(1, 34.36, 2, CURRENT_DATE()),
(2, 94.92, 2, CURRENT_DATE()),
(3, 70.76, 9, CURRENT_DATE()),
(4, 34.26, 7, CURRENT_DATE()),
(5, 58.36, 1, CURRENT_DATE()),
(6, 39.64, 2, CURRENT_DATE()),
(7, 64.83, 10, CURRENT_DATE()),
(8, 39.33, 1, CURRENT_DATE()),
(9, 100, -99, CURRENT_DATE()),
(9, 69.06, 10, ADDDATE(CURRENT_DATE(), 1)),
(10, 63.79, 3, ADDDATE(CURRENT_DATE(), 1)),
(11, 40.87, 3, ADDDATE(CURRENT_DATE(), 1)),
(12, 32.76, 10, ADDDATE(CURRENT_DATE(), 1)),
(13, 11.84, 3, ADDDATE(CURRENT_DATE(), 1)),
(14, 88.07, 2, ADDDATE(CURRENT_DATE(), 1)),
(15, 100, -99, ADDDATE(CURRENT_DATE(), 1));
Example output:
+----------+-----------+---------+------------+
| event_id | spend_amt | user_id | date |
+----------+-----------+---------+------------+
| 1 | 34.36 | 2 | 2022-10-15 |
| 2 | 94.92 | 2 | 2022-10-15 |
| 3 | 70.76 | 9 | 2022-10-15 |
| 4 | 34.26 | 7 | 2022-10-15 |
| 5 | 58.36 | 1 | 2022-10-15 |
| 6 | 39.64 | 2 | 2022-10-15 |
| 7 | 64.83 | 10 | 2022-10-15 |
| 8 | 39.33 | 1 | 2022-10-15 |
| 9 | 100 | -99 | 2022-10-15 |
| 9 | 69.06 | 10 | 2022-10-16 |
| 10 | 63.79 | 3 | 2022-10-16 |
| 11 | 40.87 | 3 | 2022-10-16 |
| 12 | 32.76 | 10 | 2022-10-16 |
| 13 | 11.84 | 3 | 2022-10-16 |
| 14 | 88.07 | 2 | 2022-10-16 |
| 15 | 100 | -99 | 2022-10-16 |
+----------+-----------+---------+------------+
Install Apache Airflow
We'll use a Virtual Machine running Ubuntu 22.04.2 as our test environment. An alternative would be to use venv.
First, we'll upgrade pip:
pip install --upgrade pip
Since we are using MySQL tools, we'll need to install the following:
sudo apt install pkg-config
sudo apt install libmysqlclient-dev
and
pip install apache-airflow-providers-mysql
Next, we'll slightly modify the installation instructions, as follows:
export AIRFLOW_HOME=~/airflow
AIRFLOW_VERSION=2.10.2
PYTHON_VERSION="$(python3 -c 'import sys; print(f"{sys.version_info.major}.{sys.version_info.minor}")')"
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"
We'll also need:
pip install "apache-airflow-providers-common-sql>=1.17.0"
Now, we'll run the following:
airflow standalone
The last set of messages displayed by the above command should be similar to the following:
standalone |
standalone | Airflow is ready
standalone | Login with username: admin password: kRgbcnSCXEQ8SYQ6
standalone | Airflow Standalone is for development purposes only. Do not use this in production!
standalone |
Make a note of the password shown in your environment.
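Recent Airflow versions also write this generated password to a file under $AIRFLOW_HOME, so, if that is the case in your environment, it can be retrieved later:
# The standalone command stores the generated admin password here
cat ~/airflow/standalone_admin_password.txt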
Launching a web browser and entering http://localhost:8080 will show a login screen similar to Figure 1.
We'll use the credentials that we received earlier. After logging in, we should see a screen similar to Figure 2.
Create SingleStoreDB connection
In Apache Airflow, selecting Admin > Connections, we'll see a large list of connections. If we scroll down, we'll find mysql_default. Using the pencil icon, we'll edit the connection. Here is what we need to enter:
- Connection Id: mysql_default
- Connection Type: MySQL
- Host: <host>
- Schema: airflow_demo
- Login: admin
- Password: <password>
- Port: 3306
We'll replace the <host> and <password> with the values from our SingleStoreDB Cloud account.
We'll then use the Save button to save the connection.
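As an alternative to the UI, the same connection can be created from the command line. This is a sketch using the standard Airflow CLI; since add will not overwrite an existing connection, we delete the stock entry first:
# Replace the stock mysql_default connection with one pointing at SingleStoreDB
airflow connections delete mysql_default
airflow connections add mysql_default \
    --conn-type mysql \
    --conn-host "<host>" \
    --conn-schema airflow_demo \
    --conn-login admin \
    --conn-password "<password>" \
    --conn-port 3306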
Create DAG and SQL files
Let's create a new Python file called airflow_demo_dag.py with the following code, derived from the article mentioned earlier:
from airflow import DAG
from airflow.providers.mysql.operators.mysql import MySqlOperator
from datetime import date

default_arg = {
    'owner' : 'airflow',
    'start_date' : str(date.today())
}

# Run daily at midnight
dag = DAG(
    'simple-s2-dag',
    default_args = default_arg,
    schedule_interval = '0 0 * * *'
)

# Execute the templated SQL file against the SingleStoreDB connection
s2_task = MySqlOperator(
    task_id = 's2_task',
    mysql_conn_id = 'mysql_default',
    autocommit = True,
    sql = 'airflow_demo.sql',
    params = {'test_user_id': -99},
    dag = dag
)

s2_task
and a SQL file called airflow_demo.sql with the following code:
USE airflow_demo;
DROP TABLE IF EXISTS event_stats_staging;
CREATE TABLE event_stats_staging AS
SELECT date, user_id, SUM(spend_amt) AS total_spend_amt
FROM event
WHERE date = '{{ ds }}' AND user_id <> {{ params.test_user_id }}
GROUP BY date, user_id;
INSERT INTO event_stats (date, user_id, total_spend_amt)
SELECT date, user_id, total_spend_amt
FROM event_stats_staging
ON DUPLICATE KEY UPDATE total_spend_amt = VALUES(total_spend_amt);
DROP TABLE IF EXISTS event_stats_staging;
The DAG is scheduled to run daily at midnight. The SQL code aggregates the data in the event table and stores the result in the event_stats table. For example, the first run of the code would produce the following:
+------------+---------+-----------------+
| date | user_id | total_spend_amt |
+------------+---------+-----------------+
| 2022-10-15 | 1 | 97.69 |
| 2022-10-15 | 2 | 168.92 |
| 2022-10-15 | 7 | 34.26 |
| 2022-10-15 | 9 | 70.76 |
| 2022-10-15 | 10 | 64.83 |
+------------+---------+-----------------+
The second run of the code would produce the following:
+------------+---------+-----------------+
| date | user_id | total_spend_amt |
+------------+---------+-----------------+
| 2022-10-15 | 1 | 97.69 |
| 2022-10-15 | 2 | 168.92 |
| 2022-10-15 | 7 | 34.26 |
| 2022-10-15 | 9 | 70.76 |
| 2022-10-15 | 10 | 64.83 |
| 2022-10-16 | 2 | 88.07 |
| 2022-10-16 | 3 | 116.5 |
| 2022-10-16 | 10 | 101.82 |
+------------+---------+-----------------+
We'll save our two files in the directory ~/airflow/dags. We may need to create the directory if it does not exist.
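If needed, we can create the directory, copy the files in, and then preview how the Jinja placeholders ({{ ds }} and {{ params.test_user_id }}) render for a given logical date. This is a sketch using standard Airflow CLI commands; it assumes the two files were created in the current directory, and uses one of the dates from the example data:
mkdir -p ~/airflow/dags
cp airflow_demo_dag.py airflow_demo.sql ~/airflow/dags/
# Preview the rendered, templated SQL for a chosen logical date
airflow tasks render simple-s2-dag s2_task 2022-10-15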
Run the code
In Apache Airflow, we should see a new DAG called simple-s2-dag in the list of DAGs. If it is not visible, it may be necessary to rerun the command:
airflow standalone
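We can also confirm from the terminal that the DAG has been parsed:
# List all DAGs known to Airflow; simple-s2-dag should appear
airflow dags list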
We can toggle the button to the left of the name to enable the DAG. On the extreme right-hand side, an arrow button (▶) allows us to run the DAG immediately. The run should complete successfully. Clicking on the DAG name will provide additional details, as shown in Figure 3.
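Alternatively, the whole DAG can be exercised from the command line for a chosen logical date, without involving the scheduler. This is a useful debugging aid; the tasks run in a single process:
# Run every task in the DAG once, in-process, for the given logical date
airflow dags test simple-s2-dag 2022-10-15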
From SingleStoreDB, we can check the event_stats table:
SELECT * FROM event_stats ORDER BY user_id;
The result should be similar to the following:
+------------+---------+-----------------+
| date | user_id | total_spend_amt |
+------------+---------+-----------------+
| 2022-10-15 | 1 | 97.69 |
| 2022-10-15 | 2 | 168.92 |
| 2022-10-15 | 7 | 34.26 |
| 2022-10-15 | 9 | 70.76 |
| 2022-10-15 | 10 | 64.83 |
+------------+---------+-----------------+
5 rows in set (0.02 sec)
Summary
In this article, we have used a straightforward example to demonstrate Apache Airflow with SingleStoreDB. Since SingleStoreDB is MySQL wire-compatible, many existing tools and techniques can be used out of the box. This makes it easy for MySQL and MariaDB developers to transition to SingleStoreDB.