Showing posts with label Airflow. Show all posts

Build a Self-Healing Airflow Pipeline: Using AI Agents to Auto-Fix Errors



Traditional Airflow DAGs are "brittle." If the source data changes from a comma (,) to a pipe (|) delimiter, the task fails, the pipeline stops, and you have to fix it manually.

In this guide, we’ll build a "Try-Heal-Retry" loop. We will use a Python agent that intercepts failures, asks an LLM (like GPT-4o or Claude 3.5) for a fix, and automatically retries the task with the new logic.

1. The Architecture: The "Healer" Loop

Instead of a standard PythonOperator that simply reruns the same code, we add custom logic so that the "Retry" phase is actually an "AI Repair" phase.

2. The Secret Sauce: The on_failure_callback

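Airflow allows you to run a callback function whenever a task fails. This is where our AI Agent lives. One detail matters here: on_failure_callback fires only after the final attempt has failed, while on_retry_callback fires when a failed task is about to be retried, so the agent has to be attached to on_retry_callback as well if its fix should be ready before the retry runs.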

The Agent Logic:

  1. Capture: Grab the last 50 lines of the task log and the failing code.

  2. Consult: Send that "context" to an LLM with a strict prompt: "Find the error and return only the corrected Python parameters."

  3. Execute: Update the Airflow Variable and trigger a retry.
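
The three steps above can be sketched in plain Python, independent of Airflow. This is a minimal illustration, not the post's implementation: tail_lines, build_prompt, parse_fix, heal and the ask_llm callable are hypothetical names, and the LLM is replaced by a stub so the flow can be run offline.

```python
import json
from typing import Callable


def tail_lines(log_text: str, n: int = 50) -> str:
    """Capture: keep only the last n lines of a task log."""
    return "\n".join(log_text.splitlines()[-n:])


def build_prompt(log_tail: str, failing_code: str) -> str:
    """Consult: assemble the context that is sent to the model."""
    return (
        "Find the error and return only the corrected Python parameters "
        "as a JSON object.\n\n"
        f"Log tail:\n{log_tail}\n\nFailing code:\n{failing_code}"
    )


def parse_fix(raw_reply: str) -> dict:
    """Execute (first half): accept the reply only if it is a JSON object."""
    try:
        fix = json.loads(raw_reply)
    except json.JSONDecodeError:
        return {}
    return fix if isinstance(fix, dict) else {}


def heal(log_text: str, failing_code: str, ask_llm: Callable[[str], str]) -> dict:
    """Run capture -> consult -> parse and return the proposed parameters."""
    prompt = build_prompt(tail_lines(log_text), failing_code)
    return parse_fix(ask_llm(prompt))


def fake_llm(prompt: str) -> str:
    # Stand-in for a real API call (assumption: the model replies with JSON).
    return '{"delimiter": "|"}'


log = "\n".join(f"line {i}" for i in range(200)) + "\nValueError: Delimiter mismatch"
proposed_fix = heal(log, "pd.read_csv(path, sep=',')", fake_llm)
print(proposed_fix)
```

In a real callback, the dictionary returned by heal would be serialized and stored in the Airflow Variable that the retried task reads.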

3. Step-by-Step Implementation

Step A: The "Healer" Function

This function acts as your 24/7 on-call engineer.

import openai
from airflow.models import Variable

def ai_healer_agent(context):
    task_instance = context['ti']
    # Airflow passes the raised exception to the callback in the context
    error_log = str(context.get('exception'))
    prompt = (
        f"The following Airflow task ({task_instance.task_id}) failed: "
        f"{error_log}. Suggest a fix in JSON format."
    )
    # AI identifies if it's a schema change, connection issue, or syntax error
    response = openai.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}]
    )
    # Store the 'fix' in an Airflow Variable for the next retry
    Variable.set("last_ai_fix", response.choices[0].message.content)

Step B: The Self-Healing DAG

We rely on Airflow's native retries to loop back after the agent suggests a fix (the tenacity library is an alternative if you prefer to retry inside the task itself).

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from datetime import datetime

def my_data_task(**kwargs):
    # Check if the AI Agent left a 'fix' for us
    fix = Variable.get("last_ai_fix", default_var=None)
    # ... use the fix to run the code (e.g., change the delimiter) ...
    raise ValueError("Delimiter mismatch detected!")  # Example failure

with DAG('self_healing_pipeline', start_date=datetime(2026, 1, 1),
         schedule='@daily') as dag:
    run_etl = PythonOperator(
        task_id='run_etl',
        python_callable=my_data_task,
        # on_retry_callback runs before the retry; on_failure_callback
        # runs only after the final attempt has failed
        on_retry_callback=ai_healer_agent,  # The Agent kicks in here!
        on_failure_callback=ai_healer_agent,
        retries=1
    )
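
The "use the fix" placeholder in my_data_task is left open in the post. One way it could be filled in is sketched below with the standard library, assuming the agent stored a JSON object such as {"delimiter": "|"} (an assumed shape, since the prompt only asks for "JSON format"). ALLOWED_DELIMITERS, resolve_delimiter and read_rows are hypothetical names; in the DAG, stored_fix would be the value read from the last_ai_fix Variable.

```python
import csv
import io
import json
from typing import Optional

# Only accept delimiters we explicitly trust; anything else is ignored.
ALLOWED_DELIMITERS = {",", "|", ";", "\t"}


def resolve_delimiter(stored_fix: Optional[str], default: str = ",") -> str:
    """Return the delimiter suggested by the stored fix, or the default."""
    if not stored_fix:
        return default
    try:
        fix = json.loads(stored_fix)
    except json.JSONDecodeError:
        return default
    candidate = fix.get("delimiter") if isinstance(fix, dict) else None
    if isinstance(candidate, str) and candidate in ALLOWED_DELIMITERS:
        return candidate
    return default


def read_rows(text: str, delimiter: str) -> list:
    """Parse delimited text into a list of rows."""
    return list(csv.reader(io.StringIO(text), delimiter=delimiter))


sample = "id|name\n1|alice\n2|bob\n"

# First attempt: no fix stored yet, so the default comma is used.
first_try = read_rows(sample, resolve_delimiter(None))
# Retry: the agent has stored a fix, so the pipe delimiter is used.
retry = read_rows(sample, resolve_delimiter('{"delimiter": "|"}'))

print(first_try[0])
print(retry[0])
```

Checking the suggestion against an allowlist keeps the task from blindly trusting whatever text the model returns.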

4. Why this is the "Future" 

  • MTTR (Mean Time To Recovery): For failures the agent can diagnose, recovery time can drop from hours to seconds.

  • Cost: You only pay for the LLM API call when a failure actually happens.

  • Human-in-the-loop: You can set the agent to "Suggest" a fix via Slack for you to approve with one click, rather than fully auto-fixing.
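
The "Suggest" mode in the last bullet could be implemented as a small routing step between the agent and the retry. The sketch below is an assumption about how that might look, not part of the post's code: route_fix, notify and apply_fix are hypothetical names, and a real notify would post to Slack rather than append to a list.

```python
from typing import Callable


def route_fix(fix: dict, mode: str,
              notify: Callable[[dict], None],
              apply_fix: Callable[[dict], None]) -> str:
    """Send the fix to a human ("suggest") or apply it directly ("auto")."""
    if not fix:
        return "ignored"  # nothing usable came back from the model
    if mode == "auto":
        apply_fix(fix)
        return "applied"
    notify(fix)  # any other mode falls back to the safer path
    return "pending_approval"


sent_to_slack = []  # stand-in for a Slack message queue
applied = []        # stand-in for writing the Airflow Variable

status_suggest = route_fix({"delimiter": "|"}, "suggest",
                           sent_to_slack.append, applied.append)
status_auto = route_fix({"delimiter": "|"}, "auto",
                        sent_to_slack.append, applied.append)
print(status_suggest, status_auto)
```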


What is Apache Airflow?


Apache Airflow is an open-source platform for developing, scheduling, and monitoring batch-oriented workflows. Airflow's extensible Python framework lets you define those workflows as DAGs in code.

What is a DAG?


DAG stands for directed acyclic graph: tasks are connected by one-way dependencies, and those dependencies must never form a cycle. For example, if task A triggers task B and task B triggers task C, then task C must not trigger task A, because that would create a loop. In other words, a DAG cannot contain a circular dependency.
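
To make the "no cycles" rule concrete, the sketch below uses Python's standard graphlib module (Python 3.9+), not Airflow itself, to order the A, B, C tasks from the example and to show that adding a C to A dependency is rejected. The dictionaries and the execution_order helper are illustrative; Airflow performs its own cycle check when it loads a DAG.

```python
from graphlib import CycleError, TopologicalSorter

# Each key maps a task to the set of tasks that must finish before it.
valid_dag = {"A": set(), "B": {"A"}, "C": {"B"}}

# Same graph, but A now also waits for C, which closes the loop A -> B -> C -> A.
cyclic_graph = {"A": {"C"}, "B": {"A"}, "C": {"B"}}


def execution_order(graph: dict) -> list:
    """Return a valid run order, or raise CycleError if the graph has a cycle."""
    return list(TopologicalSorter(graph).static_order())


order = execution_order(valid_dag)
print(order)

try:
    execution_order(cyclic_graph)
    has_cycle = False
except CycleError:
    has_cycle = True
print("cycle detected:", has_cycle)
```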

A DAG represents a workflow, which is a collection of tasks. Below is what a DAG looks like:




Airflow has a web UI where you can manage variables and other admin settings. You can also view your DAGs there and trigger or cancel runs, so most day-to-day management can be done from the UI.

Use Case for Airflow


Suppose you have a Unix job that processes a text file at a specific time and then calls another job that does its own part. If the file does not arrive, the first job still runs and fails, and the dependent job runs anyway and processes no data. To avoid this, we can use Airflow to create a DAG in which the tasks depend on each other: the first job is triggered only when the file arrives, and the next job runs only after the first one succeeds.

Let's write your first DAG


from datetime import datetime
from airflow import DAG
from airflow.decorators import task
from airflow.operators.bash import BashOperator

# A DAG represents a workflow, a collection of tasks
with DAG(dag_id="demo", start_date=datetime(2022, 1, 1), schedule="0 0 * * *") as dag:

    # Tasks are represented as operators
    hello = BashOperator(task_id="hello", bash_command="echo hello")

    @task()
    def airflow():
        print("airflow")

    # Set dependencies between tasks
    hello >> airflow()

In the code above, you import DAG, the task decorator, and BashOperator from Airflow.

Use BashOperator when a task needs to run Linux shell commands as part of a workflow. Similarly, use PythonOperator when a task needs to run a Python function. In summary, BashOperator executes Bash commands and PythonOperator executes Python callables.

Once you have imported everything you need, the next step is to define your DAG.
When the Airflow scheduler parses the DAG folder, it imports each Python file and registers the DAG objects it finds, such as the one created by the "with DAG" block, using the configuration you passed.

Next, BashOperator creates a task with the ID "hello" that runs a shell command, and the @task decorator turns the airflow function into a second task. Remember that every task in a DAG needs a unique task ID. The last line defines the flow, that is, which task runs first and which runs after it.

Once you save the file, the Airflow scheduler automatically picks it up and creates the DAG. If the file fails to import, the error is displayed at the top of the Airflow UI.




Conclusion


Airflow is an excellent framework. It can be very useful for orchestrating legacy Unix jobs and flows. We just need to choose Airflow for the right use cases, and it can solve those problems easily.


Terraform for Data Engineers: How to Automate Your Database Setup

  Stop Manual Setup: Deploy a PostgreSQL Database with Terraform If you are still manually creating databases in the AWS or Azure console, y...