Airflow tooling and code structure
Airflow should be easy to work with and easy to develop for.
The new astro-cli tool from Astronomer is intended to help users work with their cloud, and it doesn't cover all the development workflows.
It also does not play well with Docker, because there is not enough provisioning for Docker-in-Docker communication.
The DAGs that I am currently developing live at the root of the project in the dags/ folder.
The DAGs that are ready to commit and will go to production are stored in dags-production/.
Here is an example structure:
/
  dags/
  include/
    helpers/
    company_name/
      module_name/
  dags-production/
    team/
      dags/
        us-east-1/
          include/
            helpers/
          *.dag
        eu-central-1/
          include/
            helpers/
          *.dag
Airflow writes to its home folder every second. That's why I mount it on an in-memory file system (see the tmpfs mount below).
Install
Make a virtual environment
I will use the local folder airflow/ to keep all the Airflow libraries.
Airflow has a lot of dependencies, and I don't want to pollute my regular Python project with them.
Airflow, after all, is just a regular tool and should stay separate from our code.
If you don't have Poetry installed, do it now:
curl -sSL https://install.python-poetry.org | python3 -
Then let's prepare the airflow folder
mkdir airflow
echo "airflow" >> .gitignore
sudo mount -t tmpfs -o size=50m tmpfs ./airflow
poetry --directory ./airflow init --name=airflow --description=airflow --author=me --no-interaction
Now it's time to install it.
Install in a virtual environment
We will use poetry to initialize a new virtual environment from the ./airflow folder.
poetry --directory ./airflow shell
cd ..
export PROJECT_HOME=$(realpath ./)
export AIRFLOW_HOME=${PROJECT_HOME}/airflow
export AIRFLOW__CORE__LOAD_EXAMPLES=False
export AIRFLOW__CORE__DAGS_FOLDER=${PROJECT_HOME}/dags
Now let's install Airflow using the constraints file.
We will follow the steps from https://airflow.apache.org/docs/apache-airflow/stable/start/local.html
AIRFLOW_VERSION=2.5.0
PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"
pip install apache-airflow-providers-docker apache-airflow-providers-amazon
pip install apache-airflow-providers-postgres
pip install apache-airflow-providers-redis
pip install apache-airflow-providers-mysql
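Before going further, an optional sanity check from inside the same shell confirms the install and the providers:
airflow version
airflow providers list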
We will delete all the sample DAGs so that we can see our own DAGs much quicker.
Airflow will also work faster because it won't have to parse a lot of DAGs.
The virtual environment will be somewhere like '/home/USER/.cache/pypoetry/virtualenvs/airflow-4vTX1qLp-py3.9'.
poetry env info
echo "Show what we will delete"
find /home/guda/.cache/pypoetry/virtualenvs | grep example_dags
echo "Do the actual delete"
find /home/guda/.cache/pypoetry/virtualenvs | grep example_dags | xargs rm -rf
echo "Or locate the site-packages through pip and search there"
pip show pip
find /home/guda/.cache/pypoetry/virtualenvs/bookings-bQ2s_Hyz-py3.8/lib/python3.8/site-packages | grep example_dags
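If you prefer not to hard-code the virtualenv path, a small sketch like this derives it from Poetry instead (the --path flag prints only the environment location):
VENV_PATH=$(poetry --directory ./airflow env info --path)
find "$VENV_PATH" -type d -name example_dags
find "$VENV_PATH" -type d -name example_dags -prune -exec rm -rf {} +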
If you forgot to delete the sample DAGs before the first run, it is quicker to delete the Airflow database and start again.
rm airflow/airflow.db
Finally, run it
airflow standalone
The admin password is generated on the first run and written to standalone_admin_password.txt inside the Airflow home folder.
Run Airflow next time
You can always look up the generated admin password by running
cat airflow/standalone_admin_password.txt
We can do poetry shell in the ./airflow folder, and then run airflow standalone or other airflow commands.
But it is faster to prefix the airflow commands with poetry --directory ./airflow run
export PROJECT_HOME=$(realpath ./)
export AIRFLOW_HOME=${PROJECT_HOME}/airflow
export AIRFLOW__CORE__LOAD_EXAMPLES=False
export AIRFLOW__CORE__DAGS_FOLDER=${PROJECT_HOME}/dags
poetry --directory ./airflow run airflow standalone
poetry --directory ./airflow run airflow ...
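To save some typing, I wrap that prefix in a small shell alias (the name af is just my pick):
alias af='poetry --directory ./airflow run airflow'
af dags list
af standalone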
At this point you will be able to access the empty Airflow UI at http://127.0.0.1:8080/
Prepare for the Containers
The DockerOperator will need a registry to pull images from.
In development, it is easy to push the built images to a local registry and have Airflow pull them from the registry configured in the docker_default connection.
I have no idea why it is called docker_default instead of image_registry_default but this is another subject.
Let's run a local registry.
Working with the Registry
First time Run
Pull and run the registry locally.
docker run -d -p 5000:5000 --name registry registry:2
Next time Run
Next time the registry container will already exist, so you only have to start it:
docker start registry
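If you don't want to remember which case you are in, a single line handles both: start the existing container, or create it if it isn't there yet.
docker start registry 2>/dev/null || docker run -d -p 5000:5000 --name registry registry:2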
Working with images
Build the image
After you build an image, tag it and push it to the local registry.
docker-compose build custom_image
or
docker build .
Push & Pull
docker image tag custom_image localhost:5000/custom_image
docker push localhost:5000/custom_image
docker pull localhost:5000/custom_image
Hints & Tips to debug the local registry
You need to specify a password for the connection in Airflow even though the local registry does not require one; a fake password works. Test the login from the command line:
docker login 127.0.0.1:5000
When you specify the image, don't forget to prefix it with the registry host, like this:
127.0.0.1:5000/custom_image
List the images
curl -X GET http://127.0.0.1:5000/v2/_catalog
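The registry can also list the tags of a single repository (custom_image here, matching the image pushed above):
curl -X GET http://127.0.0.1:5000/v2/custom_image/tags/list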
Stop registry
docker container stop registry \
&& docker container rm -v registry
Working with containers
When working with containers you will probably have an entrypoint script. The entrypoint provides an interface and allows only a specific set of commands to run. To override the entrypoint of the image, do it like this:
docker run -u root --entrypoint /bin/bash -ti custom_image:latest
Seed Airflow settings
These seeding commands are a great fit for a justfile airflow-seed task (there is a sketch after the seeding commands below).
Seeding users
poetry --directory ./airflow run airflow users delete --username admin
poetry --directory ./airflow run airflow users create --role Admin --username admin --email admin@example.com --firstname admin --lastname admin --password admin
Seeding connections
You are using direnv, right?
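If so, a minimal .envrc sketch could export the Airflow settings from above together with the Snowflake credentials used below (all values here are placeholders):
# .envrc (direnv) - loaded automatically when you enter the project folder
export PROJECT_HOME=$(realpath ./)
export AIRFLOW_HOME=${PROJECT_HOME}/airflow
export AIRFLOW__CORE__LOAD_EXAMPLES=False
export AIRFLOW__CORE__DAGS_FOLDER=${PROJECT_HOME}/dags
export SNOWFLAKE_USER=my-user
export SNOWFLAKE_ACCOUNT=my-account
export SNOWFLAKE_PASSWORD=my-password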
poetry --directory ./airflow run \
airflow connections add 'docker_default' \
--conn-type 'docker' \
--conn-login 'root' \
--conn-host '127.0.0.1' \
--conn-port '5000'
poetry --directory ./airflow run \
airflow connections add 'snowflake' \
--conn-type 'generic' \
--conn-login "$SNOWFLAKE_USER" \
--conn-host "$SNOWFLAKE_ACCOUNT" \
--conn-password "$SNOWFLAKE_PASSWORD"
poetry --directory ./airflow run \
airflow connections add 'aws' \
--conn-type 'aws'
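You can verify what was seeded straight from the CLI:
poetry --directory ./airflow run airflow connections list
poetry --directory ./airflow run airflow connections get docker_default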
Seeding variables
poetry --directory ./airflow run \
airflow variables set aws_region_name us-east-1
poetry --directory ./airflow run \
airflow variables set current_aws_account NNNNNNNNNNNNNNNN
poetry --directory ./airflow run \
airflow variables set environment production
poetry --directory ./airflow run \
airflow variables set airflow_base_url http://localhost:8080/
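Here is a rough justfile sketch for that airflow-seed task, reusing the commands above (keep only the connections and variables you actually need):
# justfile (sketch) - run with: just airflow-seed
airflow-seed:
    poetry --directory ./airflow run airflow users create --role Admin --username admin --email admin@example.com --firstname admin --lastname admin --password admin
    poetry --directory ./airflow run airflow connections add 'docker_default' --conn-type 'docker' --conn-login 'root' --conn-host '127.0.0.1' --conn-port '5000'
    poetry --directory ./airflow run airflow variables set environment production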
DAG
Now, finally, you will be able to do something like this:
from datetime import datetime, timedelta

from airflow import DAG
from airflow.providers.docker.operators.docker import DockerOperator

# Default arguments for the DAG
default_args = {
    'owner': 'me',
    'start_date': datetime(2022, 1, 1),
    'depends_on_past': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Create the DAG
with DAG(
    'hello_world_dag',
    default_args=default_args,
    schedule_interval=timedelta(hours=1),
    catchup=False,
) as dag:
    # Create a task using the DockerOperator
    hello_world_task = DockerOperator(
        task_id='hello_world_task',
        image='localhost:5000/custom_image:latest',
        api_version='auto',
        command='echo "hello world"',
        # The registry connection we seeded earlier
        docker_conn_id='docker_default',
    )
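To exercise the DAG without waiting for the scheduler, you can trigger a single local run from the CLI (the date is just the logical run date):
poetry --directory ./airflow run airflow dags test hello_world_dag 2022-01-01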