Overview
Airflow is built around operators, each having a different function and requiring a different approach to lineage. The OpenLineage Airflow integration detects which Airflow operators your DAG is using and extracts lineage data from them using extractors.
The community has already authored a number of extractors to support Airflow’s Great Expectations, BigQuery, Python, Postgres, SQL and Bash operators (and more – you can find all the extractors here.) Nevertheless, in the course of pursuing lineage, you may find yourself needing to write custom extractors.
Some teams use custom extractors to automate repeatable work, such as using the same code from PythonOperator across a project. Another common use case is that a team needs to use an operator for which a pre-built extractor does not yet exist. Airflow has literally hundreds of operators.
Built-in support for custom extractors makes OpenLineage a flexible, highly adaptable solution for pipelines that use Airflow for orchestration.
How it works
As we explain in the OpenLineage docs, custom extractors must be derived from the BaseExtractor class (import it from openlineage.airflow.extractors.base).
Extractors have three methods they can implement: extract, extract_on_complete and get_operator_classnames. Either extract or extract_on_complete is required. The get_operator_classnames method, also required, returns the list of operator class names from which your extractor can get lineage.
For example:
@classmethod
def get_operator_classnames(cls) -> List[str]:
    return ['PostgresOperator']
If the name of the operator matches one of the names on the list, the extractor will be instantiated, with the operator passed to the extractor as a parameter and stored in the self.operator property, and both the extract and extract_on_complete methods will be called. They both return information used by the OpenLineage integration to emit OpenLineage events. The difference is that extract is called before the operator's execute method to generate a START event, while extract_on_complete is called afterward to generate a COMPLETE event. The latter has access to any additional information that the operator leaves behind following execution. A good example of this is the SnowflakeOperator, which sets query_ids after execution.
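The matching and call order described above can be sketched in plain Python. This is only an illustration: BaseExtractor and PostgresOperator below are simplified stand-ins written for this example (not the real OpenLineage or Airflow classes), find_extractor and run_task are hypothetical helpers, and the methods return plain dicts instead of TaskMetadata to keep the sketch self-contained.

```python
from typing import List, Optional


class BaseExtractor:
    """Stand-in for openlineage.airflow.extractors.base.BaseExtractor (assumption)."""

    def __init__(self, operator):
        # The integration passes the operator in; it is kept on self.operator.
        self.operator = operator

    @classmethod
    def get_operator_classnames(cls) -> List[str]:
        raise NotImplementedError

    def extract(self) -> Optional[dict]:
        return None

    def extract_on_complete(self, task_instance) -> Optional[dict]:
        return None


class PostgresOperator:
    """Hypothetical operator stub, not the real Airflow operator."""

    def __init__(self, sql: str):
        self.sql = sql
        self.rows_affected = None

    def execute(self):
        # State that only exists after execution, like query_ids in the text.
        self.rows_affected = 3


class PostgresExtractor(BaseExtractor):
    @classmethod
    def get_operator_classnames(cls) -> List[str]:
        return ["PostgresOperator"]

    def extract(self) -> dict:
        return {"event": "START", "sql": self.operator.sql}

    def extract_on_complete(self, task_instance) -> dict:
        return {
            "event": "COMPLETE",
            "sql": self.operator.sql,
            "rows": self.operator.rows_affected,
        }


def find_extractor(operator, extractors):
    """Instantiate the first extractor whose class-name list contains the operator's class name."""
    name = type(operator).__name__
    for extractor_cls in extractors:
        if name in extractor_cls.get_operator_classnames():
            return extractor_cls(operator)
    return None


def run_task(operator, extractors):
    """Call extract before execute and extract_on_complete after it."""
    extractor = find_extractor(operator, extractors)
    events = []
    if extractor is not None:
        events.append(extractor.extract())
    operator.execute()
    if extractor is not None:
        events.append(extractor.extract_on_complete(task_instance=None))
    return events


events = run_task(PostgresOperator("SELECT 1"), [PostgresExtractor])
```

In this sketch only the second event can report rows, because that value is set during execute. That is the reason to prefer extract_on_complete when the operator leaves useful state behind.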
Both methods return a TaskMetadata structure:
@attr.s
class TaskMetadata:
    name: str = attr.ib()  # deprecated
    inputs: List[Dataset] = attr.ib(factory=list)
    outputs: List[Dataset] = attr.ib(factory=list)
    run_facets: Dict[str, BaseFacet] = attr.ib(factory=dict)
    job_facets: Dict[str, BaseFacet] = attr.ib(factory=dict)
The inputs and outputs are lists of plain OpenLineage datasets.
The run_facets and job_facets are dictionaries of optional RunFacets and JobFacets, respectively, that accompany a job. For example, you might want to attach a SqlJobFacet if your operator is executing SQL.
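As a rough illustration of how these fields fit together, the sketch below fills in a TaskMetadata-shaped object for a SQL task. Dataset, SqlJobFacet and TaskMetadata here are simplified stand-ins built with dataclasses so the example runs on its own; the real classes come from the OpenLineage packages and may carry more fields. The build_metadata helper, the table names and the namespace are hypothetical.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class Dataset:
    """Stand-in for an OpenLineage dataset (assumption: namespace plus name)."""
    namespace: str
    name: str


@dataclass
class SqlJobFacet:
    """Stand-in for the SQL job facet (assumption: it carries the query text)."""
    query: str


@dataclass
class TaskMetadata:
    """Stand-in mirroring the fields of the structure shown above."""
    name: str
    inputs: List[Dataset] = field(default_factory=list)
    outputs: List[Dataset] = field(default_factory=list)
    run_facets: Dict[str, object] = field(default_factory=dict)
    job_facets: Dict[str, object] = field(default_factory=dict)


def build_metadata(task_name, sql, in_tables, out_tables, namespace):
    """Hypothetical helper: wrap table names as datasets and attach the SQL as a job facet."""
    return TaskMetadata(
        name=task_name,
        inputs=[Dataset(namespace, table) for table in in_tables],
        outputs=[Dataset(namespace, table) for table in out_tables],
        job_facets={"sql": SqlJobFacet(sql)},
    )


meta = build_metadata(
    task_name="etl.load_orders",
    sql="INSERT INTO orders_clean SELECT * FROM orders_raw",
    in_tables=["public.orders_raw"],
    out_tables=["public.orders_clean"],
    namespace="postgres://db.example:5432",
)
```

Here run_facets is left at its empty default, which matches the idea that facets are optional.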
Note: in order for a custom extractor to work, it must be registered first, so the OpenLineage integration can import it. You can read about how to use environment variables to do this here.
Example: the RedshiftDataExtractor
In the RedshiftDataExtractor, the extract_on_complete method parses SQL, obtains task stats using the get_facets method of the RedshiftDataDatasetsProvider class, and returns a TaskMetadata instance. Note that both the SQL statement and the connection are read from the operator itself.
def extract_on_complete(self, task_instance) -> Optional[TaskMetadata]:
    log.debug(f"extract_on_complete({task_instance})")
    job_facets = {"sql": SqlJobFacet(self.operator.sql)}
    log.debug(f"Sending SQL to parser: {self.operator.sql}")
    sql_meta: Optional[SqlMeta] = parse(self.operator.sql, self.default_schema)
    log.debug(f"Got meta {sql_meta}")
    try:
        redshift_job_id = self._get_xcom_redshift_job_id(task_instance)
        if redshift_job_id is None:
            raise Exception(
                "Xcom could not resolve Redshift job id. Job may have failed."
            )
    except Exception as e:
        log.error(f"Cannot retrieve job details from {e}", exc_info=True)
        return TaskMetadata(
            name=get_job_name(task=self.operator),
            run_facets={},
            job_facets=job_facets,
        )
    client = self.operator.hook.conn
    redshift_details = [
        "database",
        "cluster_identifier",
        "db_user",
        "secret_arn",
        "region",
    ]
    connection_details = {
        detail: getattr(self.operator, detail) for detail in redshift_details
    }
    stats = RedshiftDataDatasetsProvider(
        client=client, connection_details=connection_details
    ).get_facets(
        job_id=redshift_job_id,
        inputs=sql_meta.in_tables if sql_meta else [],
        outputs=sql_meta.out_tables if sql_meta else [],
    )
    return TaskMetadata(
        name=get_job_name(task=self.operator),
        inputs=[ds.to_openlineage_dataset() for ds in stats.inputs],
        outputs=[ds.to_openlineage_dataset() for ds in stats.output],
        run_facets=stats.run_facets,
        job_facets={"sql": SqlJobFacet(self.operator.sql)},
    )
Common issues
There are two common issues associated with custom extractors.
First, when the wrong path is provided to OPENLINEAGE_EXTRACTORS, the extractor isn’t imported and OpenLineage events aren’t emitted. The path needs to be exactly the same as the one you are using in your code. Also, make sure that the extractor code is available to import from Airflow’s Python interpreter.
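One way to catch a wrong path early is to try resolving it yourself with the same interpreter that runs Airflow. The sketch below is a generic check, not the integration's own loading code: import_from_string and check_extractor_paths are hypothetical helpers, the semicolon separator is an assumption about how multiple paths are listed, and standard-library paths stand in for real extractor paths so the example runs anywhere.

```python
import importlib


def import_from_string(path: str):
    """Resolve 'package.module.ClassName' to the object it names, or raise ImportError."""
    module_path, _, attr_name = path.rpartition(".")
    if not module_path:
        raise ImportError(f"{path!r} is not a full dotted path")
    module = importlib.import_module(module_path)
    try:
        return getattr(module, attr_name)
    except AttributeError as exc:
        raise ImportError(f"{module_path!r} has no attribute {attr_name!r}") from exc


def check_extractor_paths(value: str) -> dict:
    """Report, per path, whether it can be imported by the current interpreter."""
    results = {}
    # Assumption: multiple extractor paths are separated by semicolons.
    for path in filter(None, (part.strip() for part in value.split(";"))):
        try:
            import_from_string(path)
            results[path] = "ok"
        except ImportError as exc:
            results[path] = f"failed: {exc}"
    return results


# Standard-library paths stand in for extractor paths in this sketch.
results = check_extractor_paths(
    "collections.OrderedDict;collections.NoSuchExtractor;no_such_pkg_xyz.Extractor"
)
```

Running a check like this inside the Airflow worker environment, with the value you actually export, distinguishes a typo in the path from a package that is simply not installed there.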
Second, imports from Airflow can be cyclical in ways that are easy to miss. This is because OpenLineage code gets instantiated when the Airflow worker itself starts, in contrast to DAG code. OpenLineage extraction can fail as a result. To avoid this issue, make sure that all imports from Airflow are local, that is, placed inside the extract or extract_on_complete methods. If you need imports for type checking, guard them behind typing.TYPE_CHECKING.
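The import pattern described above might look like the following sketch. BaseExtractor is again a stand-in so the module loads without OpenLineage installed; MyOperatorExtractor, MyOperator and the conn_id attribute are hypothetical. The Airflow imports shown (airflow.models.BaseOperator and airflow.hooks.base.BaseHook) are assumed to match an Airflow 2 installation.

```python
from typing import TYPE_CHECKING, List, Optional

if TYPE_CHECKING:
    # Seen only by static type checkers; never executed at runtime,
    # so it cannot take part in an import cycle.
    from airflow.models import BaseOperator


class BaseExtractor:
    """Stand-in for the real BaseExtractor (assumption), kept minimal."""

    def __init__(self, operator: "BaseOperator"):
        self.operator = operator


class MyOperatorExtractor(BaseExtractor):
    @classmethod
    def get_operator_classnames(cls) -> List[str]:
        return ["MyOperator"]  # hypothetical operator name

    def extract(self) -> Optional[dict]:
        # Local import: resolved only when extract runs, long after the
        # worker has finished its own startup imports.
        from airflow.hooks.base import BaseHook

        # conn_id is a hypothetical attribute of the hypothetical operator.
        connection = BaseHook.get_connection(self.operator.conn_id)
        return {"host": connection.host}
```

Importing this module does not import Airflow at all; Airflow is touched only when extract is actually called.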
How to get started
Check out the existing extractors here.
Read the docs about the Airflow integration, including tips on registering and debugging your custom extractor, here.
How to contribute
We welcome your contributions! One of our existing integrations might be a good place to start. OpenLineage’s growing list of partners includes Airflow, dbt, Dagster and Flink.
Sounds fun? Check out our new contributor guide to get started.