Overview

Airflow is built around operators, each having a different function and requiring a different approach to lineage. The OpenLineage Airflow integration detects which Airflow operators your DAG is using and extracts lineage data from them using extractors.

The community has already authored a number of extractors to support Airflow’s Great Expectations, BigQuery, Python, Postgres, SQL and Bash operators (and more – you can find all the extractors here.) Nevertheless, in the course of pursuing lineage, you may find yourself needing to write custom extractors.

Some teams use custom extractors to automate repeatable work – using the same code from PythonOperator across a project, for example. Another common use case is that a team needs to use an operator for which a pre-built extractor does not yet exist. Airflow has literally hundreds of operators.

Built-in support for custom extractors makes OpenLineage a flexible, highly adaptable solution for pipelines that use Airflow for orchestration.

How it works

As we explain in the OpenLineage docs, custom extractors must be derived from the BaseExtractor class (import it from openlineage.airflow.extractors.base).

Extractors have methods they can implement: extract, extract_on_complete and get_operator_classnames. Either extract or extract_on_complete is required. The get_operator_classnames method, also required, is for providing a list of operators from which your extractor can get lineage.

For example:

@classmethod def get_operator_classnames(cls) -> List[str]: return ['PostgresOperator']

If the name of the operator matches one of the names on the list, the extractor will be instantiated – using the operator passed to the extractor as a parameter and stored in the self.operator property – and both the extract and extract_on_complete methods will be called. They both return information used by the OpenLineage integration to emit OpenLineage events. The difference is that extract is called before the operator's execute method to generate a START event, while extract_on_complete is called afterward to generate a COMPLETE event. The latter has access to any additional information that the operator leaves behind following execution. A good example of this is the SnowflakeOperator, which sets query_ids after execution.

Both methods return a TaskMetadata structure:

@attr.s class TaskMetadata: name: str = attr.ib() # deprecated inputs: List[Dataset] = attr.ib(factory=list) outputs: List[Dataset] = attr.ib(factory=list) run_facets: Dict[str, BaseFacet] = attr.ib(factory=dict) job_facets: Dict[str, BaseFacet] = attr.ib(factory=dict)

The inputs and outputs are lists of plain OpenLineage datasets.

The run_facets and job_facets are dictionaries of optional JobFacets and RunFacets that accompany a job. For example, you might want to attach a SqlJobFacet if your operator is executing SQL.

Note: in order for a custom extractor to work, it must be registered first, so the OpenLineage integration can import it. You can read about how to use environment variables to do this here.

Example: the RedshiftDataExtractor

In the RedshiftDataExtractor, the extract_on_complete method parses SQL, obtains task stats using the get_facets method of the RedshiftDataDatasetsProvider class, and returns a TaskMetadata instance. We can see usage of a SQL statement, and the connection is provided by an actual operator.

def extract_on_complete(self, task_instance) -> Optional[TaskMetadata]: log.debug(f"extract_on_complete({task_instance})") job_facets = {"sql": SqlJobFacet(self.operator.sql)} log.debug(f"Sending SQL to parser: {self.operator.sql}") sql_meta: Optional[SqlMeta] = parse(self.operator.sql, self.default_schema) log.debug(f"Got meta {sql_meta}") try: redshift_job_id = self._get_xcom_redshift_job_id(task_instance) if redshift_job_id is None: raise Exception( "Xcom could not resolve Redshift job id. Job may have failed." ) except Exception as e: log.error(f"Cannot retrieve job details from {e}", exc_info=True) return TaskMetadata( name=get_job_name(task=self.operator), run_facets={}, job_facets=job_facets, ) client = self.operator.hook.conn redshift_details = [ "database", "cluster_identifier", "db_user", "secret_arn", "region", ] connection_details = { detail: getattr(self.operator, detail) for detail in redshift_details } stats = RedshiftDataDatasetsProvider( client=client, connection_details=connection_details ).get_facets( job_id=redshift_job_id, inputs=sql_meta.in_tables if sql_meta else [], outputs=sql_meta.out_tables if sql_meta else [], ) return TaskMetadata( name=get_job_name(task=self.operator), inputs=[ds.to_openlineage_dataset() for ds in stats.inputs], outputs=[ds.to_openlineage_dataset() for ds in stats.output], run_facets=stats.run_facets, job_facets={"sql": SqlJobFacet(self.operator.sql)}, )

Common issues

There are two common issues associated with custom extractors.

First, when the wrong path is provided to OPENLINEAGE_EXTRACTORS, the extractor isn’t imported and OpenLineage events aren’t emitted. The path needs to be exactly the same as the one you are using in your code. Also, make sure that the extractor code is available to import from Airflow’s Python interpreter.

Second, imports from Airflow can be unnoticeably cyclical. This is due to the fact that OpenLineage code gets instantiated when the Airflow worker itself starts, in contrast to DAG code. OpenLineage extraction can fail as a result. To avoid this issue, make sure that all imports from Airflow are local – in the extract or extract_on_complete methods. If you need imports for type checking, guard them behind typing.TYPE_CHECKING.

How to get started

Check out the existing extractors here.

Read the docs about the Airflow integration, including tips on registering and debugging your custom extractor, here.

How to contribute

We welcome your contributions! One of our existing integrations might be a good place to start. OpenLineage’s growing list of partners includes Airflow, dbt, Dagster and Flink.

Sounds fun? Check out our new contributor guide to get started.