Airflow Quick Start With docker-compose on AWS EC2

Abstract

To get set up quickly and start learning Apache Airflow, we will deploy Airflow with docker-compose and run it on an AWS EC2 instance.

🚀 Introduction

The docker-compose.yaml contains several service definitions:
- airflow-scheduler - The scheduler monitors all tasks and DAGs, then triggers the task instances once their dependencies are complete.
- airflow-webserver - The webserver, available at http://localhost:8080.
- airflow-worker - The worker that executes the tasks given by the scheduler.
- airflow-init - The initialization service.
- flower - The flower app for monitoring the environment. It is available at http://localhost:5555.
- postgres - The database.
- redis - The broker that forwards messages from the scheduler to the worker.

Some directories in the containers are mounted from the host, which means that their contents are shared between the services and persist across restarts.

  • ./dags - you can put your DAG files here.
  • ./logs - contains logs from task execution and scheduler.
  • ./plugins - you can put your custom plugins here.

🚀 Additional PIP requirements

  • The Airflow image ships with most of the pip packages needed to operate, but we still need to install extra packages such as clickhouse-driver, pandahouse and apache-airflow-providers-slack.

  • Airflow 2.1.1 and later support the _PIP_ADDITIONAL_REQUIREMENTS environment variable, which installs additional requirements when the containers start

_PIP_ADDITIONAL_REQUIREMENTS: 'pandahouse==0.2.7 clickhouse-driver==0.2.1 apache-airflow-providers-slack'
  • This approach is not recommended for production; instead, build your own image containing all the necessary pip packages and push it to AWS ECR
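
To confirm that the extra packages were actually installed when the containers started, a small check like the one below can be run with the Python interpreter inside a container. This is only a sketch: the function names are made up for illustration, and the package list simply mirrors the _PIP_ADDITIONAL_REQUIREMENTS value above.

```python
from importlib.metadata import PackageNotFoundError, version

# Distribution names copied from _PIP_ADDITIONAL_REQUIREMENTS above.
EXTRA_PACKAGES = [
    "pandahouse",
    "clickhouse-driver",
    "apache-airflow-providers-slack",
]


def installed_versions(packages):
    """Map each distribution name to its installed version, or None if missing."""
    found = {}
    for name in packages:
        try:
            found[name] = version(name)
        except PackageNotFoundError:
            found[name] = None
    return found


def report(packages=EXTRA_PACKAGES):
    """Print one line per package so missing ones are easy to spot."""
    for name, installed in installed_versions(packages).items():
        print(f"{name}: {installed or 'NOT INSTALLED'}")
```

Calling report() in the worker container should list a version for each package; any line showing NOT INSTALLED means the requirement was not picked up.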

🚀 How to build a custom Airflow image

  • Build your own image.

  • requirements.txt

pandahouse
clickhouse-driver
  • Dockerfile
FROM apache/airflow:2.1.2-python3.9
COPY requirements.txt .
RUN pip install -r requirements.txt
  • Build the image
docker build -t my-airflow .

🚀 Persistent Airflow logs, DAGs, and plugins

These folders are not only persisted on the host but also shared between the scheduler, worker and webserver

  volumes:
    - /mnt/airflow/dags:/opt/airflow/dags
    - /mnt/airflow/logs:/opt/airflow/logs
    - /mnt/airflow/plugins:/opt/airflow/plugins
    - /mnt/airflow/data:/opt/airflow/data
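
When a bind-mount source directory does not exist, Docker typically creates it owned by root, which can cause permission errors for the AIRFLOW_UID user. One way to avoid that is to create the host directories before the first start. The helper below is a hedged sketch: the function name and its uid/gid parameters are assumptions for illustration, and the directory names are taken from the volumes list above.

```python
import os
from pathlib import Path

# Sub-directories taken from the volumes list above.
SUBDIRS = ("dags", "logs", "plugins", "data")


def ensure_airflow_dirs(base="/mnt/airflow", uid=None, gid=None):
    """Create the host directories and optionally hand them to uid:gid.

    Changing ownership usually requires root, so it is skipped unless
    both uid and gid are given.
    """
    created = []
    for name in SUBDIRS:
        path = Path(base) / name
        path.mkdir(parents=True, exist_ok=True)
        if uid is not None and gid is not None:
            os.chown(path, uid, gid)
        created.append(path)
    return created
```

For example, ensure_airflow_dirs("/mnt/airflow") run once on the EC2 host prepares all four folders before docker-compose is started.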

🚀 Using git-sync to keep DAGs up to date

  • The git-sync service polls the registered repository every 60 seconds (GIT_SYNC_WAIT=60) and pulls new commits into /dags
  • We use HTTPS with an access token (set in GIT_SYNC_PASSWORD) to give the container permission to read the repository
  af-gitsync:
    container_name: af-gitsync
    image: k8s.gcr.io/git-sync/git-sync:v3.2.2
    environment:
      - GIT_SYNC_REV=HEAD
      - GIT_SYNC_DEPTH=1
      - GIT_SYNC_USERNAME=airflow
      - GIT_SYNC_MAX_FAILURES=0
      - GIT_KNOWN_HOSTS=false
      - GIT_SYNC_DEST=repo
      - GIT_SYNC_REPO=https://cloudopz.co/devops/airflow-dags.git
      - GIT_SYNC_WAIT=60
      - GIT_SYNC_TIMEOUT=120
      - GIT_SYNC_ADD_USER=true
      - GIT_SYNC_PASSWORD=
      - GIT_SYNC_ROOT=/dags
      - GIT_SYNC_BRANCH=master
    volumes:
      - /mnt/airflow/dags:/dags

🚀 How to run

  1. Initialize the environment
mkdir ./dags ./logs ./plugins
echo -e "AIRFLOW_UID=$(id -u)\nAIRFLOW_GID=0" > .env
  2. Prepare docker-compose.yaml
version: '3.5'
x-airflow-common:
  &airflow-common
  image: apache/airflow:2.1.2-python3.9
  environment:
    &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@af-pg/airflow
    AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@af-pg/airflow
    AIRFLOW__CELERY__BROKER_URL: redis://:@af-redis:6379/0
    AIRFLOW__CORE__FERNET_KEY: ''
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
    AIRFLOW__CORE__LOAD_EXAMPLES: 'true'
    AIRFLOW__API__AUTH_BACKEND: 'airflow.api.auth.backend.basic_auth'
    AIRFLOW_CONN_RDB_CONN: 'postgresql://dbapplication_user:dbapplication_user@rdb:5432/postgres'
    _PIP_ADDITIONAL_REQUIREMENTS: 'pandahouse==0.2.7 clickhouse-driver==0.2.1 apache-airflow-providers-slack'
  volumes:
    - /mnt/airflow/dags:/opt/airflow/dags
    - /mnt/airflow/logs:/opt/airflow/logs
    - /mnt/airflow/plugins:/opt/airflow/plugins
    - /mnt/airflow/data:/opt/airflow/data
  user: "${AIRFLOW_UID:-50000}:${AIRFLOW_GID:-50000}"
  depends_on:
    - af-redis
    - af-pg

services:
  af-pg:
    image: postgres:13
    container_name: af-pg
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
    volumes:
      - postgres-db-volume:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD", "pg_isready", "-U", "airflow"]
      interval: 5s
      retries: 5
    restart: always

  af-redis:
    container_name: af-redis
    image: redis:latest
    ports:
      - 6379:6379
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 5s
      timeout: 30s
      retries: 50
    restart: always

  af-websrv:
    container_name: af-websrv
    <<: *airflow-common
    command: webserver
    ports:
      - 28080:8080
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always

  af-sch:
    container_name: af-sch
    <<: *airflow-common
    command: scheduler
    healthcheck:
      test: ["CMD-SHELL", 'airflow jobs check --job-type SchedulerJob --hostname "$${HOSTNAME}"']
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always

  af-w:
    container_name: af-w
    <<: *airflow-common
    command: celery worker
    healthcheck:
      test:
        - "CMD-SHELL"
        - 'celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always

  af-int:
    container_name: af-int
    <<: *airflow-common
    command: version
    environment:
      <<: *airflow-common-env
      _AIRFLOW_DB_UPGRADE: 'true'
      _AIRFLOW_WWW_USER_CREATE: 'true'

  af-flower:
    container_name: af-flower
    <<: *airflow-common
    command: celery flower
    ports:
      - 5555:5555
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:5555/"]
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always

  af-gitsync:
    container_name: af-gitsync
    image: k8s.gcr.io/git-sync/git-sync:v3.2.2
    environment:
      - GIT_SYNC_REV=HEAD
      - GIT_SYNC_DEPTH=1
      - GIT_SYNC_USERNAME=airflow
      - GIT_SYNC_MAX_FAILURES=0
      - GIT_KNOWN_HOSTS=false
      - GIT_SYNC_DEST=repo
      - GIT_SYNC_REPO=https://cloudopz.co/devops/airflow-dags.git
      - GIT_SYNC_WAIT=60
      - GIT_SYNC_TIMEOUT=120
      - GIT_SYNC_ADD_USER=true
      - GIT_SYNC_PASSWORD=
      - GIT_SYNC_ROOT=/dags
      - GIT_SYNC_BRANCH=master
    volumes:
      - /mnt/airflow/dags:/dags

volumes:
  postgres-db-volume:

  3. Run the init service (named af-int in the compose file above) to set up the Airflow database
docker-compose up af-int
  4. Start Airflow
docker-compose up -d
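
Once the containers are up, the webserver's /health endpoint (the same one the compose healthcheck calls) can be queried from the host. The sketch below assumes the 28080:8080 port mapping from the compose file above; the function names are illustrative only.

```python
import json
import urllib.request

# Host port 28080 is mapped to the webserver's 8080 in the compose file above.
HEALTH_URL = "http://localhost:28080/health"


def is_healthy(payload):
    """Return True if both the metadatabase and the scheduler report 'healthy'."""
    return all(
        payload.get(component, {}).get("status") == "healthy"
        for component in ("metadatabase", "scheduler")
    )


def fetch_health(url=HEALTH_URL, timeout=5):
    """Fetch and decode the JSON document served by /health."""
    with urllib.request.urlopen(url, timeout=timeout) as response:
        return json.loads(response.read().decode("utf-8"))
```

Calling is_healthy(fetch_health()) on the EC2 host should return True once the scheduler has sent its first heartbeat.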

🚀 Add Airflow connections

  • Add a Slack connection
airflow connections add 'airflow-slack' \
    --conn-type 'http' \
    --conn-password '/T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX' \
    --conn-host 'https://hooks.slack.com/services'
  • We can also add a connection in the UI

  • We can add connections through environment variables named AIRFLOW_CONN_<CONN_ID>, e.g. in the environment block of docker-compose.yaml

AIRFLOW_CONN_MY_POSTGRES_CONN: 'postgresql://airflow:airflow@postgres-pg:5432/airflow'
AIRFLOW_CONN_AIRFLOW_SLACK_CONN: 'https://hooks.slack.com/services/T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX'
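
Connection URIs break when a login or password contains characters such as @ or /, so those parts need to be URL-encoded. The helper below is a minimal sketch of building the variable name and the URI by hand; both function names are hypothetical and it covers only the fields used in the examples above.

```python
from urllib.parse import quote


def conn_env_name(conn_id):
    """Environment variable name Airflow reads for a given connection id."""
    return "AIRFLOW_CONN_" + conn_id.upper()


def conn_uri(conn_type, host, login=None, password=None, port=None, schema=None):
    """Build a connection URI, URL-encoding login, password and schema."""
    auth = ""
    if login is not None:
        auth = quote(login, safe="")
        if password is not None:
            auth += ":" + quote(password, safe="")
        auth += "@"
    uri = f"{conn_type}://{auth}{host}"
    if port is not None:
        uri += f":{port}"
    if schema:
        uri += "/" + quote(schema, safe="")
    return uri
```

For instance, conn_env_name("my_postgres_conn") gives the variable name used above, and conn_uri("postgresql", "postgres-pg", "airflow", "airflow", 5432, "airflow") reproduces its value.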

🚀 Understand Airflow parameters in airflow.models

  • airflow.models

  • Context

    • on_failure_callback (TaskStateChangeCallback) - a function to be called when a task instance of this task fails. A context dictionary is passed as a single parameter to this function. Context contains references to objects related to the task instance and is documented under the macros section of the API.
    • on_execute_callback (TaskStateChangeCallback) - much like the on_failure_callback except that it is executed right before the task is executed.
    • on_retry_callback (TaskStateChangeCallback) - much like the on_failure_callback except that it is executed when retries occur.
    • on_success_callback (TaskStateChangeCallback) - much like the on_failure_callback except that it is executed when the task succeeds.
    • trigger_rule (str) - defines the rule by which dependencies are applied for the task to get triggered. Options are: { all_success | all_failed | all_done | one_success | one_failed | none_failed | none_failed_or_skipped | none_skipped | dummy }; the default is all_success. Options can be set as a string or using the constants defined in the static class airflow.utils.trigger_rule.TriggerRule
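
As an illustration of the callback parameters, here is a minimal sketch of a failure callback that turns the context dictionary into a short message. The function names and the fake context are made up for this example; the message is only printed, so sending it to Slack through the connection defined earlier is left out.

```python
from types import SimpleNamespace


def failure_message(context):
    """Build a short alert text from the context Airflow passes to callbacks."""
    ti = context["task_instance"]
    exception = context.get("exception")
    return (
        f"Task failed: {ti.dag_id}.{ti.task_id}\n"
        f"Error: {exception}\n"
        f"Log: {ti.log_url}"
    )


def notify_failure(context):
    """Callback with the single-argument signature Airflow expects."""
    print(failure_message(context))


# Stand-in for a real context, so the callback can be tried without Airflow.
fake_context = {
    "task_instance": SimpleNamespace(
        dag_id="example_dag",
        task_id="load",
        log_url="http://localhost:28080/log",
    ),
    "exception": ValueError("boom"),
}

# In a DAG file the callback would be attached through default_args, e.g.
#   default_args = {"on_failure_callback": notify_failure}
```

Because the callback receives only the context, the same function can be reused for on_retry_callback or on_success_callback with a different message.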
