Sergii Lischuk

Apache Airflow: how to make complex workflows an easy job

Intro

A couple of weeks ago I started working with this platform because of a feature request. The feature involved GCP, observability, and tons of data to be processed. I was looking for something powerful enough to make the flow easy and clean to create and to run the jobs, without forgetting consistency, fault tolerance, and correct error handling.

My research brought me to the topic of orchestration, and in particular to Apache Airflow.

Why do you need orchestration?

If you just have a simple task like exporting some data to Excel, you probably don't need orchestration at all. But if you work with data that brings real profit after processing, or you deal with large amounts of messy data on a daily basis, you are in the right place to start thinking about it.

For example, if your company processes a large amount of data and turns it into advice and profit for your customers, the workflow will look the same in almost all cases. Each night your providers drop files with raw data into an S3 bucket or Azure Blob storage. Then you collect that data and load it into a structured store (e.g., push it to a BigQuery table). Next, you process it with complicated SQL scripts, aggregate it (again), and patch invalid records. After that, you need to validate the data against some external API (possibly even your own services), but the data is too big to process without parallelism or a queue. And finally, after validation, you need to connect the result to a data-visualization tool (like a Tableau dashboard) to show it to your customers.
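Jumping ahead a little, a pipeline like that can be sketched in Airflow as a chain of placeholder tasks. This is only an illustration: every name here is made up, and EmptyOperator (DummyOperator in older Airflow versions) does nothing on its own:

from datetime import datetime
from airflow.models import DAG
from airflow.operators.empty import EmptyOperator  # DummyOperator before Airflow 2.4

with DAG(
    dag_id='nightly_customer_report',     # hypothetical name
    schedule_interval='0 3 * * *',        # every night at 03:00
    start_date=datetime(2021, 1, 1),
    catchup=False,
) as dag:
    collect_raw_files = EmptyOperator(task_id='collect_raw_files')    # S3 / Azure Blob
    load_to_warehouse = EmptyOperator(task_id='load_to_warehouse')    # e.g. BigQuery
    transform_with_sql = EmptyOperator(task_id='transform_with_sql')  # heavy SQL + aggregation
    validate_with_api = EmptyOperator(task_id='validate_with_api')    # external API checks
    publish_dashboard = EmptyOperator(task_id='publish_dashboard')    # e.g. Tableau

    collect_raw_files >> load_to_warehouse >> transform_with_sql >> validate_with_api >> publish_dashboard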

As you can see, this process is not so easy at first glance, and there are many more such cases (pipelines) to handle in real life.

For this reason, you need a workflow manager. And for the last couple of years the de-facto standard has been Apache Airflow.

History

Airflow was born as an internal project at Airbnb in 2014. From the start it was open source, so it was easy to contribute the needed functionality via pull requests quickly. In 2016 the project moved to the Apache Incubator, and in 2019 Airflow became a top-level project of the Apache Software Foundation.

Components

Airflow is a Python project, so almost all of its functionality is Python code.

To start working with Airflow, you need to provide a configuration, and the configuration strongly depends on the number of parallel tasks that will do the job.
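Most of these settings live in airflow.cfg or, equivalently, in AIRFLOW__<SECTION>__<KEY> environment variables. A minimal sketch with example values (in recent Airflow releases the database connection has moved to the [database] section):

# Which executor runs the tasks and how many task instances may run at once
export AIRFLOW__CORE__EXECUTOR=LocalExecutor
export AIRFLOW__CORE__PARALLELISM=16

# Metadata database connection (AIRFLOW__DATABASE__SQL_ALCHEMY_CONN in newer releases)
export AIRFLOW__CORE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:airflow@localhost:5432/airflow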

There are also mandatory components that will always be on your checklist:

  • Metadata database - the database where Airflow stores all meta-information about current and past tasks, their statuses, and results. I recommend Postgres here (a more stable and efficient setup), but there are configurations and connectors for MySQL, MS SQL, and SQLite as well.

  • Scheduler - the system component that parses the files with pipeline definitions and pushes them to the Executor.

  • Web Server - a Flask-based app running under gunicorn. Its main goal is to visualize the pipelines and provide control over them.

  • Executor - the part that actually runs the code (the job); see Execution below.

There are also components that are task-dependent:

  • Triggerer - put simply, an event loop for async (deferrable) operators. Currently there are not many of them, so think twice before adding the Triggerer component to your workflow.

  • Worker - a modified worker from the Celery library; a small node where Celery can run your task.

Execution

Python code that describes the job must be executed somewhere, and this is the Executor's responsibility. Airflow supports the following kinds of executors:

  • SequentialExecutor - runs code locally, in the main thread of Airflow;

  • LocalExecutor - runs code locally, but in separate OS processes;

  • CeleryExecutor - does the job on Celery workers (Celery library);

  • DaskExecutor - runs it in a Dask cluster;

  • KubernetesExecutor - runs it in Kubernetes (k8s) pods.

From my experience, production setups are based on the Celery or Kubernetes executors. Keep this in mind, because you must be careful with dependencies between tasks in a pipeline. Every task runs in its own isolated environment, very likely on a different physical machine. So a sequence of tasks like "download file to disk" and "upload file to cloud storage" will not work correctly. You can find more detailed information here.
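Here is a minimal sketch of that pitfall and of a safer pattern, assuming a Celery or Kubernetes executor where the two callables may land on different machines; the bucket name and task ids are made up:

from pathlib import Path

# BROKEN under Celery/Kubernetes executors: the two callables may run on
# different machines, so the file written by the first task is not visible
# to the second one.
def download_to_disk_fn():
    Path('/tmp/report.csv').write_text('raw,data\n1,2\n')  # lands on worker A's local disk

def upload_to_cloud_fn():
    print(Path('/tmp/report.csv').read_text())  # fails on worker B: no such file

# SAFER PATTERN: every task talks to shared storage itself; only a small
# reference (a URI string) travels between tasks (via XCom, see below).
def extract_fn(**context):
    uri = 'gs://my-bucket/raw/report.csv'  # hypothetical bucket and path
    # ...download from the source and upload straight to that URI here...
    return uri  # the returned string is pushed to XCom automatically

def process_fn(ti, **context):
    uri = ti.xcom_pull(task_ids='extract')  # hypothetical upstream task_id
    # ...read the data back from shared storage and process it...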

As you can see, Airflow is very customizable; the configuration can be tailored to match your requirements as closely as possible.

In general, there are two common architectures: single-node and multi-node:

Single node

Multi node

Installation

There are several ways to install Apache Airflow. Let's check them.

- PIP package manager

Not the easiest way. First of all, you need to install all the dependencies, and then install and configure a database (with SQLite you are restricted to the SequentialExecutor only). A good practice is to initialize a Python virtual environment and then start working with Airflow:

python -m pip install apache-airflow
airflow db init        # initialize the metadata database before the first run
airflow webserver
airflow scheduler

- Separate Docker images

I found this useful when you are trying to run Airflow on bare-metal servers:

docker run … postgres
docker run … apache/airflow scheduler
docker run … apache/airflow webserver

- Docker compose

In my opinion, the cleanest and easiest way. You just need to create a docker-compose file with the whole configuration inside, so you will be able to reuse different variables and connections:

docker compose up
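The official documentation ships a ready-made docker-compose.yaml for exactly this setup; roughly like this (the version in the URL is only an example, match it to your Airflow release):

curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.6.3/docker-compose.yaml'
docker compose up airflow-init   # one-off initialization service defined in that file
docker compose up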

- Astronomer CLI

I have not worked much with this tool, but it has a good community around it. They also have a registry of hooks/operators which simplifies working with Airflow.

Basic concepts

The main entity in this story is the DAG (directed acyclic graph) - the housekeeper of tasks. It is a widespread term, and you will meet it in many other tools and languages.

Tasks

The nodes of this graph are Tasks, and each task is an instance of an Operator.

In general, all operators can be divided into:

  • Action operators - perform some action (ReloadJobOperator, etc.)
  • Transfer operators - move data from one place to another (S3ToGCPOperator)
  • Sensor operators - wait for some event or condition (BQTablePartitionSensor); a small example follows this list
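
As an illustration of the sensor family, here is a minimal sketch with the built-in FileSensor; the connection id, DAG name, and file path are made up for the example:

from datetime import datetime
from airflow.models import DAG
from airflow.sensors.filesystem import FileSensor

with DAG(dag_id='sensor_example', schedule_interval='@daily',
         start_date=datetime(2021, 1, 1), catchup=False) as dag:
    # Polls every 60 seconds until the file appears; downstream tasks run only after that.
    wait_for_raw_file = FileSensor(
        task_id='wait_for_raw_file',
        fs_conn_id='fs_default',          # a Connection that points at a base directory
        filepath='incoming/titanic.csv',  # hypothetical path relative to that base
        poke_interval=60,
    )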

Each run of a pipeline works with Task Instances - an instance of an operator bound to a point in time (when that operator is started). You can also configure Variables and Connections - environment-like settings that hold different connection strings, logins, and so on; you can manage them in the web UI.
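A small sketch of how both are read from code; the variable and connection ids here are made up and would first have to be created in the UI (or via environment variables):

from airflow.models import Variable
from airflow.hooks.base import BaseHook

# Read a Variable, with a fallback in case it has not been defined yet.
bucket_name = Variable.get('raw_data_bucket', default_var='my-default-bucket')

# Read a Connection and use its parts to build your own client.
conn = BaseHook.get_connection('my_warehouse')  # hypothetical connection id
print(conn.host, conn.login, conn.port)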

Last but not least, a Hook is an interface to an external service. Hooks are wrappers around popular libraries, APIs, and databases. E.g., if you need to handle a connection to some SQL server, there is most likely already a hook for it in a provider package.
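As a rough sketch, here is how the Postgres hook from the apache-airflow-providers-postgres package could be used inside a task callable; the connection id and table name are assumptions:

from airflow.providers.postgres.hooks.postgres import PostgresHook

def count_rows_fn():
    # 'my_postgres' is a Connection configured in the UI; the table name is hypothetical.
    hook = PostgresHook(postgres_conn_id='my_postgres')
    rows = hook.get_records('SELECT COUNT(*) FROM titanic')
    print(rows[0][0])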

Creating DAGs: the main points

First of all, you need some imports:

import requests
import pandas as pd
from datetime import datetime  # needed for the DAG start_date below
from pathlib import Path
from airflow.models import DAG
from airflow.operators.python import PythonOperator

Next, let's create two functions: one to download the data and one to pivot it (do not forget to check the executor to be sure these two tasks will run in the same place):

def download_data_fn():
    url = 'https://web.stanford.edu/class/archive/cs/cs109/cs109.1166/stuff/titanic.csv'
    resp = requests.get(url)
    Path('titanic.csv').write_text(resp.content.decode())

def pivot_data_fn():
    df = pd.read_csv('titanic.csv')
    df = df.pivot_table(index='Sex', columns='Pclass', values='Name', aggfunc='count')
    df.reset_index().to_csv('titanic_pivoted.csv')

And the final step is to create a DAG with the execution order:

with DAG(
    dag_id='titanic_dag',
    schedule_interval='*/9 * * * *',
    start_date=datetime(2021, 1, 1),  # required: without it the tasks cannot be scheduled
    catchup=False,                    # do not backfill every run since the start_date
) as dag:
    download_data = PythonOperator(
        task_id='download_data',
        python_callable=download_data_fn,
    )

    pivot_data = PythonOperator(
        task_id='pivot_data',
        python_callable=pivot_data_fn,
    )

    download_data >> pivot_data

# variants:
# pivot_data << download_data
# download_data.set_downstream(pivot_data)
# pivot_data.set_upstream(download_data)


The created file must be placed in the folder where all DAGs live - by default $AIRFLOW_HOME/dags. If it is there, the scheduler will pick it up, and the Executor will run it every 9 minutes.

XComs

Sometimes we have dependencies between task A and task B: we want not just to run the tasks one after another, but also to pass some results between them, like pipes in a shell. For this purpose we can use XComs.

With XComs (cross-task communication) one task can write a small piece of metadata to the metadata DB and another task can read it. Let's take the previous example and modify it a little bit:

def download_data_fn(**context):
    filename = 'titanic.csv'
    url = 'https://web.stanford.edu/class/archive/cs/cs109/cs109.1166/stuff/titanic.csv'
    resp = requests.get(url)
    Path(filename).write_text(resp.content.decode())
    # context['ti'].xcom_push(key='filename', value=filename)  # option 1: push explicitly
    return filename  # option 2: the return value is pushed as 'return_value' automatically


def pivot_data_fn(ti, **context):
    # filename = ti.xcom_pull(task_ids='download_data', key='filename')  # option 1
    filename = ti.xcom_pull(task_ids='download_data', key='return_value')  # option 2
    # note: passing task_ids as a string returns a single value; a list would return a list
    df = pd.read_csv(filename)
    df = df.pivot_table(index='Sex', columns='Pclass', values='Name', aggfunc='count')
    df.reset_index().to_csv('titanic_pivoted.csv')


with DAG(
    dag_id='titanic_dag',
    schedule_interval='*/9 * * * *',
    start_date=datetime(2021, 1, 1),
    catchup=False,
) as dag:
    download_data = PythonOperator(
        task_id='download_data',
        python_callable=download_data_fn,
        provide_context=True,  # needed only on Airflow 1.10; 2.x always passes the context
    )

    pivot_data = PythonOperator(
        task_id='pivot_data',
        python_callable=pivot_data_fn,
        provide_context=True,  # same note as above
    )

    download_data >> pivot_data

As we can see, there are different ways to use XCom objects, but keep in mind that the data must be small. If the data is big, you will waste time saving it to the DB and may hit the limits of the metadata DB. And remember that Airflow is just an orchestrator and must not be used for data processing itself.

Downsides

You need to know and keep in mind a lot of things to get good results: it's a complicated tool, but it does complex work. Also, in most cases you will have a local Airflow instance for debugging and tracing, plus staging and production environments.

Alternatives

It's good to know that Airflow is not the only player on the market. There are Dagster, Spotify's Luigi, and others. They all have different pros and cons, so make sure you investigate the market properly to choose the most suitable tool for your tasks.

That's all for today ;) I hope this article gives some clues and basics to those who are starting to work with Airflow and orchestration. Stay tuned!
