
Daniel Ferrari

A Custom Airflow EMR Sensor for Multiple Steps

Most data engineers know Apache Airflow; if you don't, you should. It is a platform to create, schedule, and monitor workflows, and it can orchestrate all sorts of pipelines.

At Basecone/WK, we use Amazon Managed Workflows for Apache Airflow (MWAA) to orchestrate pipelines for data analytics and data science use cases. Combined with Spark clusters (EMR) and ECS tasks, it's easy to scale the workload with a relatively small Airflow cluster. This approach saves on costs, since Airflow is the only service running 24/7 with minimal resources, while EMR and ECS are up only when processing data.

Airflow has operators and sensors for both EMR and ECS, and we'll focus on the EMR ones in this post. You can use the operators to create and terminate Spark clusters for each DAG execution, which saves money (EMR clusters can be expensive) and isolates the data pipeline use case (it helps you size the cluster better and debug issues). You can start adding the steps when the sensor informs you that the cluster is ready to accept jobs.
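For illustration, here is a minimal sketch of that pattern using the Amazon provider's EMR operators, assuming a recent Airflow with the Amazon provider installed; the cluster config, DAG id, and task ids are placeholders, not our production setup:

```python
import pendulum
from airflow import DAG
from airflow.providers.amazon.aws.operators.emr import EmrCreateJobFlowOperator
from airflow.providers.amazon.aws.sensors.emr import EmrJobFlowSensor

# Hypothetical cluster definition; a real one holds instance groups, applications, etc.
JOB_FLOW_OVERRIDES = {
    "Name": "analytics-pipeline-cluster",
    "ReleaseLabel": "emr-6.15.0",
    "Instances": {"KeepJobFlowAliveWhenNoSteps": True},
}

with DAG(
    dag_id="emr_cluster_lifecycle_sketch",
    start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
    schedule=None,
    catchup=False,
):
    # Spin up a transient EMR cluster for this DAG run.
    create_cluster = EmrCreateJobFlowOperator(
        task_id="create_cluster",
        job_flow_overrides=JOB_FLOW_OVERRIDES,
    )

    # Poll the cluster until it reaches WAITING, i.e. it is ready to accept steps.
    wait_cluster_ready = EmrJobFlowSensor(
        task_id="wait_cluster_ready",
        job_flow_id="{{ task_instance.xcom_pull(task_ids='create_cluster') }}",
        target_states=["WAITING"],
    )

    create_cluster >> wait_cluster_ready
```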

We have DAGs responsible for managing dozens of EMR steps, close to a hundred in some cases. The EMR operators add all the steps for execution on the cluster, and an EMR sensor waits for each step to complete (with success or failure). We rely on EMR scalability to handle the parallelization using step concurrency, and we size each cluster accordingly. But this setup is hard on the Airflow cluster, since dozens of sensors sit in the queue waiting for free workers. Remember, we want to keep the Airflow cluster small, with only a few workers. This also impacts other DAGs, creating a chain reaction in the Airflow cluster and delaying all the executions.

For example, a DAG with multiple parallel EMR steps needs one sensor for each task, creating a pipeline similar to the one below, where each group has a task/sensor pair to submit the Spark job and monitor its state. For a small DAG, this is fine. But when you have dozens of tasks in dozens of DAGs, you end up with hundreds of sensors waiting for Spark jobs to finish. It is worth highlighting that the task to submit the Spark job runs very quickly (under 30 seconds), and then the sensor waits for as long as the job takes to finish.

DAG with step sensors
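In code, that pattern looks roughly like the sketch below, assuming the create_cluster task and DAG context from the previous snippet; the job list and step definitions are hypothetical:

```python
from airflow.providers.amazon.aws.operators.emr import EmrAddStepsOperator
from airflow.providers.amazon.aws.sensors.emr import EmrStepSensor

SPARK_JOBS = ["job_a", "job_b", "job_c"]  # in our real DAGs this list is much longer

for job in SPARK_JOBS:
    # Submit the Spark job as an EMR step; this task finishes in seconds.
    submit = EmrAddStepsOperator(
        task_id=f"submit_{job}",
        job_flow_id="{{ task_instance.xcom_pull(task_ids='create_cluster') }}",
        steps=[{
            "Name": job,
            "ActionOnFailure": "CONTINUE",
            "HadoopJarStep": {
                "Jar": "command-runner.jar",
                "Args": ["spark-submit", f"s3://my-bucket/jobs/{job}.py"],
            },
        }],
    )

    # One sensor per step: these are the tasks that pile up on the Airflow workers.
    watch = EmrStepSensor(
        task_id=f"watch_{job}",
        job_flow_id="{{ task_instance.xcom_pull(task_ids='create_cluster') }}",
        step_id=f"{{{{ task_instance.xcom_pull(task_ids='submit_{job}')[0] }}}}",
    )

    submit >> watch
```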

The first approach to address the problem was to replace all the EMR step sensors with one sensor at the cluster level. With that, Airflow can monitor the EMR state and check when the cluster reaches the "waiting" state, meaning all steps are finished. This solved the main issue, and the Airflow workers were free to manage more DAGs and tasks. Then we noticed that we had lost the ability to notify on failed DAG executions when one of the EMR steps failed. Moving the monitoring from the step to the cluster level removes the check for failed states, since the cluster never enters a failed state, even if all the steps fail.
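In DAG terms, this first approach boils down to something like the sketch below (same hypothetical task names as before): the per-step sensors disappear and a single cluster-level sensor waits for the cluster to settle back into "waiting".

```python
from airflow.providers.amazon.aws.sensors.emr import EmrJobFlowSensor

# One cluster-level sensor replaces every per-step sensor.
wait_all_steps_done = EmrJobFlowSensor(
    task_id="wait_all_steps_done",
    job_flow_id="{{ task_instance.xcom_pull(task_ids='create_cluster') }}",
    target_states=["WAITING"],  # reached once no steps are pending or running
    mode="reschedule",          # release the worker slot between pokes
)

# Wiring: every submit_<job> task from the previous sketch >> wait_all_steps_done
```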

After studying how the EMR sensors work in Airflow, we designed a new approach: a custom sensor that monitors the state of all steps. The EMR base sensor implements a poke mechanism based on the EMR boto3 client. Using the list_steps function, we can check the state of all the steps in a single call.
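With plain boto3, that check looks roughly like this (the cluster id is a placeholder):

```python
import boto3

emr = boto3.client("emr")
cluster_id = "j-XXXXXXXXXXXXX"  # placeholder

# One call per group of states returns every matching step on the cluster.
active = emr.list_steps(ClusterId=cluster_id, StepStates=["PENDING", "RUNNING"])["Steps"]
failed = emr.list_steps(ClusterId=cluster_id, StepStates=["FAILED", "CANCELLED"])["Steps"]

print(f"{len(active)} active steps, {len(failed)} failed or cancelled steps")
```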

The new sensor calls the EMR cluster and retrieves all the active and failed steps. If there are any failed steps, it raises an exception and finishes the task with a failed state. If there are active steps, the sensor reschedules the task and tries again later. When there are no active or failed steps, the task terminates with success.
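Here is a minimal sketch of what such a sensor can look like. It is not the exact code from our implementation (that is linked at the end of the post); the class name, state lists, and parameters are illustrative:

```python
from airflow.exceptions import AirflowException
from airflow.providers.amazon.aws.hooks.emr import EmrHook
from airflow.sensors.base import BaseSensorOperator

ACTIVE_STATES = ["PENDING", "CANCEL_PENDING", "RUNNING"]
FAILED_STATES = ["FAILED", "CANCELLED", "INTERRUPTED"]


class EmrClusterStepsSensor(BaseSensorOperator):
    """Succeeds when the cluster has no active steps; fails if any step failed."""

    template_fields = ("job_flow_id",)

    def __init__(self, *, job_flow_id: str, aws_conn_id: str = "aws_default", **kwargs):
        super().__init__(**kwargs)
        self.job_flow_id = job_flow_id
        self.aws_conn_id = aws_conn_id

    def poke(self, context) -> bool:
        # The EMR hook exposes the underlying boto3 EMR client.
        emr = EmrHook(aws_conn_id=self.aws_conn_id).get_conn()

        # Note: list_steps is paginated; a production version may need to follow
        # the Marker, but a non-empty first page is enough to decide here.
        failed = emr.list_steps(ClusterId=self.job_flow_id, StepStates=FAILED_STATES)["Steps"]
        if failed:
            names = ", ".join(step["Name"] for step in failed)
            raise AirflowException(f"EMR steps failed: {names}")

        active = emr.list_steps(ClusterId=self.job_flow_id, StepStates=ACTIVE_STATES)["Steps"]
        if active:
            self.log.info("Still waiting on %d active steps", len(active))
            return False  # poke again later (reschedule mode frees the worker)

        # No active and no failed steps: everything finished successfully.
        return True
```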

Replacing all the sensors with just one improves DAG readability (fewer boxes) and debugging (all the logs in one sensor). A DAG with 50 Spark jobs uses a single sensor to monitor all of them, the Spark cluster deals with the concurrency, and the Airflow cluster is free to execute other tasks.

DAG with new sensor
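Wiring it into the DAG is then a single task, sketched below; EmrClusterStepsSensor is the class sketched above, and the task ids are the hypothetical ones from the earlier snippets:

```python
# One sensor watches every Spark job submitted to the cluster.
watch_all_steps = EmrClusterStepsSensor(
    task_id="watch_all_steps",
    job_flow_id="{{ task_instance.xcom_pull(task_ids='create_cluster') }}",
    mode="reschedule",  # release the worker slot between pokes
    poke_interval=120,  # check the cluster every two minutes
)

# Wiring: every submit_<job> task >> watch_all_steps >> terminate_cluster
```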

We have had the solution running for the last few weeks, and we have observed some positive points:

  • The notifications are still tied to EMR step success or failure.
  • The average DAG execution time decreased since tasks weren't waiting on the Airflow queue.
  • Cost savings from early EMR cluster termination when a failure happens. The first failed step forces a cluster termination signal, killing the other steps (see the sketch after this list).
  • Fewer requests to the AWS EMR API per DAG execution, which matters since this endpoint has a low quota.
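One way to wire up that early termination is sketched below. This is an assumption about the setup (a terminate task that runs regardless of the sensor outcome), not necessarily the exact production wiring:

```python
from airflow.providers.amazon.aws.operators.emr import EmrTerminateJobFlowOperator
from airflow.utils.trigger_rule import TriggerRule

# Tear the cluster down whether the sensor succeeded or failed on a bad step.
terminate_cluster = EmrTerminateJobFlowOperator(
    task_id="terminate_cluster",
    job_flow_id="{{ task_instance.xcom_pull(task_ids='create_cluster') }}",
    trigger_rule=TriggerRule.ALL_DONE,  # run even when an upstream task failed
)

# Wiring (watch_all_steps is the custom sensor from the earlier sketch):
#   watch_all_steps >> terminate_cluster
```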

The next step is to take the sensor to the Airflow community, get feedback for improvements, and do the work required to add it as an official AWS EMR sensor in the Apache Airflow project.

You can find the full code for the EMR Airflow sensor here.
