Overview & background
Long one of our most requested new features, column-level lineage was added to the Spark integration with the release of OpenLineage 0.9.0. Project committer Paweł Leszczyński (@pawel-big-lebowski) authored the relevant pull requests (#645, #698, #738 and #772). In its current form, column-level lineage in OpenLineage is limited to the Spark integration and is not yet visible in the Marquez UI. But this is only the first step in a broader, ongoing effort to implement the feature across the project, and we’d love your help.
Column-level lineage is a worthy pursuit. It dramatically extends the reach of OpenLineage’s metadata capture, providing finely grained information about datasets' dependencies. As Paweł and project lead Julien Le Dem (@julienledem) wrote in the initial proposal, “Not only can we know that a dependency exists, but we are also able to understand which input columns are used to produce output columns. This allows [for] answering questions like ‘Which root input columns are used to construct column x?’”
Another reason to pursue column-level lineage: the demands of regulatory compliance. Regulations and standards such as GDPR, HIPAA, CCPA, BCBS and PCI impose requirements for data accuracy and integrity that compel companies and organizations to obtain deeper insight into their datasets and pipelines.
Why start with the Spark integration?
As Julien and Paweł's proposal suggests, the Spark integration was a logical starting point for adding column-level lineage. This is because the integration relies on visitors that traverse a LogicalPlan and extract meaningful information from the nodes they encounter. This information includes outputs and inputs with their schemas (which we were already identifying, in fact). The LogicalPlan also exposes the expressions that derive the output columns from the input columns, and these can be inspected to derive column-level lineage. Traversing the LogicalPlan therefore captures all the dependencies required to build column-level lineage.
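To make the traversal idea concrete, here is a minimal sketch in plain Python. It does not use Spark: PlanNode, traverse and collect are hypothetical stand-ins invented for illustration, and the real integration works against Spark's own LogicalPlan classes.

```python
from dataclasses import dataclass, field
from typing import Callable, List


@dataclass
class PlanNode:
    """Hypothetical stand-in for one node of a Spark LogicalPlan."""
    kind: str                      # e.g. "Project", "Filter", "Relation"
    detail: str = ""               # table name or expression text
    children: List["PlanNode"] = field(default_factory=list)


def traverse(node: PlanNode, visit: Callable[[PlanNode], None]) -> None:
    """Depth-first, pre-order walk that calls visit() on every node."""
    visit(node)
    for child in node.children:
        traverse(child, visit)


def collect(plan: PlanNode, kind: str) -> List[str]:
    """Gather the detail of every node of the given kind."""
    found: List[str] = []

    def visit(node: PlanNode) -> None:
        if node.kind == kind:
            found.append(node.detail)

    traverse(plan, visit)
    return found


# A toy plan: project two columns from a filtered source table.
plan = PlanNode("Project", "a, b + c AS d", [
    PlanNode("Filter", "a > 0", [
        PlanNode("Relation", "db.source_table"),
    ]),
])

inputs = collect(plan, "Relation")       # leaf relations act as inputs
projections = collect(plan, "Project")   # expressions that derive output columns
```

The same walk serves both purposes: leaf relations yield the input datasets, while projection nodes carry the expressions that tie output columns back to input columns.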
A new facet in the spec
In the process of implementing column-level lineage, Paweł and Julien contributed a new facet schema, ColumnLineageDatasetFacet, to the OpenLineage spec. This facet uses fields to relay data points about dependencies. These are the properties of each item in the facet's inputFields property (namespace, name and field), as well as two human-readable string fields (transformationDescription and transformationType) for conveying information about dataset transformations. The last field, transformationType, may be especially useful for those whose companies or organizations need to track the usage of sensitive personal information.
An example of a columnLineage facet in the outputs array of a lineage event:
{
  "namespace": "{namespace of the output dataset}",
  "name": "{name of the output dataset}",
  "facets": {
    "schema": {
      "fields": [
        { "name": "{first column of the output dataset}", "type": "{its type}"},
        { "name": "{second column of the output dataset}", "type": "{its type}"},
        ...
      ]
    },
    "columnLineage": {
      "{first column of the output dataset}": {
        "inputFields": [
          { "namespace": "{input dataset namespace}", "name": "{input dataset name}", "field": "{input dataset column name}"},
          ... other inputs
        ],
        "transformationDescription": "identical",
        "transformationType": "IDENTITY"
      },
      "{second column of the output dataset}": ...,
      ...
    }
  }
}
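As a rough illustration of how a consumer might read such a facet, the sketch below fills in the template with made-up values and looks up the inputs of one output column. The dataset names, column names and the input_columns helper are all hypothetical, and the nesting simply mirrors the example above; check the published spec for the exact shape before relying on it.

```python
# Hypothetical output dataset entry, shaped like the template above.
output_dataset = {
    "namespace": "example_namespace",
    "name": "example_db.orders_summary",
    "facets": {
        "schema": {
            "fields": [
                {"name": "order_id", "type": "long"},
                {"name": "customer", "type": "string"},
            ]
        },
        "columnLineage": {
            "order_id": {
                "inputFields": [
                    {"namespace": "example_namespace",
                     "name": "example_db.orders",
                     "field": "id"},
                ],
                "transformationDescription": "identical",
                "transformationType": "IDENTITY",
            },
            "customer": {
                "inputFields": [
                    {"namespace": "example_namespace",
                     "name": "example_db.customers",
                     "field": "name"},
                ],
                "transformationDescription": "identical",
                "transformationType": "IDENTITY",
            },
        },
    },
}


def input_columns(dataset, column):
    """Return (namespace, dataset name, field) for each input of one output column."""
    entry = dataset["facets"]["columnLineage"].get(column, {})
    return [
        (f["namespace"], f["name"], f["field"])
        for f in entry.get("inputFields", [])
    ]


order_id_inputs = input_columns(output_dataset, "order_id")
```

A column with no recorded lineage yields an empty list here, which keeps the lookup safe for outputs that the facet does not describe.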
How it works
As we’ve seen, column-level lineage is being collected via the new columnLineage dataset facet. For each output, this facet contains a list of the output's fields along with the input fields used to create each of them. The input fields are identified by a namespace, name and field. But how is OpenLineage obtaining the data about dependencies that the facet relays?
In PR #698, Paweł describes the mechanism this way:
- The core mechanism first gets an output schema and logical plan as inputs.
- Then, the OutputFieldsCollector class traverses the plan to gather the outputs. Outputs can be extracted from Aggregate or Project, and each output field has an ExprId (expression ID) that is attached from the plan.
- Next, the InputFieldsCollector class is used to collect inputs that can be extracted from DataSourceV2Relation, DataSourceV2ScanRelation, HiveTableRelation or LogicalRelation. Each input field takes its ExprId from the plan, and each input is identified by a DatasetIdentifier, which means it contains the name and namespace of a dataset and an input field.
- Finally, the FieldDependenciesCollector traverses the plan to identify dependencies between different ExprIds. Dependencies map parent expressions to children expressions. This is used to identify the inputs used to evaluate certain outputs.
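The steps above can be sketched in plain Python. This is not the integration's code: the three dictionaries stand in for what the collectors would gather, the integer IDs play the role of ExprIds, and resolve_inputs is a hypothetical helper that follows parent-to-child dependencies until it reaches known input fields.

```python
# Output field name -> ExprId (what an output collector would gather).
outputs = {"id": 10, "total": 11}

# ExprId -> (namespace, dataset name, field) for each input field.
inputs = {
    1: ("example_namespace", "orders", "id"),
    2: ("example_namespace", "orders", "price"),
    3: ("example_namespace", "orders", "quantity"),
}

# Parent ExprId -> child ExprIds it is computed from.
# Here "total" (11) is an alias of expression 12, which multiplies 2 and 3.
dependencies = {10: {1}, 11: {12}, 12: {2, 3}}


def resolve_inputs(expr_id, dependencies, inputs):
    """Follow parent -> child edges until reaching ExprIds that are known inputs."""
    found, seen, stack = set(), set(), [expr_id]
    while stack:
        current = stack.pop()
        if current in seen:
            continue  # guard against revisiting shared sub-expressions
        seen.add(current)
        if current in inputs:
            found.add(inputs[current])
        stack.extend(dependencies.get(current, ()))
    return found


# Output field -> sorted list of the input fields it depends on.
lineage = {
    name: sorted(resolve_inputs(expr_id, dependencies, inputs))
    for name, expr_id in outputs.items()
}
```

Because dependencies are followed transitively, an output derived through intermediate expressions still resolves to the input fields at the leaves of the plan.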
What’s next?
Work on extending column-level lineage in the project is ongoing. For example, project committer Will Johnson (@wjohnson) has opened a PR (#963) to add support for common dataframe operations not covered due to the initial focus on Spark SQL. As Will writes in the PR:
Currently, the Column Lineage Input Field Collectors work mainly for Spark SQL operations and Data Source V2. This leaves out normal dataframe operations like inserting into HDFS without the use of a Hive table. Column Lineage should support this scenario as many users will want to see column lineage for operations outside of SQL and Hive Metastore backed tables.
Also, Paweł has written enhancements that will enable column-level lineage in the case of altered table and column names and allow one to extend column-level lineage without contributing to OpenLineage (to avoid exposing proprietary code, for example).
Meanwhile, over in Marquez, Julien has contributed a proposal to add a column-level endpoint to the project that would leverage OpenLineage’s ColumnLineageDatasetFacet. This approach would add column lineage to an existing endpoint by embedding the columnLineage facet in the data section of the DATASET nodes.
How can I contribute?
We welcome contributions to this ongoing effort to implement column-level lineage in OpenLineage! If you’re interested in contributing, one of our existing integrations might be a good place to start. OpenLineage’s growing list of integrations includes Airflow, dbt, Dagster and Flink.
Sounds fun? Check out our new contributor guide to get started.