Showing posts with label python. Show all posts
Showing posts with label python. Show all posts

Prevent Pipeline Crashes: Real-Time Data Validation with Pydantic



Stop Broken Pipelines: Real-Time Data Validation with Pydantic

In modern data engineering, "Garbage In, Garbage Out" is no longer just a saying—it's a financial risk. If your Python ETL script expects a price as a float but receives a null or a string, your pipeline crashes, and your downstream stakeholders lose trust.

The solution? Contract-driven development using Pydantic.

1. What is Pydantic?

Pydantic is a data validation library for Python that enforces type hints at runtime. Instead of writing 50 if/else statements to check your data, you define a Schema (a Class), and Pydantic does the heavy lifting.

2. The Problem: The "Silent Fail"

Look at this standard dictionary from an API. If price is missing or id is a string instead of an int, your SQL database might reject it.

raw_data = {"id": "101", "name": "Sensor_A", "price": "None"}
# This will break your DB!

3. The Solution: Defining a Data Contract

With Pydantic, we create a "Gatekeeper" for our data.

from pydantic import BaseModel, field_validator
from typing import Optional

class UserData(BaseModel):
id: int
name: str
price: float
status: Optional[str] = "active"

# We can even add custom logic!
@field_validator('price')
def price_must_be_positive(cls, v):
if v < 0:
raise ValueError('Price cannot be negative')
return v

# Now, let's validate the "dirty" data
try:
clean_data = UserData(**raw_data)
print(clean_data.model_dump())
except Exception as e:
print(f"Data Validation Failed: {e}")

4. Integrating with Airflow or Kafka

In 2026, the best practice is to put this validation at the very start of your pipeline (the Ingestion layer).

  • If validation fails: Route the "dirty" data to a Dead Letter Queue (DLQ) for manual review.

  • If validation passes: Load the data into your Warehouse (Snowflake/BigQuery).

Build a Self-Healing Airflow Pipeline: Using AI Agents to Auto-Fix Errors



Traditional Airflow DAGs are "brittle." If the source data changes from a comma (,) to a pipe (|) delimiter, the task fails, the pipeline stops, and you have to fix it manually.

In this guide, we’ll build a "Try-Heal-Retry" loop. We will use a Python agent that intercepts failures, asks an LLM (like GPT-4o or Claude 3.5) for a fix, and automatically retries the task with the new logic.

1. The Architecture: The "Healer" Loop

Instead of a standard PythonOperator, we use a custom logic where the "Retry" phase is actually an "AI Repair" phase.

2. The Secret Sauce: The on_failure_callback

Airflow allows you to run a function whenever a task fails. This is where our AI Agent lives.

The Agent Logic:

  1. Capture: Grab the last 50 lines of the task log and the failing code.

  2. Consult: Send that "context" to an LLM with a strict prompt: "Find the error and return only the corrected Python parameters."

  3. Execute: Update the Airflow Variable and trigger a retry.

3. Step-by-Step Implementation

Step A: The "Healer" Function

This function acts as your 24/7 on-call engineer.

import openai
from airflow.models import Variable

def ai_healer_agent(context):
task_instance = context['ti']
error_log = task_instance.xcom_pull(task_ids=task_instance.task_id,
key='error_msg')
prompt = f"The following Airflow task failed: {error_log}. Suggest a
fix in JSON format."
# AI identifies if it's a schema change, connection issue, or syntax error
response = openai.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": prompt}]
)
# Store the 'fix' in an Airflow Variable for the next retry
Variable.set("last_ai_fix", response.choices[0].message.content)

Step B: The Self-Healing DAG

We use the tenacity library or Airflow's native retries to loop back after the agent suggests a fix.

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def my_data_task(**kwargs):
# Check if the AI Agent left a 'fix' for us
fix = Variable.get("last_ai_fix", default_var=None)
# ... use the fix to run the code (e.g., change the delimiter) ...
raise ValueError("Delimiter mismatch detected!") # Example failure

with DAG('self_healing_pipeline', start_date=datetime(2026, 1, 1),
schedule='@daily') as dag:
run_etl = PythonOperator(
task_id='run_etl',
python_callable=my_data_task,
on_failure_callback=ai_healer_agent, # The Agent kicks in here!
retries=1
)

4. Why this is the "Future" 

  • MTTR (Mean Time To Recovery): You reduce your recovery time from hours to seconds.

  • Cost: You only pay for the LLM API call when a failure actually happens.

  • Human-in-the-loop: You can set the agent to "Suggest" a fix via Slack for you to approve with one click, rather than fully auto-fixing.


Reduce AWS Bills by 60%: Automate EC2 Stop/Start with Python & Lambda



In 2026, cloud bills have become a top expense for most tech companies. One of the biggest "money-wasters" is leaving Development and Staging servers running over the weekend or at night when no one is using them.

If you have a t3.large instance running 24/7, you're paying for 168 hours a week. By shutting it down outside of 9-to-5 working hours, you can save over 65% on your monthly bill.

The Solution: "The DevSecOps Auto-Stop"

We will use a small Python script running on AWS Lambda that looks for a specific tag (like Schedule: OfficeHours) and shuts down any instance that shouldn't be running.

Step 1: Tag Your Instances

First, go to your AWS Console and add a tag to the instances you want to automate:

  • Key: AutoStop

  • Value: True

Step 2: The Python Script (Boto3)

Create a new AWS Lambda function and paste this code. It uses the boto3 library to talk to your EC2 instances.

import boto3

ec2 = boto3.client('ec2', region_name='us-east-1') # Change to your region

def lambda_handler(event, context):
# Search for all running instances with the 'AutoStop' tag
filters = [
{'Name': 'tag:AutoStop', 'Values': ['True']},
{'Name': 'instance-state-name', 'Values': ['running']}
]
instances = ec2.describe_instances(Filters=filters)
instance_ids = []

for reservation in instances['Reservations']:
for instance in reservation['Instances']:
instance_ids.append(instance['InstanceId'])

if instance_ids:
ec2.stop_instances(InstanceIds=instance_ids)
print(f"Successfully stopped instances: {instance_ids}")
else:
print("No running instances found with AutoStop=True tag.")


Step 3: Set the Schedule (EventBridge)

You don't want to run this manually.

  1. Go to Amazon EventBridge.

  2. Create a "Schedule."

  3. Use a Cron expression to trigger the Lambda at 7:00 PM every evening:

    • cron(0 19 ? * MON-FRI *)

How to Hide API Keys in Python: Stop Leaking Secrets to GitHub


 

Whether you are building a data pipeline in Airflow or a simple AI bot, you should never hard-code your API keys directly in your Python script. If you push that code to a public repository, hackers will find it in seconds using automated scanners.

Here is the professional way to handle secrets using Environment Variables and .env files.

1. The Tool: python-dotenv

The industry standard for managing local secrets is a library called python-dotenv. It allows you to store your keys in a separate file that never gets uploaded to the internet.

Install it via terminal:

pip install python-dotenv

2. Create your .env File

In your project’s root folder, create a new file named exactly .env. Inside, add your secrets like this:

# .env file
DATABASE_URL=postgres://user:password@localhost:5432/mydb
OPENAI_API_KEY=sk-your-secret-key-here
AWS_SECRET_ACCESS_KEY=your-aws-key

3. Access Secrets in Python

Now, you can load these variables into your script without ever typing the actual key in your code.

import os
from dotenv import load_dotenv

# Load the variables from .env into the system environment
load_dotenv()

# Access them using os.getenv
api_key = os.getenv("OPENAI_API_KEY")
db_url = os.getenv("DATABASE_URL")

print(f"Successfully connected to the database!")

4. The Most Important Step: .gitignore

This is where the "Security" part happens. You must tell Git to ignore your .env file so it never leaves your computer.

Create a file named .gitignore and add this line:

.env

Why this is a "DevSecOps" Win:

  • Security: Your keys stay on your machine.

  • Flexibility: You can use different keys for "Development" and "Production" without changing a single line of code.

  • Collaboration: Your teammates can create their own local .env files with their own credentials.

Why is my Airflow Task stuck in "Queued" state? (5 Quick Fixes)


 

You’ve triggered your DAG, the UI shows the task as grey (Queued), but nothing happens for minutes—or hours. This is a classic Airflow bottleneck. Here is how to diagnose and fix it.

1. Check the "Concurrency" Limits

Airflow has several "safety brakes" to prevent your server from crashing. If you hit these limits, tasks will stay queued until others finish.

  • parallelism: The max number of task instances that can run across your entire Airflow environment.

  • dag_concurrency: The max number of tasks that can run for a single DAG.

  • max_active_runs_per_dag: If you have too many "Backfills" running, new tasks won't start.

The Fix: Check your airflow.cfg or your DAG definition. Increase max_active_tasks if your hardware can handle it.

2. Is the Scheduler actually alive?

Sometimes the Airflow UI looks fine, but the Scheduler process has died or hung.

  • Check the UI: Look at the top of the Airflow page. If there is a red banner saying "The scheduler does not appear to be running," that’s your answer.

  • The Fix: Restart the scheduler service:

  • systemctl restart airflow-scheduler
    # OR if using Docker:
    docker restart airflow-scheduler

3. "No Slots Available" in Pools

Airflow uses Pools to manage resources (like limiting how many tasks can hit a specific database at once). If your task belongs to a pool with 5 slots and 5 tasks are already running, your 6th task will stay Queued forever.

The Fix: Go to Admin -> Pools in the UI. Check if the "Default Pool" or your custom pool is full. Increase the slots if necessary.

4. Celery Worker Issues (For Production Setups)

If you are using the CeleryExecutor, the task is queued in Redis or RabbitMQ, but the Worker might not be picking it up.

  • The Check: Run airflow celery inspect_short to see if workers are online.

  • The Fix: Ensure your workers are pointed to the same Metadata DB and Broker as your Scheduler.

5. Resource Starvation (OOM)

If your worker node is out of RAM or CPU, it might accept the task but fail to initialize it, leading to a loop where the task stays queued.

How to Create Your First AI Bot in Python: A Step-by-Step Guide

How to Create Your First AI Bot in Python



Artificial Intelligence (AI) bots are transforming industries, automating tasks, and creating smarter solutions. Building an AI bot in Python is an excellent project for beginners and professionals alike. Python’s simplicity and vast libraries make it the go-to language for AI development. In this guide, we’ll walk you through the process of creating a simple chatbot using Python.


Why Python for AI Bots?

Python offers a rich ecosystem of libraries like NLTK, spaCy, and TensorFlow that simplify natural language processing (NLP) and AI development. Its ease of use and community support make it ideal for building AI bots.


Step-by-Step: Create Your First AI Bot

Step 1: Install Python and Required Libraries

To begin, ensure you have Python installed on your system. You can download it from the official Python website. Then, install essential libraries using pip:

pip install nltk
pip install chatterbot
pip install chatterbot_corpus

These libraries will help with text processing and creating conversational bots.


Step 2: Set Up Your Project

Create a new Python file for your bot, for example, ai_bot.py. Import the necessary libraries at the top of the file:

from chatterbot import ChatBot
from chatterbot.trainers import ChatterBotCorpusTrainer


Step 3: Initialize the ChatBot

Set up your chatbot instance and give it a name.

bot = ChatBot('AI_Bot')


Step 4: Train Your Bot

Train your bot using pre-defined datasets available in the chatterbot_corpus library.

trainer = ChatterBotCorpusTrainer(bot)
trainer.train('chatterbot.corpus.english')

This trains the bot to understand basic English conversations.


Step 5: Create a User Interaction Loop

Now, let’s create an interactive chat loop to allow users to converse with the bot.

print("Hello! I am your AI bot. Type 'exit' to end the conversation.")
while True:
    user_input = input("You: ")
    if user_input.lower() == 'exit':
        print("AI Bot: Goodbye!")
        break
    response = bot.get_response(user_input)
    print("AI Bot:", response)


Testing Your AI Bot

  1. Run the script:
    python ai_bot.py
    
  2. Start typing messages to interact with the bot. For example:
    • You: Hello
    • AI Bot: Hello! How can I assist you today?

The bot will respond based on its training dataset.


Advanced Tips

  1. Custom Training Data:
    Enhance your bot’s intelligence by training it on custom datasets.

    trainer.train([
        "Hi there!",
        "Hello! How can I help?",
        "What is AI?",
        "AI stands for Artificial Intelligence."
    ])
    
    
  2. Natural Language Processing:
    Use NLTK or spaCy for advanced NLP tasks like sentiment analysis or intent recognition.

  3. Deploy Your Bot:
    Integrate your bot with platforms like WhatsApp, Telegram, or a website using APIs like Flask or Django.


Conclusion

Building your first AI bot in Python is a rewarding experience. With tools like ChatterBot and libraries for NLP, creating intelligent conversational agents is easier than ever. Follow this guide, experiment with your bot, and expand its capabilities to suit your needs.

Start coding today and explore the limitless possibilities of AI!

How to dockerize your python application in docker

Dockerize your python application:






Docker is a technology which lets you build, deploy and run your applications. Docker enables you separate your infrastructure from your application. With Docker all you need to do is just write your code,.
dockerize it and distribute it in form of image. That way any one can use your application who is running the Docker.

What do you mean by Dockerize application?

Dockerize mean you write your code on your system then you prepare the image and distribute it over the internet or on DockerHub. You don't have to worry about the underlying infrastructure and dependencies.

Let's write a python program which will count the occurrence of words from a given string.


#Input : string = "Docker is a technology which
# lets you build, deploy and run your applications.";
#Count occurence of words from a given string example


def findFreq(s):
dictt = {}
strng = s.split(" ")
strr1 = set(strng)
for word in strr1:
dictt[word] = s.count(word)
return dictt
if __name__ == "__main__":
x = input("Enter your string:")
#raw_input in python 2.x and input() in python 3.x
print(findFreq(x))

#Output: {'a': 4, 'and': 1, 'run': 1, '': 80,
# 'deploy': 1, 'technology': 1, 'is': 1,
# 'you': 2, 'lets': 1, 'applications.': 1,
# 'which': 1, 'build,': 1, 'Docker': 1, 'your': 1}

Save this file with findfrequency.py in same directory. I am saving it in current directory for my convenience but you can save it anywhere and pass the absolute path.


Now lets create a Dockerfile.


FROM python:3

We need to use python in docker so we are using FROM keyword so this will create layer from python image. Means your image is based on python image. 

Now we need to run our python file so we need to add this file to Dockerfile.

ADD findfrequency.py /

Use CMD to execute commands when image loads

CMD ["python", "./findfrequency.py"]

Combine all above lines and create a Dockerfile.

FROM python:3
ADD findfrequency.py /
CMD ["python", "./findfrequency.py"]

So we have created a Dockerfile now. I saved it with the name "Dockerfile" in current directory. When you run docker build .     command then docker looks for Dockerfile if ithis file doesn't exist or file name is wrong or extension is wrong you'll get file not exists error.

Now we are ready to build image from the dockerfile. 

Open the terminal and run the below command and make sure you are in the same directory where you saved your Dockerfile as well as python file.

docker build -t myapp .


-t : This is tagging a name to your image. In this case I gave my image a name "myapp"
.(dot) : Is current directory

Ok so you have successfully build your image. Now Let's check what's inside the image by inspecting it.

docker inspect myapp

[

    {

        "Id": "sha256:c4595feabbd0b9aba4ae67037ea3c43a8c0aaf2abe6f6fd28d25b22a7cf9",

        "RepoTags": [

            "myapp:latest"

        ],

        "RepoDigests": [],

        "Parent": "",

        "Comment": "buildkit.dockerfile.v0",

        "Created": "2021-10-01T08:42:53.450488763Z",

        "Container": "",

        "ContainerConfig": {

            "Hostname": "",

            "Domainname": "",

            "User": "",

            "AttachStdin": false,

            "AttachStdout": false,

            "AttachStderr": false,

            "Tty": false,

            "OpenStdin": false,

            "StdinOnce": false,

            "Env": null,

            "Cmd": null,

            "Image": "",

            "Volumes": null,

            "WorkingDir": "",

            "Entrypoint": null,

            "OnBuild": null,

            "Labels": null

        },

        "DockerVersion": "",

        "Author": "",

        "Config": {

            "Hostname": "",

            "Domainname": "",

            "User": "",

            "AttachStdin": false,

            "AttachStdout": false,

            "AttachStderr": false,

            "Tty": false,

            "OpenStdin": false,

            "StdinOnce": false,

            "Env": [

                

                "LANG=C.UTF-8",

                "PYTHON_VERSION=3.9.7",

                "PYTHON_PIP_VERSION=21.2.4",

                "PYTHON_SETUPTOOLS_VERSION=57.5.0",

                "PYTHON_GET_PIP_SHA256=fa6f3fb93cce234cd4e8dd2be9c247653b52855a48dd44e6b21ff28b"

            ],

            "Cmd": [

                "python",

                "./findfrequency.py"

            ],


You'll see output something like above. Our python function is there inside the output under CMD tag.

Let's run the image.

docker run -it myapp   

Enter your string: This is my test to test dockerfile. 

{'': 37, 'is': 2, 'dockerfile.': 1, 'to': 1, 'my': 1, 'test': 2, 'This': 1}


See the output above and pass the desired string to count the words.

So we have successfully dockerized our application. You can send this image to others so that they can use your program and they don't have to worry about installing any dependencies which can cause your program to crash. 

Terraform for Data Engineers: How to Automate Your Database Setup

  Stop Manual Setup: Deploy a PostgreSQL Database with Terraform If you are still manually creating databases in the AWS or Azure console, y...