
Airflow Dag Patterns
Author production Airflow DAGs with the TaskFlow API, XCom data passing, and daily extract-transform-load (ETL) patterns on S3.
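TaskFlow return values travel between tasks as XComs, so whatever a task returns has to survive serialization. The stdlib-only sketch below assumes a JSON-based XCom backend; exactly which types the default serializer accepts varies by Airflow version, so this shows the conservative case. It illustrates two pitfalls that matter when a task returns `df.to_dict()`. First, integer row-index keys come back as strings. Second, the stdlib JSON encoder rejects `datetime` values. `to_json_safe` and `xcom_round_trip` are hypothetical helpers, not Airflow APIs.

```python
import json
from datetime import datetime, timezone


def to_json_safe(record: dict) -> dict:
    """Hypothetical helper: coerce one flat record into JSON-native types.

    pandas.Timestamp subclasses datetime, so it is converted here too.
    """
    safe = {}
    for key, value in record.items():
        if isinstance(value, datetime):
            # The stdlib JSON encoder raises TypeError on datetime objects
            safe[str(key)] = value.isoformat()
        else:
            safe[str(key)] = value
    return safe


def xcom_round_trip(value):
    """Simulate a JSON-based XCom backend: serialize, then deserialize."""
    return json.loads(json.dumps(value))


# Shape of pandas' default DataFrame.to_dict(): {column: {row_index: value}}
column_oriented = {"id": {0: 1, 1: 2}}
# JSON object keys are always strings, so the integer row indexes change type
print(xcom_round_trip(column_oriented))  # {'id': {'0': 1, '1': 2}}

# Record-oriented rows with ISO-8601 timestamps round-trip unchanged
rows = [
    {"id": 1, "processed_at": datetime(2024, 1, 1, 6, 0, tzinfo=timezone.utc)},
    {"id": 2, "processed_at": datetime(2024, 1, 1, 6, 5, tzinfo=timezone.utc)},
]
payload = {"data": [to_json_safe(r) for r in rows], "rows": len(rows)}
assert xcom_round_trip(payload) == payload
```

Record-oriented payloads (`to_dict(orient='records')`) also rebuild cleanly with `pd.DataFrame(records)` in the next task. XCom is meant for small payloads, so larger datasets are usually better passed as an S3 path.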
Install
npx skills add https://github.com/wshobson/agents --skill airflow-dag-patterns
What is this skill?
- TaskFlow API (@dag, @task) with typed returns and XCom-style dependency wiring
- Worked ETL pattern: extract from CSV on S3, transform with pandas, load to parquet
- Notification tail task after load for row-count observability
- Tags and catchup=False defaults for operational daily schedules
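The effect of the `catchup=False` default on a daily schedule can be approximated with a small model. In this sketch, a `@daily` run for logical date D covers the interval [D, D + 1 day) and is triggered only after that interval closes. With `catchup=True`, every completed interval since `start_date` gets a run. With `catchup=False`, only the most recent one does. This is a hypothetical, simplified model and not Airflow's scheduler code: it ignores time zones, runs that already exist, and `max_active_runs`. The function names are illustrative.

```python
from datetime import date, datetime, timedelta


def completed_daily_intervals(start: date, now: datetime) -> list[date]:
    """Logical dates (ds) of every daily interval that has fully ended by `now`."""
    logical_dates = []
    logical = start
    # An interval [logical, logical + 1 day) is complete once its end is <= now
    while datetime.combine(logical + timedelta(days=1), datetime.min.time()) <= now:
        logical_dates.append(logical)
        logical += timedelta(days=1)
    return logical_dates


def runs_to_schedule(start: date, now: datetime, catchup: bool) -> list[date]:
    """Simplified model of which daily runs a fresh DAG would get."""
    intervals = completed_daily_intervals(start, now)
    if catchup:
        return intervals  # backfill every missed interval
    return intervals[-1:]  # only the latest completed interval (or none)


start = date(2024, 1, 1)
now = datetime(2024, 1, 5, 12, 0)
print(runs_to_schedule(start, now, catchup=True))   # four runs, 2024-01-01 .. 2024-01-04
print(runs_to_schedule(start, now, catchup=False))  # [datetime.date(2024, 1, 4)]
```

With `start_date=datetime(2024, 1, 1)` and no history, `catchup=False` is what keeps a newly deployed daily DAG from immediately queuing a backfill for every day since January 1.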
Adoption & trust: 7.1k installs on skills.sh; 36.5k GitHub stars; 3/3 security scanners passed (skills.sh audits).
Journey fit
Primary fit
Orchestrating batch ETL and scheduled pipelines is core backend and data-platform work during product build. DAG design, decorators, and task dependencies belong under backend/data integrations rather than frontend or agent tooling.
Common Questions / FAQ
Is Airflow Dag Patterns safe to install?
skills.sh reports 3 of 3 security scanners passed. Review the Security Audits panel on this page before installing in production.
SKILL.md
# airflow-dag-patterns — detailed patterns and worked examples

## Patterns

### Pattern 1: TaskFlow API (Airflow 2.0+)

```python
# dags/taskflow_example.py
from datetime import datetime

from airflow.decorators import dag, task
from airflow.models import Variable
from airflow.operators.python import get_current_context


@dag(
    dag_id='taskflow_etl',
    schedule='@daily',  # `schedule` needs Airflow 2.4+; earlier 2.x uses `schedule_interval`
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['etl', 'taskflow'],
)
def taskflow_etl():
    """ETL pipeline using TaskFlow API"""

    @task()
    def extract(source: str) -> dict:
        """Extract data from source"""
        import pandas as pd

        # Jinja is not rendered inside task code, and in an f-string `{{ ds }}`
        # is only an escaped literal brace, so read `ds` from the task context.
        ds = get_current_context()['ds']
        df = pd.read_csv(f's3://bucket/{source}/{ds}.csv')
        # Record-oriented, JSON-friendly payload for XCom (keep XComs small)
        return {'data': df.to_dict(orient='records'), 'rows': len(df)}

    @task()
    def transform(extracted: dict) -> dict:
        """Transform extracted data"""
        import pandas as pd

        df = pd.DataFrame(extracted['data'])
        # ISO string instead of a datetime keeps the XCom payload JSON-serializable
        df['processed_at'] = datetime.now().isoformat()
        df = df.dropna()
        return {'data': df.to_dict(orient='records'), 'rows': len(df)}

    @task()
    def load(transformed: dict, target: str) -> int:
        """Load data to target"""
        import pandas as pd

        ds = get_current_context()['ds']
        df = pd.DataFrame(transformed['data'])
        df.to_parquet(f's3://bucket/{target}/{ds}.parquet')
        return transformed['rows']

    @task()
    def notify(rows_loaded: int):
        """Send notification"""
        print(f'Loaded {rows_loaded} rows')

    # Define dependencies with XCom passing
    extracted = extract(source='raw_data')
    transformed = transform(extracted)
    loaded = load(transformed, target='processed_data')
    notify(loaded)


# Instantiate the DAG
taskflow_etl()
```

### Pattern 2: Dynamic DAG Generation

```python
# dags/dynamic_dag_factory.py
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.models import Variable
import json

# Configuration for multiple similar pipelines
PIPELINE_CONFIGS = [
    {'name': 'customers', 'schedule': '@daily', 'source': 's3://raw/customers'},
    {'name': 'orders', 'schedule': '@hourly', 'source': 's3://raw/orders'},
    {'name': 'products', 'schedule': '@weekly', 'source': 's3://raw/products'},
]


def create_dag(config: dict) -> DAG:
    """Factory function to create DAGs from config"""
    dag_id = f"etl_{config['name']}"

    default_args = {
        'owner': 'data-team',
        'retries': 3,
        'retry_delay': timedelta(minutes=5),
    }

    dag = DAG(
        dag_id=dag_id,
        default_args=default_args,
        schedule=config['schedule'],
        start_date=datetime(2024, 1, 1),
        catchup=False,
        tags=['etl', 'dynamic', config['name']],
    )

    with dag:
        def extract_fn(source, **context):
            print(f"Extracting from {source} for {context['ds']}")

        def transform_fn(**context):
            print(f"Transforming data for {context['ds']}")

        def load_fn(table_name, **context):
            print(f"Loading to {table_name} for {context['ds']}")

        extract = PythonOperator(
            task_id='extract',
            python_callable=extract_fn,
            op_kwargs={'source': config['source']},
        )

        transform = PythonOperator(
            task_id='transform',
            python_callable=transform_fn,
        )

        load = PythonOperator(
            task_id='load',
            python_callable=load_fn,
            op_kwargs={'table_name': config['name']},
        )

        extract >> transform >> load

    return dag


# Generate DAGs: binding each one to a module-level name lets the scheduler discover it
for config in PIPELINE_CONFIGS:
    globals()[f"dag_{config['name']}"] = create_dag(config)
```

### Pattern 3: Branching and Conditional Logic

```python
# dags/branching_example.py
from datetime import datetime

from airflow.decorators import dag, task
from airflow.operators.python import BranchPythonOperator
from airflow.operators.empty import EmptyOperator
from airflow.utils.trigger_rule import TriggerRule


@dag(
    dag_id='branching_pipeline',
    schedule='@daily',
    start_date=datetim