Showing posts with label Data Engineering. Show all posts

Prevent Pipeline Crashes: Real-Time Data Validation with Pydantic



Stop Broken Pipelines: Real-Time Data Validation with Pydantic

In modern data engineering, "Garbage In, Garbage Out" is no longer just a saying—it's a financial risk. If your Python ETL script expects a price as a float but receives a null or a string, your pipeline crashes, and your downstream stakeholders lose trust.

The solution? Contract-driven development using Pydantic.

1. What is Pydantic?

Pydantic is a data validation library for Python that enforces type hints at runtime. Instead of writing 50 if/else statements to check your data, you define a Schema (a Class), and Pydantic does the heavy lifting.

2. The Problem: The "Silent Fail"

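Look at this standard dictionary from an API. Here id arrives as a string and price is the literal string "None" rather than a number. Plain Python accepts the dictionary without complaint, so the problem only surfaces later, when your SQL database rejects the row.

raw_data = {"id": "101", "name": "Sensor_A", "price": "None"}
# Nothing fails here; the bad values only break things at load time!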

3. The Solution: Defining a Data Contract

With Pydantic, we create a "Gatekeeper" for our data.

from typing import Optional

from pydantic import BaseModel, ValidationError, field_validator


class UserData(BaseModel):
    id: int
    name: str
    price: float
    status: Optional[str] = "active"

    # We can even add custom logic!
    @field_validator("price")
    @classmethod
    def price_must_be_positive(cls, v: float) -> float:
        if v < 0:
            raise ValueError("Price cannot be negative")
        return v


# Now, let's validate the "dirty" data (raw_data from the previous section)
try:
    clean_data = UserData(**raw_data)
    print(clean_data.model_dump())
except ValidationError as e:
    # "None" is a string, not a number, so price fails to parse as a float
    print(f"Data Validation Failed: {e}")

4. Integrating with Airflow or Kafka

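In 2026, a good practice is to put this validation at the very start of your pipeline (the ingestion layer), for example in the first Airflow task or in the Kafka consumer, so bad records are stopped before any downstream step sees them.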

  • If validation fails: Route the "dirty" data to a Dead Letter Queue (DLQ) for manual review.

  • If validation passes: Load the data into your Warehouse (Snowflake/BigQuery).
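The two branches above can be sketched in plain Python. This is a minimal illustration, not a production DLQ: the SensorReading model and the route_records helper are hypothetical names, and the "dead letter queue" is just a list standing in for a real Kafka topic or table.

```python
from pydantic import BaseModel, ValidationError


class SensorReading(BaseModel):
    # Hypothetical contract, mirroring the fields used earlier in the post
    id: int
    name: str
    price: float


def route_records(records):
    """Split raw dicts into validated rows and dead-letter entries."""
    valid, dead_letter = [], []
    for record in records:
        try:
            valid.append(SensorReading(**record).model_dump())
        except ValidationError as exc:
            # Keep the original payload plus the reasons, for manual review
            dead_letter.append({"record": record, "errors": exc.errors()})
    return valid, dead_letter


batch = [
    {"id": "101", "name": "Sensor_A", "price": "19.99"},
    {"id": "102", "name": "Sensor_B", "price": "None"},
]
valid_rows, dlq_rows = route_records(batch)
print(len(valid_rows), "valid,", len(dlq_rows), "sent to the DLQ")
```

Storing the error details next to the rejected payload means whoever reviews the DLQ can see why each record failed without re-running validation.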

Build a Self-Healing Airflow Pipeline: Using AI Agents to Auto-Fix Errors



Traditional Airflow DAGs are "brittle." If the source data changes from a comma (,) to a pipe (|) delimiter, the task fails, the pipeline stops, and you have to fix it manually.

In this guide, we’ll build a "Try-Heal-Retry" loop. We will use a Python agent that intercepts failures, asks an LLM (like GPT-4o or Claude 3.5) for a fix, and automatically retries the task with the new logic.

1. The Architecture: The "Healer" Loop

Instead of a plain retry, we attach custom logic to a standard PythonOperator so that the "Retry" phase is actually an "AI Repair" phase.

2. The Secret Sauce: The on_failure_callback

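Airflow lets you attach callback functions to a task. on_failure_callback runs when a task instance ends up in the failed state, which happens only after its retries are exhausted; on_retry_callback runs when an attempt fails and the task is about to be retried. This is where our AI Agent lives.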

The Agent Logic:

  1. Capture: Grab the last 50 lines of the task log and the failing code.

  2. Consult: Send that "context" to an LLM with a strict prompt: "Find the error and return only the corrected Python parameters."

  3. Execute: Update the Airflow Variable and trigger a retry.
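The Capture and Consult steps can be sketched without any Airflow or LLM dependency. The helper names tail_lines and build_prompt are hypothetical, and the log file here is a temporary stand-in; where the real task log lives depends on your Airflow logging setup.

```python
import tempfile
from collections import deque
from pathlib import Path


def tail_lines(log_path, n=50):
    """Return the last n lines of a log file, keeping only n lines in memory."""
    with Path(log_path).open(encoding="utf-8", errors="replace") as handle:
        return list(deque((line.rstrip("\n") for line in handle), maxlen=n))


def build_prompt(log_lines, failing_code):
    """Assemble the 'context' string that would be sent to the LLM."""
    return (
        "Find the error and return only the corrected Python parameters.\n\n"
        "--- Last log lines ---\n" + "\n".join(log_lines) + "\n\n"
        "--- Failing code ---\n" + failing_code
    )


# Demo with a throwaway log file of 120 lines
with tempfile.TemporaryDirectory() as tmp_dir:
    demo_log = Path(tmp_dir) / "task.log"
    demo_log.write_text("\n".join(f"line {i}" for i in range(120)), encoding="utf-8")
    captured = tail_lines(demo_log, n=50)

prompt = build_prompt(captured, "pd.read_csv(path, sep=',')")
print(prompt.splitlines()[0])
```

Capping the context at a fixed number of lines keeps the prompt size, and therefore the API cost, predictable.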

3. Step-by-Step Implementation

Step A: The "Healer" Function

This function acts as your 24/7 on-call engineer.

from airflow.models import Variable
from openai import OpenAI


def ai_healer_agent(context):
    # Airflow passes the exception that failed the task in the callback context
    task_instance = context["ti"]
    error_log = str(context.get("exception"))
    prompt = (
        f"The following Airflow task failed ({task_instance.task_id}): "
        f"{error_log}. Suggest a fix in JSON format."
    )
    # AI identifies if it's a schema change, connection issue, or syntax error
    client = OpenAI()  # reads OPENAI_API_KEY from the environment
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}],
    )
    # Store the 'fix' in an Airflow Variable for the next retry
    Variable.set("last_ai_fix", response.choices[0].message.content)

Step B: The Self-Healing DAG

We rely on Airflow's native retries to loop back after the agent suggests a fix (the tenacity library is an alternative if you prefer to retry inside the task itself).

from datetime import datetime

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator


def my_data_task(**kwargs):
    # Check if the AI Agent left a 'fix' for us
    fix = Variable.get("last_ai_fix", default_var=None)
    # ... use the fix to run the code (e.g., change the delimiter) ...
    raise ValueError("Delimiter mismatch detected!")  # Example failure


with DAG(
    "self_healing_pipeline",
    start_date=datetime(2026, 1, 1),
    schedule="@daily",
) as dag:
    run_etl = PythonOperator(
        task_id="run_etl",
        python_callable=my_data_task,
        # on_retry_callback fires before a retry, so the fix is ready in time
        on_retry_callback=ai_healer_agent,
        # on_failure_callback fires only once retries are exhausted
        on_failure_callback=ai_healer_agent,  # The Agent kicks in here!
        retries=1,
    )
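The "use the fix" step is left open in the task above. One cautious way to fill it in, sketched here under the assumption that the LLM returns JSON with a "delimiter" key (that key name, the allowlist, and both helper names are hypothetical), is to treat the model output as untrusted data and accept only known-safe values:

```python
import csv
import io
import json

ALLOWED_DELIMITERS = {",", "|", ";", "\t"}


def delimiter_from_fix(fix_json, default=","):
    """Extract a delimiter from the stored fix, falling back to the default."""
    if not fix_json:
        return default
    try:
        fix = json.loads(fix_json)
    except json.JSONDecodeError:
        return default  # the model did not return valid JSON
    if not isinstance(fix, dict):
        return default
    delimiter = fix.get("delimiter", default)
    # Only accept values from an allowlist; never execute model output
    if isinstance(delimiter, str) and delimiter in ALLOWED_DELIMITERS:
        return delimiter
    return default


def parse_rows(text, fix_json=None):
    """Parse delimited text, using the delimiter suggested by the fix if valid."""
    reader = csv.reader(io.StringIO(text), delimiter=delimiter_from_fix(fix_json))
    return list(reader)


rows = parse_rows("id|name\n101|Sensor_A\n", '{"delimiter": "|"}')
print(rows)
```

Restricting the agent to a small set of parameters keeps a bad suggestion from doing more damage than the original failure.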

4. Why this is the "Future" 

  • MTTR (Mean Time To Recovery): For failures the agent can actually fix, recovery time can drop from hours to seconds.

  • Cost: You only pay for the LLM API call when a failure actually happens.

  • Human-in-the-loop: You can set the agent to "Suggest" a fix via Slack for you to approve with one click, rather than fully auto-fixing.


How to Hide API Keys in Python: Stop Leaking Secrets to GitHub


 

Whether you are building a data pipeline in Airflow or a simple AI bot, you should never hard-code your API keys directly in your Python script. If you push that code to a public repository, automated scanners can pick up the keys within minutes.

Here is the professional way to handle secrets using Environment Variables and .env files.

1. The Tool: python-dotenv

A widely used tool for managing local secrets is a library called python-dotenv. It lets you store your keys in a separate file that, once excluded from Git, never gets uploaded with your code.

Install it via terminal:

pip install python-dotenv

2. Create your .env File

In your project’s root folder, create a new file named exactly .env. Inside, add your secrets like this:

# .env file
DATABASE_URL=postgres://user:password@localhost:5432/mydb
OPENAI_API_KEY=sk-your-secret-key-here
AWS_SECRET_ACCESS_KEY=your-aws-key

3. Access Secrets in Python

Now, you can load these variables into your script without ever typing the actual key in your code.

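import os

from dotenv import load_dotenv

# Load the variables from .env into the process environment
load_dotenv()

# Access them using os.getenv (returns None if a variable is missing)
api_key = os.getenv("OPENAI_API_KEY")
db_url = os.getenv("DATABASE_URL")

if not api_key or not db_url:
    raise RuntimeError("Missing OPENAI_API_KEY or DATABASE_URL; check your .env file")

# Never print the secret values themselves
print("Secrets loaded from the environment.")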
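If several scripts need the same checks, a small helper can fail fast on missing variables and mask values before they reach a log. This is a sketch using only the standard library; require_env and mask are hypothetical helper names, and DEMO_API_KEY is a stand-in for a value that python-dotenv would normally load.

```python
import os


def require_env(name):
    """Return the value of an environment variable or fail with a clear error."""
    value = os.environ.get(name)
    if not value:
        raise RuntimeError(f"Required environment variable {name!r} is not set")
    return value


def mask(secret, visible=4):
    """Show only the last few characters, for safe logging."""
    if len(secret) <= visible:
        return "*" * len(secret)
    return "*" * (len(secret) - visible) + secret[-visible:]


os.environ["DEMO_API_KEY"] = "sk-demo-1234"  # stand-in for a value loaded from .env
masked_key = mask(require_env("DEMO_API_KEY"))
print(masked_key)
```

Failing at startup with the variable's name in the message is usually easier to debug than a None that causes an authentication error much later.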

4. The Most Important Step: .gitignore

This is where the "Security" part happens. You must tell Git to ignore your .env file so it never leaves your computer.

Create a file named .gitignore in your project's root folder (or open the existing one) and add this line:

.env

If .env was already committed, adding it to .gitignore does not untrack it. Remove it from the index with git rm --cached .env, and rotate any keys that were pushed.

Why this is a "DevSecOps" Win:

  • Security: Your keys stay on your machine.

  • Flexibility: You can use different keys for "Development" and "Production" without changing a single line of code.

  • Collaboration: Your teammates can create their own local .env files with their own credentials.

Why is my Airflow Task stuck in "Queued" state? (5 Quick Fixes)


 

You’ve triggered your DAG, the UI shows the task as grey (Queued), but nothing happens for minutes—or hours. This is a classic Airflow bottleneck. Here is how to diagnose and fix it.

1. Check the "Concurrency" Limits

Airflow has several "safety brakes" to prevent your server from crashing. If you hit these limits, tasks will stay queued until others finish.

  • parallelism: The max number of task instances that can run across your entire Airflow environment.

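  • max_active_tasks_per_dag (named dag_concurrency before Airflow 2.2): The max number of task instances that can run at once within a single DAG.

  • max_active_runs_per_dag: The max number of active DAG runs per DAG. If too many runs (for example, backfills) are already active, new runs and their tasks won't start.

The Fix: Check your airflow.cfg or your DAG definition. Increase parallelism, or max_active_tasks on the DAG, if your hardware can handle it.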
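To see which limits are actually set, you can read them straight out of the config file. The sketch below parses airflow.cfg-style text with the standard library; the sample values are illustrative only, read_limits is a hypothetical helper, and in a real setup you would pass the contents of your own airflow.cfg.

```python
import configparser

SAMPLE_CFG = """
[core]
parallelism = 32
max_active_tasks_per_dag = 16
max_active_runs_per_dag = 16
"""

LIMIT_KEYS = ("parallelism", "max_active_tasks_per_dag", "max_active_runs_per_dag")


def read_limits(cfg_text):
    """Return the [core] concurrency limits found in airflow.cfg-style text."""
    # interpolation=None: config values may contain literal '%' characters
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(cfg_text)
    return {
        key: parser.getint("core", key)
        for key in LIMIT_KEYS
        if parser.has_option("core", key)
    }


limits = read_limits(SAMPLE_CFG)
for name, value in limits.items():
    print(f"{name} = {value}")
```

Keep in mind that Airflow settings can also be overridden by AIRFLOW__CORE__... environment variables, which this sketch does not read.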

2. Is the Scheduler actually alive?

Sometimes the Airflow UI looks fine, but the Scheduler process has died or hung.

  • Check the UI: Look at the top of the Airflow page. If there is a red banner saying "The scheduler does not appear to be running," that’s your answer.

  • The Fix: Restart the scheduler service (the service or container name depends on your deployment):

    systemctl restart airflow-scheduler
    # OR if using Docker:
    docker restart airflow-scheduler

3. "No Slots Available" in Pools

Airflow uses Pools to manage resources (like limiting how many tasks can hit a specific database at once). If your task belongs to a pool with 5 slots and 5 tasks are already running, your 6th task will wait until one of them finishes and frees a slot.

The Fix: Go to Admin -> Pools in the UI. Check if the "Default Pool" or your custom pool is full. Increase the slots if necessary.

4. Celery Worker Issues (For Production Setups)

If you are using the CeleryExecutor, the task is queued in Redis or RabbitMQ, but the Worker might not be picking it up.

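  • The Check: Confirm that workers are online, for example with Celery's own inspect command (celery --app airflow.providers.celery.executors.celery_executor.app inspect ping; on older Airflow 2 releases the app path is airflow.executors.celery_executor.app) or in the Flower UI.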

  • The Fix: Ensure your workers are pointed to the same Metadata DB and Broker as your Scheduler.

5. Resource Starvation (OOM)

If your worker node is out of RAM or CPU, it might accept the task but fail to initialize it, leading to a loop where the task stays queued.

Terraform for Data Engineers: How to Automate Your Database Setup

  Stop Manual Setup: Deploy a PostgreSQL Database with Terraform If you are still manually creating databases in the AWS or Azure console, y...