---
name: airflow-3x-migration
description: Guide and patterns for migrating Apache Airflow 2.x workflows to Airflow 3.x, covering import path changes, removed features, and patterns such as Asset-based scheduling and the TaskFlow API.
version: 1.0.0
---
Airflow 3.x Skills
Import Path Changes
Operators
```python
# Airflow 2.x
from airflow.operators.python import PythonOperator

# Airflow 3.x (operators ship in the apache-airflow-providers-standard package)
from airflow.providers.standard.operators.python import PythonOperator
from airflow.providers.standard.operators.bash import BashOperator
from airflow.providers.standard.operators.empty import EmptyOperator
```
Sensors
```python
# Airflow 3.x
from airflow.providers.standard.sensors.filesystem import FileSensor
from airflow.providers.standard.sensors.time import TimeSensor
```
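Because these moves are mechanical, they can be scripted. The sketch below rewrites `from ... import ...` lines with a small lookup table; `IMPORT_MOVES` and `rewrite_imports` are hypothetical helpers, not part of Airflow, and the 2.x paths for BashOperator, EmptyOperator and the two sensors are taken from Airflow 2.x rather than from the lists above. It is regex-based, so review the resulting diff instead of trusting it blindly.

```python
import re

# Hypothetical mapping: Airflow 2.x module path -> Airflow 3.x standard-provider path
IMPORT_MOVES = {
    "airflow.operators.python": "airflow.providers.standard.operators.python",
    "airflow.operators.bash": "airflow.providers.standard.operators.bash",
    "airflow.operators.empty": "airflow.providers.standard.operators.empty",
    "airflow.sensors.filesystem": "airflow.providers.standard.sensors.filesystem",
    "airflow.sensors.time_sensor": "airflow.providers.standard.sensors.time",
}

# Matches "from <module> import" at the start of a line and captures the module path
_FROM_IMPORT = re.compile(r"^(\s*from\s+)([\w.]+)(\s+import\b)", re.MULTILINE)


def rewrite_imports(source: str) -> str:
    """Return source with known 2.x import paths replaced by their 3.x paths."""

    def _swap(match: re.Match) -> str:
        module = match.group(2)
        # Only exact module matches are rewritten; unknown modules are left untouched
        new_module = IMPORT_MOVES.get(module, module)
        return f"{match.group(1)}{new_module}{match.group(3)}"

    return _FROM_IMPORT.sub(_swap, source)


print(rewrite_imports("from airflow.operators.bash import BashOperator"))
# from airflow.providers.standard.operators.bash import BashOperator
```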
Removed Features
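| Removed | Replacement |
|---|---|
| SubDagOperator | TaskGroup |
| packaged_dag_processor | Use standard DAG loading |
| airflow.contrib.* | Provider packages |
| schedule_interval and timetable params | schedule param |
| Dataset | Asset (airflow.sdk) |
| REST API /api/v1 | REST API /api/v2 |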
DAG Definition Changes
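```python
# Airflow 3.x preferred: import DAG-authoring objects from airflow.sdk
from datetime import datetime

from airflow.sdk import DAG  # older path: from airflow import DAG

with DAG(
    dag_id="my_dag",
    schedule="@daily",  # Not schedule_interval (removed in 3.x)
    start_date=datetime(2024, 1, 1),
    catchup=False,  # False is also the 3.x default
    tags=["betting", "sports"],
) as dag:
    ...
```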
TaskFlow API (Preferred)
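```python
from datetime import datetime

from airflow.sdk import dag, task  # Airflow 2.x: from airflow.decorators import dag, task


@dag(schedule="@daily", start_date=datetime(2024, 1, 1), catchup=False)
def betting_workflow():
    @task
    def download_games(sport: str) -> list:
        # Return values are passed to downstream tasks via XCom automatically
        return fetch_games(sport)  # fetch_games: project helper defined elsewhere

    @task
    def update_elo(games: list) -> dict:
        return calculate_elo(games)  # calculate_elo: project helper defined elsewhere

    # Passing one task's output into another sets the dependency
    games = download_games("nba")
    ratings = update_elo(games)


betting_dag = betting_workflow()
```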
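Calling `download_games("nba")` inside the DAG function does not download anything; it records a task and returns a placeholder whose value arrives later through XCom. The toy model below illustrates that idea in plain Python. It is not Airflow's implementation: `toy_task`, `TaskRef`, `REGISTRY` and `run_graph` are hypothetical names, a dict stands in for XCom, and the task bodies are stand-ins.

```python
from dataclasses import dataclass, field
from typing import Any, Callable

# Every decorated call made while "building the DAG" is recorded here
REGISTRY: list["TaskRef"] = []


@dataclass
class TaskRef:
    """Placeholder returned by a decorated call (plays the role of an XComArg)."""
    task_id: str
    func: Callable[..., Any]
    args: tuple
    upstream: list["TaskRef"] = field(default_factory=list)


def toy_task(func: Callable[..., Any]) -> Callable[..., TaskRef]:
    def build(*args: Any) -> TaskRef:
        # Any TaskRef passed as an argument becomes an upstream dependency
        upstream = [a for a in args if isinstance(a, TaskRef)]
        ref = TaskRef(func.__name__, func, args, upstream)
        REGISTRY.append(ref)
        return ref  # nothing has run yet
    return build


def run_graph() -> dict[str, Any]:
    """Run recorded tasks in dependency order, storing each return value by task_id."""
    xcom: dict[str, Any] = {}
    pending = list(REGISTRY)
    while pending:
        for ref in pending:
            if all(u.task_id in xcom for u in ref.upstream):
                # Swap TaskRef arguments for the upstream task's stored return value
                real_args = [xcom[a.task_id] if isinstance(a, TaskRef) else a for a in ref.args]
                xcom[ref.task_id] = ref.func(*real_args)
                pending.remove(ref)
                break
        else:
            raise RuntimeError("cycle or missing upstream task")
    return xcom


@toy_task
def download_games(sport: str) -> list:
    return [f"{sport}-game-1", f"{sport}-game-2"]  # stand-in for a real fetch


@toy_task
def update_elo(games: list) -> dict:
    return {game: 1500 for game in games}  # stand-in for a real Elo update


games = download_games("nba")  # a TaskRef, not a list
ratings = update_elo(games)
print(run_graph()["update_elo"])  # {'nba-game-1': 1500, 'nba-game-2': 1500}
```

The same split between "declare the graph" and "run it later" is why code at the top level of a DAG file should stay cheap: the DAG processor parses it repeatedly.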
Asset-Based Scheduling (Replaces Dataset)
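```python
from datetime import datetime

from airflow.sdk import Asset, dag, task

# Define assets (Asset replaces the 2.x Dataset)
games_data = Asset("games_data")
elo_ratings = Asset("elo_ratings")


# Producer DAG: a successful run of `download` marks games_data as updated
@dag(schedule="@daily", start_date=datetime(2024, 1, 1), catchup=False)
def download_dag():
    @task(outlets=[games_data])
    def download():
        ...

    download()  # tasks must be called to be added to the DAG


# Consumer DAG: triggers when games_data is updated
@dag(schedule=[games_data], start_date=datetime(2024, 1, 1))
def process_dag():
    @task
    def process():
        ...

    process()


# @dag functions must be called to register the DAGs
download_dag()
process_dag()
```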
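When `schedule` is a list of assets, the consumer runs once every listed asset has been updated since its last asset-triggered run, and several updates of the same asset in between still produce a single run. The toy scheduler below models only that "all of these" rule; `ToyAssetScheduler` and the second consumer `report_dag` are hypothetical, and real Airflow also tracks asset events, timestamps and extra metadata.

```python
from collections import defaultdict


class ToyAssetScheduler:
    """Toy model of asset-triggered scheduling with AND semantics for a list of assets."""

    def __init__(self) -> None:
        self.schedules: dict[str, set[str]] = {}
        # Assets updated since each consumer DAG was last triggered
        self.seen: dict[str, set[str]] = defaultdict(set)

    def register(self, dag_id: str, assets: list[str]) -> None:
        self.schedules[dag_id] = set(assets)

    def asset_updated(self, asset: str) -> list[str]:
        """Record an update (a task with this outlet succeeded); return DAGs to trigger."""
        triggered = []
        for dag_id, needed in self.schedules.items():
            if asset in needed:
                self.seen[dag_id].add(asset)
                if self.seen[dag_id] == needed:
                    triggered.append(dag_id)
                    self.seen[dag_id].clear()  # wait for a fresh set of updates
        return triggered


scheduler = ToyAssetScheduler()
scheduler.register("process_dag", ["games_data"])
scheduler.register("report_dag", ["games_data", "elo_ratings"])  # hypothetical consumer
print(scheduler.asset_updated("games_data"))   # ['process_dag']
print(scheduler.asset_updated("elo_ratings"))  # ['report_dag']
```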
Setup/Teardown Tasks
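```python
from datetime import datetime

from airflow.sdk import dag, task


@dag(schedule=None, start_date=datetime(2024, 1, 1), catchup=False)
def db_workflow():
    @task
    def setup_db_connection() -> str:
        # Tasks run in separate processes and XCom values must be serializable,
        # so return an identifier (a hypothetical connection ID), not a live connection
        return "betting_db"

    @task
    def process_data(conn_id: str):
        ...  # open a connection from conn_id, do the work, close it

    @task
    def cleanup_connection(conn_id: str):
        ...  # release whatever setup_db_connection created

    conn = setup_db_connection()
    # as_teardown(setups=conn) marks cleanup as the teardown and conn as its setup
    process_data(conn) >> cleanup_connection(conn).as_teardown(setups=conn)
    # Context-manager form of the same relationship:
    # with cleanup_connection(conn).as_teardown(setups=conn):
    #     process_data(conn)


db_workflow()
```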
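Semantically, a setup/teardown pair behaves much like `try`/`finally` around the work: the teardown runs even if the work fails, but only if the setup succeeded. The plain-Python analogy below makes that ordering concrete; `run_with_teardown` is a hypothetical helper, and in Airflow each piece is a separate task whose teardown relies on a dedicated trigger rule rather than `finally`.

```python
from typing import Callable, TypeVar

R = TypeVar("R")


def run_with_teardown(
    setup: Callable[[], R],
    work: Callable[[R], None],
    teardown: Callable[[R], None],
) -> None:
    """Plain-Python analogy for a setup >> work >> teardown chain."""
    resource = setup()  # if setup raises, neither work nor teardown runs
    try:
        work(resource)  # the tasks between setup and teardown
    finally:
        teardown(resource)  # runs whether work succeeded or failed
```

If `work` raises, `teardown` still runs and the exception then propagates, so the failure of the work itself is not hidden.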
DAG Versioning
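Airflow 3.x versions DAGs automatically: when a DAG's structure changes, a new version is recorded, and the UI can show which version a given run used. There is no version argument to set on DAG.

```python
from airflow.sdk import DAG

with DAG(
    dag_id="betting_workflow",
    schedule="@daily",
    # No version= argument: Airflow 3.x records a new version when the DAG changes
) as dag:
    ...
```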
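Conceptually, automatic versioning comes down to fingerprinting the DAG's structure and minting a new version only when the fingerprint changes. The sketch below illustrates that idea with a SHA-256 hash of sorted JSON; `DagVersionTracker` and the `structure` dict format are hypothetical and do not describe how Airflow stores versions internally.

```python
import hashlib
import json


class DagVersionTracker:
    """Assign a new version number only when a DAG's structure changes."""

    def __init__(self) -> None:
        self.latest: dict[str, tuple[int, str]] = {}  # dag_id -> (version, fingerprint)

    def record(self, dag_id: str, structure: dict) -> int:
        # Sorted keys give the same fingerprint for logically identical structures
        fingerprint = hashlib.sha256(json.dumps(structure, sort_keys=True).encode()).hexdigest()
        version, last = self.latest.get(dag_id, (0, ""))
        if fingerprint != last:
            version += 1
            self.latest[dag_id] = (version, fingerprint)
        return version


tracker = DagVersionTracker()
v1 = {"tasks": ["download", "update_elo"], "edges": [["download", "update_elo"]]}
print(tracker.record("betting_workflow", v1))  # 1
print(tracker.record("betting_workflow", v1))  # 1 (unchanged structure)
```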
Backfill Changes
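```bash
# Airflow 3.x: backfills are created with the backfill command and run by the scheduler
airflow backfill create --dag-id my_dag --from-date 2024-01-01 --to-date 2024-01-15

# Or trigger a single run for one logical date via REST API v2
# ($TOKEN is a JWT obtained from POST /auth/token)
curl -X POST "http://localhost:8080/api/v2/dags/my_dag/dagRuns" \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer $TOKEN" \
  -d '{"logical_date": "2024-01-15T00:00:00Z"}'
```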
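Before launching a backfill it can help to check how many runs a date range will create. The helper below assumes a `@daily` schedule with one run per calendar day and treats both ends of the range as inclusive; confirm how your Airflow version interprets the start and end of a backfill range before relying on the count. `daily_logical_dates` is a hypothetical helper.

```python
from datetime import date, timedelta


def daily_logical_dates(start: date, end: date) -> list[date]:
    """One logical date per day from start to end, both inclusive (assumed @daily)."""
    if end < start:
        raise ValueError("end date is before start date")
    return [start + timedelta(days=i) for i in range((end - start).days + 1)]


dates = daily_logical_dates(date(2024, 1, 1), date(2024, 1, 15))
print(len(dates), dates[0], dates[-1])  # 15 2024-01-01 2024-01-15
```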
New REST API Endpoints
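```python
import requests

BASE_URL = "http://localhost:8080"

# Airflow 3.x: exchange credentials for a JWT, then send it as a bearer token
token_response = requests.post(
    f"{BASE_URL}/auth/token",
    json={"username": "admin", "password": "admin"},
)
token_response.raise_for_status()
headers = {"Authorization": f"Bearer {token_response.json()['access_token']}"}

# Get DAG runs (REST API v2; /api/v1 was removed in 3.x)
response = requests.get(f"{BASE_URL}/api/v2/dags/betting_workflow/dagRuns", headers=headers)

# Trigger DAG; logical_date may be null for a manual run
response = requests.post(
    f"{BASE_URL}/api/v2/dags/betting_workflow/dagRuns",
    json={"logical_date": None, "conf": {"sport": "nba"}},
    headers=headers,
)
```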
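If a DAG or plugin should not depend on `requests`, the same calls can be built with the standard library. This sketch assumes the Airflow 3.x layout (a bearer token from `POST /auth/token` and endpoints under `/api/v2`) and a local server at `BASE_URL`; `token_request` and `trigger_dag_request` are hypothetical helpers, and the requests are only built here, not sent.

```python
import json
import urllib.request

BASE_URL = "http://localhost:8080"  # assumed local API server


def token_request(username: str, password: str) -> urllib.request.Request:
    """Request exchanging credentials for a JWT (assumed POST /auth/token)."""
    body = json.dumps({"username": username, "password": password}).encode()
    return urllib.request.Request(
        f"{BASE_URL}/auth/token",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def trigger_dag_request(dag_id: str, token: str, conf: dict) -> urllib.request.Request:
    """Request triggering a DAG run through the assumed /api/v2 endpoint."""
    # Assumption: a null logical_date is accepted for manual runs in 3.x
    body = json.dumps({"logical_date": None, "conf": conf}).encode()
    return urllib.request.Request(
        f"{BASE_URL}/api/v2/dags/{dag_id}/dagRuns",
        data=body,
        headers={"Content-Type": "application/json", "Authorization": f"Bearer {token}"},
        method="POST",
    )


req = trigger_dag_request("betting_workflow", "<token>", {"sport": "nba"})
# To send it: json.load(urllib.request.urlopen(req))
```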
Edge Labels
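```python
from airflow.sdk import Label  # Airflow 2.x: from airflow.utils.edgemodifier import Label

# download, process, and alert are tasks defined earlier in the same DAG.
# Labels only annotate edges in the Graph view; they do not change execution.
# For a real failure path, give alert trigger_rule="one_failed".
download >> Label("success") >> process
download >> Label("failure") >> alert
```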
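Because edge labels are cosmetic, the "failure" branch only behaves like one if `alert` has a trigger rule such as `one_failed`. The toy evaluator below shows how a few trigger rules decide whether a task runs given its upstream states. It is simplified: it models only "success" and "failed" upstream states (Airflow also tracks skipped, upstream_failed and others), and `should_run` is a hypothetical helper.

```python
def should_run(trigger_rule: str, upstream_states: list[str]) -> bool:
    """Simplified trigger-rule check over finished upstream states 'success' / 'failed'."""
    if trigger_rule == "all_success":  # Airflow's default rule
        return all(state == "success" for state in upstream_states)
    if trigger_rule == "one_failed":
        return any(state == "failed" for state in upstream_states)
    if trigger_rule == "all_done":  # all upstream finished, whatever the outcome
        return True
    raise ValueError(f"rule not modelled in this sketch: {trigger_rule}")


# download failed: process (default rule) does not run, alert (one_failed) does
print(should_run("all_success", ["failed"]), should_run("one_failed", ["failed"]))  # False True
```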
Migration Checklist
Files to Reference
- dags/multi_sport_betting_workflow.py: already uses 3.x imports
- Airflow 3.0 Migration Guide
Airflow 3.x CLI
- The CLI has changed significantly since Airflow 2.
- Check the latest docs before running CLI commands.