
Apache Airflow Orchestration
Author, schedule, and troubleshoot Apache Airflow DAGs when you need repeatable Python workflows instead of one-off cron scripts.
Install
npx skills add https://github.com/aradotso/data-skills --skill apache-airflow-orchestration
What is this skill?
- Define maintainable DAGs in Python, with install paths for pip (using constraint files) and a Docker Compose dev stack
- Covers scheduling, monitoring, Airflow connections, and XCom patterns for task data handoff
- Troubleshooting guidance for failed tasks and operational DAG debugging
- Version-pinned install snippet (Airflow 3.2.0) with Python constraint URLs
- Part of the ara.so Data Skills collection for workflow orchestration
Adoption & trust: 356 installs on skills.sh; 1 GitHub star; 2/3 security scanners passed (skills.sh audits); trending (+100% hot-view momentum).
Common Questions / FAQ
Is Apache Airflow Orchestration safe to install?
skills.sh reports 2 of 3 security scanners passed. Review the Security Audits panel on this page before installing in production.
SKILL.md
# Apache Airflow Orchestration

> Skill by [ara.so](https://ara.so), part of the Data Skills collection.

Apache Airflow is a platform to programmatically author, schedule, and monitor workflows. It lets you define workflows as Directed Acyclic Graphs (DAGs) in Python code, making them maintainable, versionable, testable, and collaborative.

## Installation

### Using pip

```bash
# Install Airflow with the constraints file that matches your Python version
AIRFLOW_VERSION=3.2.0
PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"
```

### Using Docker (Recommended for Development)

```bash
# Download docker-compose.yaml
curl -LfO 'https://airflow.apache.org/docs/apache-airflow/stable/docker-compose.yaml'

# Create required directories
mkdir -p ./dags ./logs ./plugins ./config

# Set the Airflow user
echo -e "AIRFLOW_UID=$(id -u)" > .env

# Initialize the database
docker compose up airflow-init

# Start Airflow
docker compose up
```

Access the web UI at `http://localhost:8080` (default credentials: `airflow`/`airflow`).

### Standalone Quick Start

```bash
# Create or upgrade the metadata database
# (`airflow db init` is deprecated and removed in Airflow 3)
airflow db migrate

# Create an admin user (you will be prompted for a password).
# This command requires the FAB auth manager (apache-airflow-providers-fab).
airflow users create \
    --username admin \
    --firstname Admin \
    --lastname User \
    --role Admin \
    --email admin@example.com

# Start the API server, which also serves the web UI (default port 8080).
# In Airflow 3 it replaces `airflow webserver`.
airflow api-server --port 8080

# Start the scheduler and the DAG processor (each in its own terminal)
airflow scheduler
airflow dag-processor
```

## Core Concepts

### DAG (Directed Acyclic Graph)

A DAG defines a workflow as tasks plus the dependencies between them. Dependencies must not form cycles.
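Airflow checks for cycles itself when it parses a DAG file, so you never write this code in a pipeline. To illustrate why the rule matters, the plain-Python sketch below (no Airflow imports) computes a run order with Kahn's topological sort and fails when tasks form a cycle. The function name `topological_order` and its dict-of-upstreams input format are hypothetical choices for this example, not Airflow APIs, and the sketch assumes every upstream task also appears as a key.

```python
from collections import deque


def topological_order(deps: dict[str, set[str]]) -> list[str]:
    """Return a valid run order for tasks, or raise ValueError on a cycle.

    Hypothetical helper for illustration only. `deps` maps each task_id
    to the set of task_ids it depends on (its upstream tasks).
    """
    # Number of unfinished upstream tasks for each task.
    remaining = {task: len(upstreams) for task, upstreams in deps.items()}

    # Reverse index: upstream task -> tasks that wait on it.
    downstream: dict[str, list[str]] = {task: [] for task in deps}
    for task, upstreams in deps.items():
        for up in upstreams:
            downstream[up].append(task)

    # Tasks with no upstream dependencies can run first.
    ready = deque(task for task, count in remaining.items() if count == 0)
    order: list[str] = []
    while ready:
        task = ready.popleft()
        order.append(task)
        # "Finishing" a task releases its downstream tasks.
        for child in downstream[task]:
            remaining[child] -= 1
            if remaining[child] == 0:
                ready.append(child)

    if len(order) != len(deps):
        # Tasks that never became ready are stuck on a cycle.
        raise ValueError("cycle detected: this graph is not a DAG")
    return order


if __name__ == "__main__":
    pipeline = {
        "extract": set(),
        "transform": {"extract"},
        "load": {"transform"},
    }
    print(topological_order(pipeline))  # ['extract', 'transform', 'load']
```

With a cycle such as `{"a": {"b"}, "b": {"a"}}`, no task ever has zero unmet dependencies, so no run order exists and the function raises `ValueError`. That is the situation the acyclic requirement rules out.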
### Basic DAG Structure

```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator

# Default arguments applied to all tasks
default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'email': ['alerts@example.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
}

# Define the DAG
dag = DAG(
    'example_data_pipeline',
    default_args=default_args,
    description='A simple data pipeline',
    schedule='0 0 * * *',  # Run daily at midnight (cron expression)
    start_date=datetime(2024, 1, 1),
    catchup=False,  # Don't run for past dates
    tags=['example', 'data-engineering'],
)


def extract_data(**context):
    """Extract data from source"""
    print("Extracting data...")
    # Your extraction logic here
    return {'records': 1000}


def transform_data(**context):
    """Transform extracted data"""
    # Access data from previous task via XCom
    ti = context['ti']
    extracted = ti.xcom_pull(task_ids='extract')
    print(f"Transforming {extracted['records']} records...")
    return {'transformed_records': extracted['records']}


def load_data(**context):
    """Load data to destination"""
    ti = context['ti']
    transformed = ti.xcom_pull(task_ids='transform')
    print(f"Loading {transformed['transformed_records']} records...")


# Define tasks
extract = PythonOperator(
    task_id='extract',
    python_callable=extract_data,
    dag=dag,
)

transform = PythonOperator(
    task_id='transform',
    python_callable=transform_data,
    dag=dag,
)

load = PythonOperator(
    task_id='load',
    python_callable=lo