
Why is my Airflow Task stuck in "Queued" state? (5 Quick Fixes)


 

You’ve triggered your DAG, the UI shows the task as grey (Queued), but nothing happens for minutes—or hours. This is a classic Airflow bottleneck. Here is how to diagnose and fix it.

1. Check the "Concurrency" Limits

Airflow has several "safety brakes" to prevent your server from crashing. If you hit these limits, tasks will stay queued until others finish.

  • parallelism: The max number of task instances that can run across your entire Airflow environment.

  • max_active_tasks_per_dag (called dag_concurrency in older releases): The max number of tasks that can run at once for a single DAG.

  • max_active_runs_per_dag: The max number of active runs per DAG. If you have too many backfills or catch-up runs going, new runs (and their tasks) won't start.

The Fix: Check your airflow.cfg or your DAG definition. Increase max_active_tasks (or the global parallelism) if your hardware can handle it, as in the sketch below.
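For illustration, here is a minimal sketch (assuming a recent Airflow 2.x release; the dag_id and numbers are made up) of capping these limits per DAG instead of globally in airflow.cfg:

from datetime import datetime
from airflow import DAG

# Illustrative limits -- tune them to what your hardware can actually handle
with DAG(
    dag_id="tuned_limits_example",   # example name, not from this post
    start_date=datetime(2022, 1, 1),
    schedule="0 0 * * *",
    max_active_tasks=16,             # per-DAG task cap (replaces the old dag_concurrency)
    max_active_runs=2,               # how many runs of this DAG may be active at once
) as dag:
    ...                              # your tasks go here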

2. Is the Scheduler actually alive?

Sometimes the Airflow UI looks fine, but the Scheduler process has died or hung.

  • Check the UI: Look at the top of the Airflow page. If there is a red banner saying "The scheduler does not appear to be running," that’s your answer.

  • The Fix: Restart the scheduler service:

    systemctl restart airflow-scheduler
    # OR if using Docker:
    docker restart airflow-scheduler
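If you are on Airflow 2.1 or newer, the CLI can also tell you whether a scheduler heartbeat was recorded recently (a quick sanity check, not a full diagnosis):

airflow jobs check --job-type SchedulerJob   # exits non-zero if no live scheduler is found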

3. "No Slots Available" in Pools

Airflow uses Pools to manage resources (like limiting how many tasks can hit a specific database at once). If your task belongs to a pool with 5 slots and 5 tasks are already running, your 6th task will stay Queued until one of them finishes and frees a slot.

The Fix: Go to Admin -> Pools in the UI. Check if the "Default Pool" or your custom pool is full. Increase the slots if necessary.
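As a rough sketch, this is how a task opts into a pool in the DAG file; the pool name my_db_pool is just an example and must already exist in Admin -> Pools (or be created with the airflow pools CLI):

from airflow.operators.bash import BashOperator

# Inside your "with DAG(...)" block: this task only starts when a slot is free in my_db_pool
export_rows = BashOperator(
    task_id="export_rows",              # example task
    bash_command="echo exporting...",   # placeholder command
    pool="my_db_pool",                  # must match a pool defined in the UI or CLI
    pool_slots=1,                       # how many slots this task occupies
)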

4. Celery Worker Issues (For Production Setups)

If you are using the CeleryExecutor, the task is queued in Redis or RabbitMQ, but the Worker might not be picking it up.

  • The Check: Open the Flower dashboard (started with airflow celery flower) or use Celery's own inspect/ping tooling to see whether any workers are online and consuming from the queue.

  • The Fix: Ensure your workers are pointed to the same Metadata DB and Broker as your Scheduler.
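One quick way to compare the two sides is the built-in config command; run these on the scheduler host and on each worker and check that the values match (on releases older than 2.3 the DB connection lives under [core] rather than [database]):

airflow config get-value celery broker_url
airflow config get-value celery result_backend
airflow config get-value database sql_alchemy_conn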

5. Resource Starvation (OOM)

If your worker node is out of RAM or CPU, it might accept the task but fail to initialize it, leading to a loop where the task stays queued.
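A couple of quick, Linux-specific checks on the worker (commands vary by distro); if memory really is the culprit, the usual fixes are adding workers or lowering [celery] worker_concurrency:

free -h                                # how much RAM is left on the worker
dmesg -T | grep -i "out of memory"     # did the kernel OOM-killer fire recently?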

What is Apache Airflow?


Apache Airflow is an open-source platform for developing, scheduling, and monitoring batch-oriented workflows. Airflow's extensible Python framework lets you write code to create DAGs for your workflows.

What is a DAG?


A DAG stands for directed acyclic graph, which means that when you create or design a DAG, no cycle may be formed. For example, if task A triggers task B and task B triggers task C, then task C must not call task A again; that would close the loop (a circular dependency) and the graph would no longer be acyclic, so Airflow will not allow it.
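As a small illustration (using EmptyOperator as a placeholder; on Airflow versions before 2.3 it was called DummyOperator), the chain below is a valid DAG, while uncommenting the last line would close the loop and Airflow would refuse to load the file:

from datetime import datetime
from airflow import DAG
from airflow.operators.empty import EmptyOperator

with DAG(dag_id="acyclic_example", start_date=datetime(2022, 1, 1), schedule=None) as dag:
    task_a = EmptyOperator(task_id="task_a")
    task_b = EmptyOperator(task_id="task_b")
    task_c = EmptyOperator(task_id="task_c")

    task_a >> task_b >> task_c   # fine: A -> B -> C, no cycle
    # task_c >> task_a           # this would create a cycle, and the DAG would be rejected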

A DAG represents a workflow, which is a collection of tasks. Below is how a DAG looks:




Airflow also has a nice GUI where you can create Variables and handle other admin tasks. You can view your DAGs in the GUI, trigger them, or cancel runs; almost everything can be managed from there.

Use Case for Airflow


Suppose you have a Unix job that processes a text file at a specific time and then calls another job that does its part. Sometimes the file never arrives, yet the first job still runs and fails, and the dependent job runs anyway and processes no data. To overcome this, we can use Airflow and create a DAG in which the tasks depend on each other: the first job is triggered only when the file is actually there, and the next job runs only once the first one has succeeded.
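A rough sketch of that pattern, using the built-in FileSensor and a made-up file path and command: the sensor waits for the file, and the processing job runs only after it has actually arrived.

from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.sensors.filesystem import FileSensor

with DAG(dag_id="file_driven_flow", start_date=datetime(2022, 1, 1), schedule="0 6 * * *") as dag:
    # Wait until the input file actually exists before doing anything
    wait_for_file = FileSensor(
        task_id="wait_for_file",
        filepath="/data/incoming/report.txt",   # example path
        poke_interval=60,                        # re-check every 60 seconds
    )

    # Runs only after the sensor succeeds
    process_file = BashOperator(
        task_id="process_file",
        bash_command="echo 'processing /data/incoming/report.txt'",   # placeholder for the real job
    )

    wait_for_file >> process_file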

Let's write your first DAG


from datetime import datetime
from airflow import DAG
from airflow.decorators import task
from airflow.operators.bash import BashOperator

# A DAG represents a workflow, a collection of tasks
with DAG(dag_id="demo", start_date=datetime(2022, 1, 1), schedule="0 0 * * *") as dag:

    # Tasks are represented as operators
    hello = BashOperator(task_id="hello", bash_command="echo hello")

    @task()
    def airflow():
        print("airflow")

    # Set dependencies between tasks
    hello >> airflow()

In the above code you import DAG from airflow, along with the task decorator and the BashOperator.

When you need to execute Linux commands in a workflow, use the BashOperator. Similarly, when you need to run a Python function, use the PythonOperator (the @task decorator used above is the newer TaskFlow shorthand built on top of it). In summary, the BashOperator executes bash commands and the PythonOperator executes Python callables.
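For completeness, here is the classic (pre-TaskFlow) way of wiring up a Python function, roughly equivalent to the @task example in the DAG above; the function name is just illustrative:

from airflow.operators.python import PythonOperator

def print_airflow():
    print("airflow")

# Inside your "with DAG(...)" block: equivalent to the @task-decorated function,
# written with the classic PythonOperator instead
airflow_task = PythonOperator(
    task_id="airflow",
    python_callable=print_airflow,
)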

Once you have imported everything required, the next step is to define your DAG.
When the Airflow scheduler parses the file, the "with DAG(...)" block is what creates the DAG, based on the configuration you pass in (dag_id, start_date, schedule, and so on).

After that, the BashOperator creates a task named "hello" that runs a shell command. Remember that every task needs a unique task_id. On the last line you define the flow, i.e. which task runs first and which runs after it.

Once you save the file, the Airflow scheduler will automatically pick up the new file and create the DAG. If any error occurs, it will be displayed at the top of the Airflow UI.
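If you prefer to catch mistakes before the scheduler does, two checks that should work on Airflow 2.x (the file path below is just an example): run the file with plain Python to surface syntax errors, and ask the CLI for import errors.

python ~/airflow/dags/demo_dag.py    # example path; a syntax error shows up immediately
airflow dags list-import-errors      # lists DAG files the scheduler failed to import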




Conclusion


Airflow is an awesome framework to use. It can be very useful for orchestrating legacy Unix jobs and flows. We just need to pick the right use cases for Airflow, and it will solve those problems easily.


How to Fix: "SSL Certificate Problem: Self-Signed Certificate" in Git & Docker

This is one of the most common "Security vs. Productivity" errors. You’re trying to pull a private image or clone a repo, and your...