In my day-to-day work, one of the most common use cases for Apache Airflow is running hundreds of scheduled BigQuery SQL scripts. Developers who are new to Airflow often ask:
How do I use Airflow to orchestrate SQL?
This post aims to answer that question. It assumes a basic understanding of Apache Airflow, Python, and BigQuery SQL.
Let’s assume we want to run a BigQuery SQL script every day at midnight.
DAG: Directed Acyclic Graph. In Airflow this denotes a data pipeline that runs on a scheduled interval. A DAG is made up of one or more individual tasks.
from airflow import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
default_args = {'owner': 'raju', 'start_date': '2024-04-29'}

dag = DAG('simple-bigquery-dag',
          default_args=default_args,
          schedule_interval='0 0 * * *')

bqsql_task = BigQueryInsertJobOperator(
    dag=dag,
    gcp_conn_id='bqsql_default',
    task_id='bqsql_task',
    configuration={
        "query": {
            "query": "{% include 'sql/bqsql_sample_query.sql' %}",
            "useLegacySql": False,
        }
    },
    params={
        'project_id': MYPROJECT_ID,
        'dataset_name': MYDATASET_NAME,
    },
)
In the above script
0 0 * * * is a cron schedule expression, denoting that the DAG should run every day at midnight (the 0th hour of every day). Note that Airflow by default runs on UTC time.
gcp_conn_id is the connection ID for your Google Cloud / BigQuery connection. You can set it under Admin -> Connections in the Airflow UI, where you provide the credentials (typically a service account key) that Airflow uses to access BigQuery.
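Besides the UI, Airflow can also resolve a connection ID from an environment variable of the form AIRFLOW_CONN_&lt;CONN_ID&gt; holding a connection URI. The sketch below illustrates the naming convention only; the project name and the `extra__google_cloud_platform__project` key are placeholder assumptions (the exact extras format varies across provider versions):

```python
import os

# Hypothetical sketch: Airflow looks up AIRFLOW_CONN_<CONN_ID_UPPERCASED>
# in the environment (in addition to the metadata database).
# 'google-cloud-platform' is the URI scheme for a GCP connection.
os.environ["AIRFLOW_CONN_BQSQL_DEFAULT"] = (
    "google-cloud-platform://?extra__google_cloud_platform__project=my-project"
)

conn_id = "bqsql_default"
env_key = "AIRFLOW_CONN_" + conn_id.upper().replace("-", "_")
print(os.environ[env_key])
```

This is handy for containerized deployments, where baking connections into environment variables avoids manual UI setup.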
The SQL script that performs this operation is stored in a separate file, bqsql_sample_query.sql, which is read in when the DAG runs.
CREATE TABLE IF NOT EXISTS {{params.project_id}}.{{params.dataset_name}}.Inventory
(product STRING, quantity INT64);

INSERT {{params.project_id}}.{{params.dataset_name}}.Inventory (product, quantity)
VALUES('top load washer', 10),
('front load washer', 20),
('dryer', 30),
('refrigerator', 10),
('microwave', 20),
('dishwasher', 30),
('oven', 5);
There are two key concepts in the templated SQL script shown above:
Airflow macros: They provide access to the metadata of each DAG run. For example, {{ ds }} gives the run's logical date, which is useful when a script needs to aggregate data for the previous day. ref: https://airflow.apache.org/docs/stable/macros.html
Templated parameters: If we want our SQL script to have parameters that are filled in at run time, we can pass them to the task via the params argument. In our example we passed the project_id (MYPROJECT_ID) and the dataset_name (MYDATASET_NAME).
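To make the templating concrete, here is a simplified, stdlib-only sketch of how placeholders like {{params.project_id}} and {{ ds }} get substituted before the query is sent to BigQuery. This is not Airflow's actual renderer (Airflow uses Jinja2); the project and dataset names are made-up values:

```python
import re
from datetime import date

def render(template: str, context: dict) -> str:
    """Crude stand-in for Jinja2: replace {{ key }} with context values."""
    def repl(match):
        key = match.group(1).strip()
        value = context
        # Support dotted lookups like params.project_id
        for part in key.split("."):
            value = value[part]
        return str(value)
    return re.sub(r"\{\{\s*([\w.]+)\s*\}\}", repl, template)

sql = ("INSERT {{params.project_id}}.{{params.dataset_name}}.Inventory "
       "(product, quantity) VALUES('dryer', 30); -- run date: {{ ds }}")

context = {
    "params": {"project_id": "my-project", "dataset_name": "my_dataset"},
    "ds": date(2024, 4, 29).isoformat(),  # stand-in for Airflow's {{ ds }} macro
}
print(render(sql, context))
```

Airflow performs this substitution for every templated field of the operator, including files pulled in via {% include %}, just before the task executes.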
Conclusion
In this post we saw
- How to schedule BigQuery SQL scripts using Apache Airflow
- How Airflow connects to the database using the connection id
- How to pass in parameters at run time using input parameters and macros

Hope this gives you an understanding of how to schedule BigQuery SQL scripts in Airflow and how to use templating.
Some Final Words
If this blog was helpful and you wish to show a little support, you could:
- 👍 300 times for this story
- Follow me on LinkedIn: https://www.linkedin.com/in/raju-n-203b2115/
These actions really really really help me out, and are much appreciated!