Airflow Development with Docker

Airflow tooling and code structure

Airflow should be easy to work with and to develop for.

The new astro-cli tool from Astronomer is intended to help users work with their cloud, and it doesn't cover all development workflows.

It also does not play well with Docker, because there is not enough provisioning for Docker-in-Docker communication.

The DAGs that I am currently developing are located at the root of the project in the dags/ folder.

The DAGs which are ready to commit and will go to production are stored in dags-production/.

Here is an example structure

/
  dags/
    include/
      helpers/
        company_name/
          module_name/

  dags-production/
    team/
      dags/
        us-east-1/
          include/
            helpers/
          *.dag
        eu-central-1/
          include/
            helpers/
          *.dag

Airflow writes to its home folder every second. That's why I am using a memory file system (tmpfs) for it.

Install

Make a virtual environment

I will use the local folder airflow/ to keep all the Airflow libraries.

Airflow has a lot of dependencies, and I don't want to pollute my regular Python project with them.

Airflow, after all, is just a regular tool and should stay separate from our code.

If you don't have Poetry installed, do it now:

curl -sSL https://install.python-poetry.org | python3 -

Then let's prepare the airflow folder

mkdir airflow
echo "airflow" >> .gitignore
sudo mount -t tmpfs -o size=50m tmpfs ./airflow
poetry --directory ./airflow init --name=airflow --description=airflow --author=me --no-interaction
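Before installing anything into it, you can double-check that the tmpfs mount is actually in place (run from the project root):

findmnt ./airflow
df -h ./airflow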

Now it's time to install it.

Install in a virtual environment

We will use poetry to initialize a new virtual environment from the ./airflow folder.

poetry --directory ./airflow shell
cd ..
export PROJECT_HOME=$(realpath ./)
export AIRFLOW_HOME=${PROJECT_HOME}/airflow
export AIRFLOW__CORE__LOAD_EXAMPLES=False
export AIRFLOW__CORE__DAGS_FOLDER=./dags
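These exports need to be set in every new shell. If you use direnv (as I do later for the Snowflake credentials), a minimal sketch of an .envrc at the project root that sets them automatically (the values mirror the exports above):

# .envrc - loaded by direnv when you enter the project directory
export PROJECT_HOME=$(realpath ./)
export AIRFLOW_HOME=${PROJECT_HOME}/airflow
export AIRFLOW__CORE__LOAD_EXAMPLES=False
export AIRFLOW__CORE__DAGS_FOLDER=./dags

Run direnv allow once after creating or editing the file.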

Now let's install Airflow using the constraints file

We will follow the steps from here https://airflow.apache.org/docs/apache-airflow/stable/start/local.html

AIRFLOW_VERSION=2.5.0
PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)"

CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"

pip install apache-airflow-providers-docker apache-airflow-providers-amazon
pip install apache-airflow-providers-postgres
pip install apache-airflow-providers-redis
pip install apache-airflow-providers-mysql
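As a quick sanity check that the pinned Airflow version and the providers ended up in this virtual environment:

airflow version
airflow providers list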

We will delete all the example DAGs shipped with Airflow, because we want to see our own DAGs much quicker.

Airflow will also work faster because it won't parse a lot of DAGs.

We are looking for the virtual environment path; it will be something like '/home/USER/.cache/pypoetry/virtualenvs/airflow-4vTX1qLp-py3.9'.

poetry env info

echo "Show what we will delete"
find /home/guda/.cache/pypoetry/virtualenvs | grep example_dags

echo "Do the actual delete"
find /home/guda/.cache/pypoetry/virtualenvs | grep example_dags | xargs rm -rf

echo "Or do it this way: locate site-packages via pip and search there"
pip show pip
find /home/guda/.cache/pypoetry/virtualenvs/bookings-bQ2s_Hyz-py3.8/lib/python3.8/site-packages | grep example_dags

If you forgot to delete the example DAGs before the first start, it is quicker to delete the Airflow database and start again.

rm airflow/airflow.db
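Alternatively, Airflow can recreate the metadata database for you; a sketch, assuming the environment variables above are still exported in your shell:

poetry --directory ./airflow run airflow db reset --yes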

Finally, run it

airflow standalone

On the first start, Airflow generates an admin password and stores it in standalone_admin_password.txt inside the Airflow home folder.

Run Airflow next time

You can always look up the generated admin password by running:

cat airflow/standalone_admin_password.txt

We can do poetry shell in the ./airflow folder, and then run airflow standalone or other airflow commands.

But it is faster to prefix the airflow commands with poetry --directory ./airflow run

export PROJECT_HOME=$(realpath ./)
export AIRFLOW_HOME=${PROJECT_HOME}/airflow
export AIRFLOW__CORE__LOAD_EXAMPLES=False
export AIRFLOW__CORE__DAGS_FOLDER=./dags

poetry --directory ./airflow run airflow standalone
poetry --directory ./airflow run airflow ...
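If typing that prefix gets tedious, a small convenience (purely optional) is a shell alias:

alias af='poetry --directory ./airflow run airflow'
af dags list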

At this point you will be able to access the empty Airflow UI at http://127.0.0.1:8080/.
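You can also confirm from the command line that the webserver and the scheduler are healthy via the standard health endpoint:

curl http://127.0.0.1:8080/health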

Prepare for the Containers

The DockerOperator will need a registry to pull images from.
In development, it is easy to push the built images to a local registry and ask Airflow to pull them from the registry configured in the docker_default connection.
I have no idea why it is called docker_default instead of image_registry_default, but that is another subject.

Let's run a local registry.

Working with the Registry

First time run

Pull and run the registry locally.

docker run -d -p 5000:5000 --name registry registry:2

Next time run

Next time you will already have the registry container, so you only have to run:

docker start registry
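To confirm the registry container is up and publishing port 5000:

docker ps --filter "name=registry"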

Working with images

Build the image

After you build an image, tag it and push it to the local registry.

docker-compose build custom_image

or

docker build .

Push & Pull

docker image tag custom_image localhost:5000/custom_image
docker push localhost:5000/custom_image
docker pull localhost:5000/custom_image

Hints & Tips to debug the local registry

You need to specify a password for the connection in Airflow. Test the login from the command line first; for the local registry the password can be anything:

docker login 127.0.0.1:5000

When you specify the image, don't forget to prefix it with the registry host, like this:

    127.0.0.1:5000/custom_image

List the images

curl -X GET http://127.0.0.1:5000/v2/_catalog
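The registry API can also list the tags pushed for a single image; custom_image here is just the example image name used above:

curl -X GET http://127.0.0.1:5000/v2/custom_image/tags/list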

Stop registry

docker container stop registry \
  && docker container rm -v registry

Working with containers

When working with containers you will probably have an entrypoint script. The entrypoint gives you an interface and allows you to run only a specific set of commands. To override the entrypoint of the image, do it like this:

docker run -u root --entrypoint /bin/bash -ti custom_image:latest

Seed Airflow settings

The seeding commands below are great to put in a justfile airflow-seed task.

Seeding users

poetry --directory ./airflow run airflow users delete --username admin
poetry --directory ./airflow run airflow users create --role Admin --username admin --email admin@example.com --firstname admin --lastname admin --password admin

Seeding connections

You are using direnv, right?

poetry --directory ./airflow run \
  airflow connections add 'docker_default' \
    --conn-type 'docker' \
    --conn-login 'root' \
    --conn-host '127.0.0.1' \
    --conn-port '5000'

poetry --directory ./airflow run \
  airflow connections add 'snowflake' \
    --conn-type 'generic' \
    --conn-login "$SNOWFLAKE_USER" \
    --conn-host "$SNOWFLAKE_ACCOUNT" \
    --conn-password "$SNOWFLAKE_PASSWORD"

poetry --directory ./airflow run \
  airflow connections add 'aws' \
    --conn-type 'aws'
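To verify that the connections were stored the way you expect:

poetry --directory ./airflow run airflow connections list
poetry --directory ./airflow run airflow connections get docker_default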

Seeding variables

poetry --directory ./airflow run \
  airflow variables set aws_region_name us-east-1

poetry --directory ./airflow run \
  airflow variables set current_aws_account NNNNNNNNNNNNNNNN

poetry --directory ./airflow run \
  airflow variables set environment production

poetry --directory ./airflow run \
  airflow variables set airflow_base_url http://localhost:8080/
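The variables can be checked the same way, either in the UI under Admin -> Variables or from the CLI:

poetry --directory ./airflow run airflow variables list
poetry --directory ./airflow run airflow variables get environment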

The DAG

Now, finally, you will be able to do something like this:

from airflow import DAG
from airflow.providers.docker.operators.docker import DockerOperator
from datetime import datetime, timedelta

# Default arguments for the DAG
default_args = {
    'owner': 'me',
    'start_date': datetime(2022, 1, 1),
    'depends_on_past': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Create the DAG
with DAG(
    'hello_world_dag',
    default_args=default_args,
    schedule_interval=timedelta(hours=1),
    catchup=False,
) as dag:
    # Create a task using the DockerOperator
    hello_world_task = DockerOperator(
        task_id='hello_world_task',
        image='localhost:5000/myimage:latest',
        api_version='auto',
        command='echo "hello world"',
        docker_conn_id='docker_default',
    )
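Once the file is saved under dags/, you can check that Airflow picks it up and run it end to end from the CLI, without waiting for the scheduler:

poetry --directory ./airflow run airflow dags list
poetry --directory ./airflow run airflow dags test hello_world_dag 2022-01-01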
