

Airflow Quick Start With docker-compose on AWS EC2

Abstract

To set up quickly and start learning Apache Airflow, we will deploy Airflow with docker-compose and run it on an AWS EC2 instance.

🚀 Introduction

The docker-compose.yaml contains several service definitions:
- airflow-scheduler - The scheduler monitors all tasks and DAGs, then triggers the task instances once their dependencies are complete.
- airflow-webserver - The webserver is available at http://localhost:8080 (published on host port 28080 in the compose file below).
- airflow-worker - The worker that executes the tasks given by the scheduler.
- airflow-init - The initialization service.
- flower - The flower app for monitoring the environment. It is available at http://localhost:5555.
- postgres - The database.
- redis - The broker that forwards messages from the scheduler to the workers.

Some directories in the container are mounted, which means that their contents are synchronized between the services and persisted on the host. A minimal example DAG for the dags folder is sketched after this list.

  • ./dags - you can put your DAG files here.
  • ./logs - contains logs from task execution and scheduler.
  • ./plugins - you can put your custom plugins here.
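
For example, a minimal DAG file dropped into ./dags is picked up by the scheduler and appears in the web UI. This is only a hedged sketch; the file name, dag_id and schedule are illustrative and not from the article:

# dags/hello_dag.py - minimal example DAG (names are illustrative)
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="hello_dag",
    start_date=datetime(2021, 1, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    # A single task that echoes a message on the worker
    hello = BashOperator(task_id="hello", bash_command="echo 'hello from airflow'")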

🚀 Additional PIP requirements

  • The airflow image ships with most of the PIP packages needed for operation, but we still need to install extra packages such as clickhouse-driver, pandahouse and apache-airflow-providers-slack (a task that needs one of them is sketched after this list).

  • Airflow 2.1.1 and later supports the _PIP_ADDITIONAL_REQUIREMENTS environment variable to install additional requirements when the containers start:

_PIP_ADDITIONAL_REQUIREMENTS: 'pandahouse==0.2.7 clickhouse-driver==0.2.1 apache-airflow-providers-slack'
  • This approach is not recommended for production; instead, build your own image containing all necessary pip packages and push it to AWS ECR.
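
To see why the extra packages must be present in the Airflow containers, here is a hedged sketch of a task whose DAG file imports clickhouse-driver at parse time. The host name and DAG wiring are placeholders, not from this article:

# Hypothetical DAG that needs clickhouse-driver; parsing this file fails in the
# scheduler and worker containers if the package is not installed.
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from clickhouse_driver import Client  # extra package from _PIP_ADDITIONAL_REQUIREMENTS


def ping_clickhouse():
    client = Client(host="clickhouse.example.internal")  # placeholder endpoint
    print(client.execute("SELECT 1"))


with DAG("clickhouse_check", start_date=datetime(2021, 1, 1), schedule_interval=None) as dag:
    PythonOperator(task_id="ping_clickhouse", python_callable=ping_clickhouse)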

🚀 How to build a customized Airflow image

  • Build your own image from a requirements.txt and a Dockerfile.

  • requirements.txt

pandahouse
clickhouse-driver
  • Dockerfile
FROM apache/airflow:2.1.2-python3.9
COPY requirements.txt .
RUN pip install -r requirements.txt
  • Build the image with docker build -t my-airflow . and reference my-airflow as the image in docker-compose.yaml instead of apache/airflow:2.1.2-python3.9.

🚀 Persist Airflow logs, DAGs and plugins

We not only persist these folders but also share them between the scheduler, workers and webserver:

  volumes:
    - /mnt/airflow/dags:/opt/airflow/dags
    - /mnt/airflow/logs:/opt/airflow/logs
    - /mnt/airflow/plugins:/opt/airflow/plugins
    - /mnt/airflow/data:/opt/airflow/data

🚀 Using git-sync to keep DAGs up to date

  • The git-sync service polls the registered repository at the interval set by GIT_SYNC_WAIT (60 seconds in this configuration) and clones new commits into /dags.
  • We use HTTPS with a username and access token (GIT_SYNC_USERNAME / GIT_SYNC_PASSWORD) to grant the container access to the repository.
  af-gitsync:
    container_name: af-gitsync
    image: k8s.gcr.io/git-sync/git-sync:v3.2.2
    environment:
      - GIT_SYNC_REV=HEAD
      - GIT_SYNC_DEPTH=1
      - GIT_SYNC_USERNAME=airflow
      - GIT_SYNC_MAX_FAILURES=0
      - GIT_KNOWN_HOSTS=false
      - GIT_SYNC_DEST=repo
      - GIT_SYNC_REPO=https://cloudopz.co/devops/airflow-dags.git
      - GIT_SYNC_WAIT=60
      - GIT_SYNC_TIMEOUT=120
      - GIT_SYNC_ADD_USER=true
      - GIT_SYNC_PASSWORD=
      - GIT_SYNC_ROOT=/dags
      - GIT_SYNC_BRANCH=master
    volumes:
      - /mnt/airflow/dags:/dags

🚀 How to run

  1. Initialize the environment: create the host directories that docker-compose mounts and the .env file
mkdir -p /mnt/airflow/{dags,logs,plugins,data}
echo -e "AIRFLOW_UID=$(id -u)\nAIRFLOW_GID=0" > .env
  2. Prepare docker-compose.yaml
version: '3.5'
x-airflow-common:
  &airflow-common
  image: apache/airflow:2.1.2-python3.9
  environment:
    &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@af-pg/airflow
    AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@af-pg/airflow
    AIRFLOW__CELERY__BROKER_URL: redis://:@af-redis:6379/0
    AIRFLOW__CORE__FERNET_KEY: ''
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
    AIRFLOW__CORE__LOAD_EXAMPLES: 'true'
    AIRFLOW__API__AUTH_BACKEND: 'airflow.api.auth.backend.basic_auth'
    AIRFLOW_CONN_RDB_CONN: 'postgresql://dbapplication_user:dbapplication_user@rdb:5432/postgres'
    _PIP_ADDITIONAL_REQUIREMENTS: 'pandahouse==0.2.7 clickhouse-driver==0.2.1 apache-airflow-providers-slack'
  volumes:
    - /mnt/airflow/dags:/opt/airflow/dags
    - /mnt/airflow/logs:/opt/airflow/logs
    - /mnt/airflow/plugins:/opt/airflow/plugins
    - /mnt/airflow/data:/opt/airflow/data
  user: "${AIRFLOW_UID:-50000}:${AIRFLOW_GID:-50000}"
  depends_on:
    - af-redis
    - af-pg

services:
  af-pg:
    image: postgres:13
    container_name: af-pg
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
    volumes:
      - postgres-db-volume:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD", "pg_isready", "-U", "airflow"]
      interval: 5s
      retries: 5
    restart: always

  af-redis:
    container_name: af-redis
    image: redis:latest
    ports:
      - 6379:6379
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 5s
      timeout: 30s
      retries: 50
    restart: always

  af-websrv:
    container_name: af-websrv
    <<: *airflow-common
    command: webserver
    ports:
      - 28080:8080
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always

  af-sch:
    container_name: af-sch
    <<: *airflow-common
    command: scheduler
    healthcheck:
      test: ["CMD-SHELL", 'airflow jobs check --job-type SchedulerJob --hostname "$${HOSTNAME}"']
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always

  af-w:
    container_name: af-w
    <<: *airflow-common
    command: celery worker
    healthcheck:
      test:
        - "CMD-SHELL"
        - 'celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always

  af-int:
    container_name: af-int
    <<: *airflow-common
    command: version
    environment:
      <<: *airflow-common-env
      _AIRFLOW_DB_UPGRADE: 'true'
      _AIRFLOW_WWW_USER_CREATE: 'true'

  af-flower:
    container_name: af-flower
    <<: *airflow-common
    command: celery flower
    ports:
      - 5555:5555
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:5555/"]
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always

  af-gitsync:
    container_name: af-gitsync
    image: k8s.gcr.io/git-sync/git-sync:v3.2.2
    environment:
      - GIT_SYNC_REV=HEAD
      - GIT_SYNC_DEPTH=1
      - GIT_SYNC_USERNAME=airflow
      - GIT_SYNC_MAX_FAILURES=0
      - GIT_KNOWN_HOSTS=false
      - GIT_SYNC_DEST=repo
      - GIT_SYNC_REPO=https://cloudopz.co/devops/airflow-dags.git
      - GIT_SYNC_WAIT=60
      - GIT_SYNC_TIMEOUT=120
      - GIT_SYNC_ADD_USER=true
      - GIT_SYNC_PASSWORD=
      - GIT_SYNC_ROOT=/dags
      - GIT_SYNC_BRANCH=master
    volumes:
      - /mnt/airflow/dags:/dags

volumes:
  postgres-db-volume:

  3. Run the init service (af-int in this compose file) to set up the Airflow database
docker-compose up af-int
  4. Start all services
docker-compose up -d

🚀 Add Airflow connections

  • Add a Slack connection via the CLI (a DAG task that uses this connection is sketched at the end of this section):
airflow connections add 'airflow-slack' \
    --conn-type 'http' \
    --conn-password '/T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX' \
    --conn-host 'https://hooks.slack.com/services'
  • We can also add connections in the web UI (Admin > Connections).


  • We can also add connections through environment variables, e.g. in the .env file or the environment section of docker-compose:

AIRFLOW_CONN_MY_POSTGRES_CONN: 'postgresql://airflow:airflow@postgres-pg:5432/airflow'
AIRFLOW_CONN_AIRFLOW_SLACK_CONN: 'https://hooks.slack.com/services/T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX'
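
With apache-airflow-providers-slack installed, a task can post through the airflow-slack connection defined above. A minimal sketch, assuming the Airflow 2.1-era SlackWebhookOperator where the connection is passed as http_conn_id (parameter names differ in newer provider versions):

# Hedged sketch: post to Slack through the 'airflow-slack' connection.
from datetime import datetime

from airflow import DAG
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator

with DAG("slack_notify", start_date=datetime(2021, 1, 1), schedule_interval=None) as dag:
    notify = SlackWebhookOperator(
        task_id="notify",
        http_conn_id="airflow-slack",  # conn-host + conn-password form the webhook URL
        message="Airflow on EC2 is up",
    )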

🚀 Understand Airflow parameters in airflow.models

  • These parameters come from airflow.models (BaseOperator); the callbacks receive the task-instance Context as their single argument. A short DAG sketch follows this list.

    • on_failure_callback (TaskStateChangeCallback) – a function to be called when a task instance of this task fails. A context dictionary is passed as a single parameter to this function. Context contains references to related objects of the task instance and is documented under the macros section of the API.
    • on_execute_callback (TaskStateChangeCallback) – much like the on_failure_callback except that it is executed right before the task is executed.
    • on_retry_callback (TaskStateChangeCallback) – much like the on_failure_callback except that it is executed when retries occur.
    • on_success_callback (TaskStateChangeCallback) – much like the on_failure_callback except that it is executed when the task succeeds.
    • trigger_rule (str) – defines the rule by which dependencies are applied for the task to get triggered. Options are: {all_success | all_failed | all_done | one_success | one_failed | none_failed | none_failed_or_skipped | none_skipped | dummy}, default is all_success. Options can be set as a string or using the constants defined in the static class airflow.utils.TriggerRule.
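
A short sketch of how these parameters are wired into a DAG; the callback body is illustrative (any callable that accepts the context dict works), and the task commands are placeholders:

# on_failure_callback receives the task-instance context; trigger_rule decides
# when a downstream task may run relative to its upstream states.
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator


def alert_on_failure(context):
    # context holds the task instance, dag run, exception, etc.
    ti = context["task_instance"]
    print(f"Task {ti.task_id} in DAG {ti.dag_id} failed")


with DAG(
    "callbacks_and_trigger_rules",
    start_date=datetime(2021, 1, 1),
    schedule_interval=None,
    default_args={"on_failure_callback": alert_on_failure},
) as dag:
    extract = BashOperator(task_id="extract", bash_command="exit 1")  # always fails
    load = BashOperator(task_id="load", bash_command="echo load")
    # Runs even though an upstream task failed, because trigger_rule='all_done'
    cleanup = BashOperator(task_id="cleanup", bash_command="echo cleanup", trigger_rule="all_done")

    extract >> load >> cleanup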

Top comments (2)

Muhammad Hanif Mohamad Musa

Is there any AWS service that is the same as Cloud Composer?

Rohan Salunke

Haven't used it, but I believe the following should work:
aws.amazon.com/managed-workflows-f...