DEV Community

Developer213
Developer213

Posted on

Trigger a rerun of a failed Airflow task


# Install necessary libraries if not already installed
!pip install requests

# Import libraries
import requests
import json

# Set up your Astronomer API URL, DAG ID, task ID, and token key
BASE_URL = "https://<your-astronomer-airflow-instance>/api/v1"
DAG_ID = "your_dag_id"  # Replace with your DAG ID
TOKEN = "your_token_key"  # Replace with your actual API token key
TASK_ID = "your_failed_task_id"  # Replace with the specific failed task ID

# Seconds to wait for the API before giving up. requests applies no timeout
# by default, so without this an unreachable host would hang the script.
REQUEST_TIMEOUT = 30

# Configure the authorization headers
headers = {
    "Authorization": f"Bearer {TOKEN}",
    "Content-Type": "application/json",
}


def pick_latest_dag_run_id(payload):
    """Return the ``dag_run_id`` of the first run in a list-DAG-runs response.

    Args:
        payload: Decoded JSON body of ``GET /dags/{dag_id}/dagRuns``. The
            caller is expected to have requested newest-first ordering.

    Returns:
        The run ID string, or ``None`` when the response holds no runs.
    """
    runs = payload.get("dag_runs") or []
    if not runs:
        return None
    return runs[0].get("dag_run_id")


def build_clear_payload(dag_run_id, task_id):
    """Build the request body for ``POST /dags/{dag_id}/clearTaskInstances``.

    Args:
        dag_run_id: ID of the DAG run whose task instance should be cleared.
        task_id: ID of the task to clear within that run.

    Returns:
        A JSON-serializable dict.
    """
    return {
        # The API treats a missing "dry_run" as true and would only report
        # what it *would* clear, so it has to be switched off explicitly.
        "dry_run": False,
        "dag_run_id": dag_run_id,
        "task_ids": [task_id],
        "only_failed": True,  # Clears only failed task instances
        # Put the DAG run back into an active state so the scheduler picks
        # the cleared task up again; a run that already finished as failed
        # would otherwise stay finished and the task would never re-execute.
        "reset_dag_runs": True,
        "include_subdags": False,
        "include_parentdag": False,
    }


def get_latest_dag_run_id():
    """Fetch the ID of the most recent run of ``DAG_ID``.

    Returns:
        The run ID string, or ``None`` if the request failed or the DAG has
        no runs (a message is printed in either case).
    """
    dag_run_url = f"{BASE_URL}/dags/{DAG_ID}/dagRuns"
    response = requests.get(
        dag_run_url,
        headers=headers,
        # Ask for newest-first explicitly; the unordered listing does not
        # guarantee that the first element is the latest run.
        params={"order_by": "-execution_date", "limit": 1},
        timeout=REQUEST_TIMEOUT,
    )

    if response.status_code != 200:
        print("Failed to retrieve DAG runs:", response.text)
        return None

    dag_run_id = pick_latest_dag_run_id(response.json())
    if dag_run_id is None:
        print("No DAG runs found for DAG:", DAG_ID)
    return dag_run_id


def rerun_failed_task(dag_run_id, task_id):
    """Clear a failed task instance so that Airflow runs it again.

    Args:
        dag_run_id: ID of the DAG run containing the failed task.
        task_id: ID of the failed task.

    Returns:
        ``True`` if the API answered with HTTP 200, ``False`` otherwise.
    """
    clear_url = f"{BASE_URL}/dags/{DAG_ID}/clearTaskInstances"
    response = requests.post(
        clear_url,
        headers=headers,
        json=build_clear_payload(dag_run_id, task_id),
        timeout=REQUEST_TIMEOUT,
    )

    if response.status_code == 200:
        print("Task rerun triggered successfully.")
        return True

    print("Failed to rerun task:", response.text)
    return False


if __name__ == "__main__":
    # Step 1: Retrieve the latest DAG run
    DAG_RUN_ID = get_latest_dag_run_id()

    # Step 2: Trigger a rerun for the failed task if a valid DAG run ID was retrieved
    if DAG_RUN_ID:
        print("Latest DAG Run ID:", DAG_RUN_ID)
        rerun_failed_task(DAG_RUN_ID, TASK_ID)


Enter fullscreen mode Exit fullscreen mode

Top comments (0)