
Creating OpenLineage Extractors
Implement custom OpenLineage extractors or operator methods so Airflow jobs emit lineage for unsupported or third-party operators.
Overview
creating-openlineage-extractors is an agent skill for the Build phase that guides you through capturing custom OpenLineage lineage from Airflow operators that lack built-in support.
Install
npx skills add https://github.com/astronomer/agents --skill creating-openlineage-extractors
What is this skill?
- Decision table: own operators → OpenLineage methods; third-party → custom extractor; column lineage → methods or extractor
- Explicit priority: prefer OpenLineage methods over custom extractors when you control the operator code
- Covers scenarios beyond inlets/outlets when table-level defaults are insufficient
- Points to the official Airflow OpenLineage provider developer guide for supported operators and hooks
- Notes Astro's built-in OpenLineage transport, so lineage collection needs no extra wiring on that platform
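The decision table can be read as a small function. The sketch below is one interpretation of the skill's priority order, not code from the skill: `choose_lineage_approach` and the `Approach` enum are hypothetical names. It assumes that any operator you can modify gets OpenLineage methods, and that a third-party operator needing only simple table-level lineage falls back to inlets/outlets, the lowest-priority option.

```python
from enum import Enum


class Approach(Enum):
    """Hypothetical labels for the three lineage options in the decision table."""
    OPENLINEAGE_METHODS = "OpenLineage methods"
    CUSTOM_EXTRACTOR = "custom extractor"
    INLETS_OUTLETS = "inlets/outlets"


def choose_lineage_approach(
    *,
    can_modify_operator: bool,
    needs_column_lineage: bool = False,
    needs_complex_logic: bool = False,
) -> Approach:
    """Pick a lineage approach following the stated priority (a sketch, not the skill's code)."""
    # Highest priority: if you own the operator, OpenLineage methods cover every case,
    # including column-level lineage and complex extraction logic.
    if can_modify_operator:
        return Approach.OPENLINEAGE_METHODS
    # Third-party code with richer needs requires a custom extractor.
    if needs_column_lineage or needs_complex_logic:
        return Approach.CUSTOM_EXTRACTOR
    # Lowest priority (assumption): simple table-level lineage on code you cannot change.
    return Approach.INLETS_OUTLETS


print(choose_lineage_approach(can_modify_operator=False, needs_column_lineage=True))
# prints Approach.CUSTOM_EXTRACTOR
```

Keeping the "can I modify it?" check first mirrors the skill's advice to prefer methods over extractors whenever you control the operator code.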
Adoption & trust: 664 installs on skills.sh; 384 GitHub stars; 3/3 security scanners passed (skills.sh audits).
What problem does it solve?
Your Airflow DAG uses unsupported or third-party operators, so lineage never reaches OpenLineage and downstream catalog tools stay blind.
Who is it for?
Builders maintaining Airflow pipelines who need column-level or non-trivial lineage from operators they do not fully control.
Skip if: you only need simple table-level lineage that inlets/outlets already provide on supported operators; the skill's own priority table says custom extractors are unnecessary in that case.
When should I use this skill?
User needs lineage from unsupported or third-party Airflow operators, wants column-level lineage, or needs extraction logic beyond inlets/outlets.
What do I get? / Deliverables
You choose and implement the right extraction pattern—methods or custom extractor—with column-level coverage when required, aligned with Airflow provider docs.
- Chosen lineage approach (methods vs custom extractor) documented for the operator
- Implementation outline or code aligned with OpenLineage Airflow provider patterns
Journey fit
Placement is Build → integrations because you extend how orchestration hooks report lineage during pipeline development, not during initial idea research. The work connects an external metadata standard (OpenLineage) to Airflow operators, which is classic integration engineering on data orchestration stacks.
How it compares
Orchestration integration skill for metadata lineage—not a generic Airflow DAG authoring or deployment checklist.
Common Questions / FAQ
Who is creating-openlineage-extractors for?
Data engineers and indie builders shipping Airflow workloads who must expose job lineage to OpenLineage-compatible tools.
When should I use creating-openlineage-extractors?
During Build integrations when you add unsupported operators, need column-level lineage, or outgrow inlets/outlets on third-party code.
Is creating-openlineage-extractors safe to install?
It is documentation-driven integration guidance; review the Security Audits panel on this Prism page and vet any custom extractor code before using it in production DAGs.
SKILL.md
# Creating OpenLineage Extractors

This skill guides you through creating custom OpenLineage extractors to capture lineage from Airflow operators that don't have built-in support.

> **Reference:** See the [OpenLineage provider developer guide](https://airflow.apache.org/docs/apache-airflow-providers-openlineage/stable/guides/developer.html) for the latest patterns and list of supported operators/hooks.

## When to Use Each Approach

| Scenario | Approach |
|----------|----------|
| Operator you own/maintain | **OpenLineage Methods** (recommended, simplest) |
| Third-party operator you can't modify | Custom Extractor |
| Need column-level lineage | OpenLineage Methods or Custom Extractor |
| Complex extraction logic | OpenLineage Methods or Custom Extractor |
| Simple table-level lineage | Inlets/Outlets (simplest, but lowest priority) |

> **Important:** Always prefer OpenLineage methods over custom extractors when possible. Extractors are harder to write, more likely to drift from the operator's behavior as the operator changes, and harder to debug.

### On Astro

Astro includes built-in OpenLineage integration, so no additional transport configuration is needed. Lineage events are automatically collected and displayed in the Astro UI's **Lineage tab**. Custom extractors shipped in an Astro project need no extra installation step: register them in the `[openlineage] extractors` setting of `airflow.cfg` (or the equivalent environment variable) and deploy.

---

## Two Approaches

### 1. OpenLineage Methods (Recommended)

Use when you can add methods directly to your custom operator. This is the **go-to solution** for operators you own.

### 2. Custom Extractors

Use when you need lineage from third-party or provider operators that you **cannot modify**.
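Registering an extractor comes down to one setting: the `[openlineage] extractors` option takes a semicolon-separated list of full import paths, and `AIRFLOW__OPENLINEAGE__EXTRACTORS` is the matching environment variable. The sketch below only builds and sanity-checks that value with the standard library; the helper names and the class path `include.extractors.vendor_copy.VendorCopyExtractor` are hypothetical.

```python
import re

# Hypothetical extractor class paths; replace with the real dotted paths in your project.
# Equivalent airflow.cfg form:
#   [openlineage]
#   extractors = include.extractors.vendor_copy.VendorCopyExtractor
EXTRACTOR_PATHS = [
    "include.extractors.vendor_copy.VendorCopyExtractor",
]

# A full import path: at least one module segment plus the class name, separated by dots.
_DOTTED_PATH = re.compile(r"^[A-Za-z_]\w*(\.[A-Za-z_]\w*)+$")


def build_extractors_setting(paths: list[str]) -> str:
    """Join extractor import paths into one semicolon-separated value."""
    for path in paths:
        if not _DOTTED_PATH.match(path):
            raise ValueError(f"not a full dotted import path: {path!r}")
    return ";".join(paths)


def as_env_line(paths: list[str]) -> str:
    """Render the setting as an environment variable assignment."""
    return f"AIRFLOW__OPENLINEAGE__EXTRACTORS={build_extractors_setting(paths)}"


print(as_env_line(EXTRACTOR_PATHS))
# prints AIRFLOW__OPENLINEAGE__EXTRACTORS=include.extractors.vendor_copy.VendorCopyExtractor
```

The regex only rejects obviously malformed entries; it cannot tell whether a class actually imports, so a typo in a module name still surfaces only when Airflow loads the extractors.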
---

## Approach 1: OpenLineage Methods (Recommended)

When you own the operator, add OpenLineage methods directly:

```python
from airflow.models import BaseOperator


class MyCustomOperator(BaseOperator):
    """Custom operator with built-in OpenLineage support."""

    def __init__(self, source_table: str, target_table: str, **kwargs):
        super().__init__(**kwargs)
        self.source_table = source_table
        self.target_table = target_table
        self._rows_processed = 0  # Set during execution

    def execute(self, context):
        # Do the actual work; _process_data() stands in for your operator's own logic
        self._rows_processed = self._process_data()
        return self._rows_processed

    def get_openlineage_facets_on_start(self):
        """Called when task starts. Return known inputs/outputs."""
        # Import locally to avoid circular imports
        from openlineage.client.event_v2 import Dataset
        from airflow.providers.openlineage.extractors import OperatorLineage

        return OperatorLineage(
            inputs=[Dataset(namespace="postgres://db", name=self.source_table)],
            outputs=[Dataset(namespace="postgres://db", name=self.target_table)],
        )

    def get_openlineage_facets_on_complete(self, task_instance):
        """Called after success. Add runtime metadata."""
        from openlineage.client.event_v2 import Dataset
        from openlineage.client.facet_v2 import output_statistics_output_dataset
        from airflow.providers.openlineage.extractors import OperatorLineage

        return OperatorLineage(
            inputs=[Dataset(namespace="postgres://db", name=self.source_table)],
            outputs=[
                Dataset(
                    namespace="postgres://db",
                    name=self.target_table,
                    facets={
                        "outputStatistics": output_statistics_output_dataset.OutputStatisticsOutputDatasetFacet(
                            rowCount=self._rows_processed