DEV Community

Cover image for Apache Airflow 初體驗
HyperRedStart
HyperRedStart

Posted on

Apache Airflow 初體驗

Airflow 是由 Airbnb 所貢獻的 Apache 頂級專案,用於流程控制,建立有向無環的工作流程(DAGs) ,以事件排程的方式部屬我們的 DAG Schedule 並可以部屬到各個雲端平台上, GCP / AWS / Azure,可透過網頁 UI 調配部屬排程,主打彈性、高擴充性、搭配Jinja Template Engine 優雅的設計 Workflow 工作流程!

1.建置 Airflow 環境 - 使用 Docker

下載官方提供 docker-compose 文件

curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.2.4/docker-compose.yaml'
Enter fullscreen mode Exit fullscreen mode

產生相依目錄並設定 Airflow 使用者權限

mkdir -p ./dags ./logs ./plugins
echo -e "AIRFLOW_UID=$(id -u)" > .env
Enter fullscreen mode Exit fullscreen mode

2.資料庫初始化

docker-compose up airflow-init
Enter fullscreen mode Exit fullscreen mode

3.啟動 Airflow 服務

docker-compose up -d
Enter fullscreen mode Exit fullscreen mode

airflow service

確認我們的 Container 是否都健康!
dockerps

4.下載 airflow 命令工具

curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.1.1/airflow.sh'
chmod +x airflow.sh
Enter fullscreen mode Exit fullscreen mode

5.撰寫 Demo 程式

將我們第一隻 DAGs 程式放至於 Container掛載目錄 dags 下
dags/test_app_v1.py

import time
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator

default_args = {
  'owner' : '',
  'start_date' : datetime(2022, 1, 1, 0, 0),
  'schedule_interval': '@daily',
  'retries':2,
  'retry_delay': timedelta(minutes = 1)

}


def fn_one(): 
  print('execute jobs')

with DAG('test_app_v1' ,default_args=default_args) as dag:
  tesk = PythonOperator(
    task_id = 'one',
    python_callable=fn_one
  )
Enter fullscreen mode Exit fullscreen mode

6.透過 airflow.sh 將執行我們的 DAG 程式

# 建立運行 python container 
./airflow.sh bash
# 執行 python 檔案
python dags/test_app_v1.py
Enter fullscreen mode Exit fullscreen mode

7.網頁中查看新建立的 test_app_v1 DAG

Airflow Web Server
http://localhost:8080
airflow/airflow

在執行py腳本後我們就可以在 Airflow 首頁中查找到我們的 DAG 排程 !

dag

查看執行狀態,使否有異常失敗等情況
status

8.REST API

使用 Airflow Rest Api 可以取代我們在 WebUI 上的工作,進行 DAG 的抓取執行刪除等工作。

https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html

curl -X GET 'http://10.1.200.67:8080/api/v1/dags' --user "airflow:airflow"
Enter fullscreen mode Exit fullscreen mode
{
  "dag_id": "test_app_v1",
  "description": null,
  "file_token": "",
  "fileloc": "/opt/airflow/dags/test_app_v1.py",
  "is_active": true,
  "is_paused": false,
  "is_subdag": false,
  "owners": [],
  "root_dag_id": null,
  "schedule_interval": {
    "__type": "TimeDelta",
    "days": 1,
    "microseconds": 0,
    "seconds": 0
  },
  "tags": []
}

Enter fullscreen mode Exit fullscreen mode

Conclusion

很多需要進行排定的工作項目都可以在 Airflow 的幫助下進行運行,搭配 python的語法可以很快速的撰寫爬蟲、資料轉換、事件觸發等工作,Airflow 提供WebUI操作畫面可以讓我們更便利的去對我們的 DAGs 進行監控,解決了我們在設計排程上的諸多困難!

Discussion (0)