
Annotating Task Lineage
Add table-level OpenLineage metadata to Airflow tasks when operators lack built-in lineage extractors.
Install
npx skills add https://github.com/astronomer/agents --skill annotating-task-lineage
What is this skill?
- Documents when to use inlets/outlets vs native OpenLineage operator methods
- Supports simple table-level lineage without custom extractors
- Calls out column-level lineage limits and when to use OL methods instead
- Notes Astro Lineage tab visibility for cross-DAG lineage in Astro deployments
Adoption & trust: 683 installs on skills.sh; 384 GitHub stars; 3/3 security scanners passed (skills.sh audits).
Common Questions / FAQ
Is Annotating Task Lineage safe to install?
skills.sh reports 3 of 3 security scanners passed. Review the Security Audits panel on this page before installing in production.
SKILL.md
# Annotating Task Lineage with Inlets & Outlets

This skill guides you through adding manual lineage annotations to Airflow tasks using `inlets` and `outlets`.

> **Reference:** See the [OpenLineage provider developer guide](https://airflow.apache.org/docs/apache-airflow-providers-openlineage/stable/guides/developer.html) for the latest supported operators and patterns.

### On Astro

Lineage annotations defined with inlets and outlets are visualized in Astro's enhanced **Lineage tab**, which provides cross-DAG and cross-deployment lineage views. Your annotations therefore appear directly in the Astro UI, giving you a unified view of data flow across your entire Astro organization.

## When to Use This Approach

| Scenario | Use Inlets/Outlets? |
|----------|---------------------|
| Operator has OpenLineage methods (`get_openlineage_facets_on_*`) | ❌ Modify the OL method directly |
| Operator has no built-in OpenLineage extractor | ✅ Yes |
| Simple table-level lineage is sufficient | ✅ Yes |
| Quick lineage setup without custom code | ✅ Yes |
| Need column-level lineage | ❌ Use OpenLineage methods or a custom extractor |
| Complex extraction logic needed | ❌ Use OpenLineage methods or a custom extractor |

> **Note:** Inlets/outlets are the lowest-priority fallback. If an OpenLineage extractor or method exists for the operator, it takes precedence. Use this approach for operators without extractors.
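The precedence rule in the note above can be made concrete with a toy model in plain Python. This is a simplified sketch, not the OpenLineage provider's actual resolution code. `Lineage`, `ToyTask`, and `resolve_lineage` are hypothetical names, and `ol_method` stands in for either a `get_openlineage_facets_on_*` method or a registered extractor. As a further simplifying assumption, operator-provided lineage that contains no datasets is treated as absent, so it falls through to the manually declared inlets/outlets.

```python
from dataclasses import dataclass, field
from typing import Callable, List, Optional


@dataclass
class Lineage:
    """Hypothetical table-level lineage result: dataset names read and written."""
    inputs: List[str] = field(default_factory=list)
    outputs: List[str] = field(default_factory=list)
    source: str = "none"  # which mechanism produced this lineage


@dataclass
class ToyTask:
    """Hypothetical stand-in for an Airflow task (not Airflow's BaseOperator)."""
    inlets: List[str] = field(default_factory=list)
    outlets: List[str] = field(default_factory=list)
    # Stand-in for get_openlineage_facets_on_* or a registered extractor.
    ol_method: Optional[Callable[[], Lineage]] = None


def resolve_lineage(task: ToyTask) -> Lineage:
    """Simplified precedence: operator-provided lineage first, inlets/outlets last."""
    if task.ol_method is not None:
        lineage = task.ol_method()
        # Assumption: an empty result falls through to the manual annotations.
        if lineage.inputs or lineage.outputs:
            lineage.source = "ol_method"
            return lineage
    if task.inlets or task.outlets:
        return Lineage(list(task.inlets), list(task.outlets), source="inlets_outlets")
    return Lineage()


# The operator's own lineage wins; the manual annotations are ignored.
annotated = ToyTask(
    inlets=["raw.orders"],
    outlets=["staging.orders_clean"],
    ol_method=lambda: Lineage(inputs=["public.orders"], outputs=["staging.orders"]),
)
print(resolve_lineage(annotated).source)  # ol_method

# No OL method or extractor: the inlets/outlets are used.
plain = ToyTask(inlets=["raw.orders"], outlets=["staging.orders_clean"])
print(resolve_lineage(plain).inputs)  # ['raw.orders']
```

In practice the OpenLineage provider makes this choice at runtime. The model shows why inlets/outlets added to an operator that already implements OL methods may not show up in the emitted lineage.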
---

## Supported Types for Inlets/Outlets

You can use **OpenLineage Dataset** objects or **Airflow Assets** (Airflow Datasets on Airflow 2) for inlets and outlets:

### OpenLineage Datasets (Recommended)

```python
from openlineage.client.event_v2 import Dataset

# Database tables
source_table = Dataset(
    namespace="postgres://mydb:5432",
    name="public.orders",
)
target_table = Dataset(
    namespace="snowflake://account.snowflakecomputing.com",
    name="staging.orders_clean",
)

# Files
input_file = Dataset(
    namespace="s3://my-bucket",
    name="raw/events/2024-01-01.json",
)
```

### Airflow Assets (Airflow 3+)

```python
from airflow.sdk import Asset

# Using Airflow's native Asset type
orders_asset = Asset(uri="s3://my-bucket/data/orders")
```

### Airflow Datasets (Airflow 2.4+)

```python
from airflow.datasets import Dataset

# Using Airflow's Dataset type (Airflow 2.4-2.x)
orders_dataset = Dataset(uri="s3://my-bucket/data/orders")
```

---

## Basic Usage

### Setting Inlets and Outlets on Operators

```python
from airflow import DAG
from airflow.operators.bash import BashOperator
from openlineage.client.event_v2 import Dataset
import pendulum

# Define your lineage datasets
source_table = Dataset(
    namespace="snowflake://account.snowflakecomputing.com",
    name="raw.orders",
)
target_table = Dataset(
    namespace="snowflake://account.snowflakecomputing.com",
    name="staging.orders_clean",
)
output_file = Dataset(
    namespace="s3://my-bucket",
    name="exports/orders.parquet",
)

with DAG(
    dag_id="etl_with_lineage",
    start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
    schedule="@daily",
) as dag:
    transform = BashOperator(
        task_id="transform_orders",
        bash_command="echo 'transforming...'",
        inlets=[source_table],   # What this task reads
        outlets=[target_table],  # What this task writes
    )

    export = BashOperator(
        task_id="export_to_s3",
        bash_command="echo 'exporting...'",
        inlets=[target_table],   # Reads from previous output
        outlets=[output_file],   # Writes to S3
    )

    transform >> export
```

### Multiple Inputs and Outputs

Tasks often read from multiple sources and write to multiple destinations:

```python
from openlinea