
Airflow Hitl
Add deferrable human approval, forms, and branching gates to Airflow 3.1+ DAGs without blocking workers.
Install
npx skills add https://github.com/astronomer/agents --skill airflow-hitlWhat is this skill?
- Maps five HITL capabilities—ApprovalOperator, HITLOperator, HITLBranchOperator, HITLEntryOperator, HITLTrigger—to use ca
- Deferrable operators release worker slots while humans respond in Browse → Required Actions
- Requires Airflow 3.1+ with version check via af config version
- Step 2 registry/API discovery workflow—do not copy parameter lists from memory across provider releases
- Explicitly excludes AI/LLM decorators—points to airflow-ai for that path
Adoption & trust: 664 installs on skills.sh; 384 GitHub stars; 3/3 security scanners passed (skills.sh audits).
Recommended Skills
Agent Browservercel-labs/agent-browser
Lark Imlarksuite/cli
Lark Calendarlarksuite/cli
Lark Sheetslarksuite/cli
Lark Vclarksuite/cli
Lark Contactlarksuite/cli
Journey fit
Primary fit
Canonical shelf is build integrations because solo builders wire HITL operators into DAG code and provider APIs during pipeline development. Fits integrations subphase: connects Airflow task operators, REST/UI Required Actions, and provider discovery to orchestration code.
Common Questions / FAQ
Is Airflow Hitl safe to install?
skills.sh reports 3 of 3 security scanners passed. Review the Security Audits panel on this page before installing in production.
SKILL.md
READMESKILL.md - Airflow Hitl
# Airflow Human-in-the-Loop Operators Pause a DAG until a human responds via the Airflow UI or REST API. HITL operators are deferrable — they release their worker slot while waiting. > **Requires Airflow 3.1+** (`af config version`). > > **UI location**: Browse → Required Actions. Respond from the task instance page's Required Actions tab. > > **Cross-references**: `airflow-ai` for AI/LLM task decorators; `airflow` for registry and API discovery commands used below. --- ## Step 1 — Pick the capability you need | Capability | Class (verify in Step 2) | |---|---| | Approve or reject; downstream skips on reject | `ApprovalOperator` | | Present N options and return which were chosen | `HITLOperator` | | Branch to one or more downstream tasks based on a choice | `HITLBranchOperator` | | Collect a form (no approve/select step) | `HITLEntryOperator` | | Use the HITL trigger directly (advanced / custom operators) | `HITLTrigger` | This is the only place class names are hardcoded. The provider adds, renames, and removes params across releases — do not copy parameter lists from memory. Fetch the current signature before writing code. --- ## Step 2 — Discover the current signatures from the Airflow Registry Before writing HITL code, run these to see the live roster and constructor params (see the `airflow` skill for the full `af registry` reference): ```bash # Every HITL-related module in the standard provider af registry modules standard \ | jq '.modules[] | select(.import_path | test("\\.hitl\\.")) | {name, type, import_path, short_description, docs_url}' # Constructor signatures: name, type, default, required, description af registry parameters standard \ | jq '.classes | to_entries[] | select(.key | test("\\.hitl\\.")) | {fqn: .key, parameters: .value.parameters}' # Pin to the exact installed provider version af config providers \ | jq '.providers[] | select(.package_name == "apache-airflow-providers-standard") | .version' # then: af registry parameters standard --version <VERSION> ``` If the registry shows a param that this skill does not mention, prefer the registry. If the registry shows a class that is not in Step 1, treat it as additive — the decision table above may be stale. --- ## Step 3 — Canonical example (approval gate) Starting point for any HITL task. Adapt by swapping the class name and params per Step 2. ```python from airflow.providers.standard.operators.hitl import ApprovalOperator from airflow.sdk import dag, task, chain, Param from pendulum import datetime @dag(start_date=datetime(2025, 1, 1), schedule="@daily") def approval_example(): @task def prepare(): return "Review quarterly report" approval = ApprovalOperator( task_id="approve_report", subject="Report Approval", body="{{ ti.xcom_pull(task_ids='prepare') }}", defaults="Approve", # Auto-selected on timeout params={"comments": Param("", type="string")}, ) @task def after_approval(result): print(f"Decision: {result['chosen_options']}") chain(prepare(), approval) after_approval(approval.output) approval_example() ``` For the other classes in Step 1, the shape is the same (`task_id`, `subject`, plus class-specific params). Verify each constructor through Step 2 — for example, `HITLBranchOperator` requires every option either to match a downstream task id directly or to be resolved via a mapping param surfaced in the registry. --- ## Step 4 — Behavior contracts (stable across versions) ### Timeout - With `defaults` set: task succeeds on timeout, default option(s) selected. - Without `defaults`: task fails on timeout.