Overview

Airflow Operators and OpenLineage Extractors have a specific, if quirky, way of working together. That interaction was recently overhauled, and the new SQL Check Extractors introduced yet another way for extractors to interact with operators. In this post, we'll demystify these relationships.

Note: This blog post describes the relationships of the operators and extractors only for Airflow>=2.3 and OpenLineage>=0.12.0.

Integration

The Operator and the Extractor

Some quick definitions are in order before we continue.

The Airflow Operator defines a task, which is the unit of work in Airflow. All operators inherit from the BaseOperator, and in addition to taking the arguments of the BaseOperator, they can take arguments specific to the kind of task they are going to perform, such as a specific conn_id to connect to a datasource or a dictionary of checks to perform on that datasource.

The OpenLineage Extractor is somewhat analogous to the Airflow Operator: it is a unit of work in OpenLineage, which takes the relevant input and output data from an operator, creates OpenLineage data facets, and sends those facets to be displayed in Marquez or Datakin. Each extractor maps to a specific set of operators via the get_operator_classnames() class method. The extractors all inherit from a BaseExtractor, which defines a few abstract methods, most importantly extract() and extract_on_complete().
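To make the extractor contract more concrete, here is a minimal sketch that uses plain-Python stand-ins rather than the real OpenLineage classes. The method names follow the extract() and extract_on_complete() naming used later in this post. TaskMetadata is simplified, ExampleSqlOperator and ExampleSqlExtractor are hypothetical names, and the default behavior of extract_on_complete() is an assumption made for illustration.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class TaskMetadata:
    """Simplified stand-in for the metadata object an extractor returns."""
    name: str
    inputs: List[str] = field(default_factory=list)
    outputs: List[str] = field(default_factory=list)


class BaseExtractor(ABC):
    """Simplified stand-in for the OpenLineage BaseExtractor."""

    def __init__(self, operator):
        self.operator = operator

    @classmethod
    @abstractmethod
    def get_operator_classnames(cls) -> List[str]:
        """Names of the operator classes this extractor handles."""

    @abstractmethod
    def extract(self) -> Optional[TaskMetadata]:
        """Gather metadata when the task starts."""

    def extract_on_complete(self, task_instance) -> Optional[TaskMetadata]:
        """Gather metadata when the task finishes.

        Assumption for this sketch: fall back to extract() by default.
        """
        return self.extract()


class ExampleSqlOperator:
    """Hypothetical operator stand-in (not a real Airflow class)."""

    def __init__(self, task_id: str, sql: str):
        self.task_id = task_id
        self.sql = sql


class ExampleSqlExtractor(BaseExtractor):
    """Hypothetical extractor that handles ExampleSqlOperator."""

    @classmethod
    def get_operator_classnames(cls) -> List[str]:
        return ["ExampleSqlOperator"]

    def extract(self) -> Optional[TaskMetadata]:
        # A real extractor would parse self.operator.sql to find its tables.
        return TaskMetadata(name=self.operator.task_id, inputs=["example_table"])


operator = ExampleSqlOperator(task_id="count_rows", sql="SELECT COUNT(*) FROM example_table")
extractor = ExampleSqlExtractor(operator)
start_metadata = extractor.extract()
complete_metadata = extractor.extract_on_complete(task_instance=None)
```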

Briefly, the two other major OpenLineage constructs in this story are the ExtractorManager, which is responsible for identifying the correct extractor to use, and the Listener, which is the connecting piece between OpenLineage and Airflow.

Interplay

Next, we'll walk through what happens when an Airflow instance with OpenLineage support runs a DAG, and how that operator data makes it to the Marquez or Datakin UI.

First, a DAG is born. When the DAG is run, the scheduler runs the operators in order by calling their execute() method. This is the first time the OpenLineage Listener responds. Triggered by the execute() event, it calls the Manager, which identifies the correct extractor based on the task's operator class name. The Extractor's extract() method then runs, potentially returning lineage data in the form of a metadata object. When the operator is done, whether it succeeded or failed, the Listener calls the Manager again, and this time the Manager triggers the Extractor's extract_on_complete() method, which may also return metadata based on the result of the task. The metadata object is then sent to Marquez or Datakin, where the data is displayed.
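The flow above can be sketched with small stand-in classes. This is not the actual OpenLineage code: the class bodies, the on_task_start() and on_task_finish() hook names, and the dictionary-shaped metadata are all assumptions made for illustration. The sketch looks the extractor up by operator class name, following the get_operator_classnames() mapping described earlier.

```python
class ExampleOperator:
    """Hypothetical operator stand-in."""

    def __init__(self, task_id):
        self.task_id = task_id


class ExampleExtractor:
    """Hypothetical extractor stand-in."""

    def __init__(self, operator):
        self.operator = operator

    @classmethod
    def get_operator_classnames(cls):
        return ["ExampleOperator"]

    def extract(self):
        return {"task": self.operator.task_id, "phase": "start"}

    def extract_on_complete(self, task_instance):
        return {
            "task": self.operator.task_id,
            "phase": "complete",
            "state": task_instance["state"],
        }


class ExtractorManager:
    """Stand-in Manager: picks the extractor registered for an operator."""

    def __init__(self, extractor_classes):
        # Map each operator class name to the extractor that handles it.
        self.extractors = {
            name: extractor_cls
            for extractor_cls in extractor_classes
            for name in extractor_cls.get_operator_classnames()
        }

    def extract_metadata(self, operator, complete=False, task_instance=None):
        extractor_cls = self.extractors.get(type(operator).__name__)
        if extractor_cls is None:
            return None  # no extractor registered for this operator
        extractor = extractor_cls(operator)
        if complete:
            return extractor.extract_on_complete(task_instance)
        return extractor.extract()


class Listener:
    """Stand-in Listener: reacts to task events and forwards metadata."""

    def __init__(self, manager, emit):
        self.manager = manager
        self.emit = emit  # callable standing in for "send to the backend"

    def on_task_start(self, operator):
        self._send(self.manager.extract_metadata(operator))

    def on_task_finish(self, operator, task_instance):
        self._send(
            self.manager.extract_metadata(
                operator, complete=True, task_instance=task_instance
            )
        )

    def _send(self, metadata):
        if metadata is not None:
            self.emit(metadata)


events = []  # collects what would be sent to Marquez or Datakin
listener = Listener(ExtractorManager([ExampleExtractor]), events.append)
operator = ExampleOperator("load_orders")
listener.on_task_start(operator)
listener.on_task_finish(operator, {"state": "success"})
```

After both calls, events holds one metadata object from the start of the task and one from its completion, in that order.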

SQL Check Operators/Extractors

The SQLCheckOperators and SQLCheckExtractors work slightly differently from the interplay outlined above. The biggest difference is that the SQLCheckExtractors all inherit from a BaseSqlCheckExtractor, which in turn dynamically inherits from the appropriate extractor at runtime. The appropriate extractor is always some existing SQL database extractor. The BaseSqlCheckExtractor itself implements only the extract_on_complete() method, which determines whether the superclass’s extract() or extract_on_complete() method should be run to gather metadata. The _get_input_facets() methods are all implemented by the particular check extractors, and are called in the superclass’s extract() or extract_on_complete() method.

The dynamic inheritance is done by defining the SQLCheckExtractors inside the get_check_extractors() function that takes a class as a parameter and passes that class to the constructor of the BaseSqlCheckExtractor. When a SQLCheckOperator is run, and the Manager searches for the correct extractor to use, it calls instantiate_abstract_extractors() with the given task instance.
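A minimal sketch of this kind of dynamic inheritance follows. It is an illustration under stated assumptions, not the OpenLineage source. ExampleDbExtractor, ExampleColumnCheckExtractor, and ExampleColumnCheckOperator are hypothetical names, and the class passed in is used directly as the base class, which is one way to get the effect described.

```python
class ExampleDbExtractor:
    """Hypothetical stand-in for an existing SQL database extractor."""

    def __init__(self, operator):
        self.operator = operator

    def _get_input_facets(self):
        # A plain database extractor contributes no check facets.
        return {}

    def extract(self):
        # The superclass is the one that calls _get_input_facets().
        return {"source": "example_db", "input_facets": self._get_input_facets()}


def get_check_extractors(super_):
    """Define the check extractors with super_ as their base class."""

    class BaseSqlCheckExtractor(super_):
        def extract_on_complete(self, task_instance):
            # Use the superclass's extract_on_complete() when it defines one,
            # otherwise fall back to its extract().
            parent = super()
            if hasattr(parent, "extract_on_complete"):
                return parent.extract_on_complete(task_instance)
            return parent.extract()

    class ExampleColumnCheckExtractor(BaseSqlCheckExtractor):
        def _get_input_facets(self):
            # These values exist only after the operator has run its checks.
            return {"column_checks": dict(self.operator.column_mapping)}

    return [ExampleColumnCheckExtractor]


class ExampleColumnCheckOperator:
    """Hypothetical check operator stand-in holding finished check results."""

    def __init__(self, column_mapping):
        self.column_mapping = column_mapping


(check_extractor_cls,) = get_check_extractors(ExampleDbExtractor)
operator = ExampleColumnCheckOperator({"id": {"null_check": "passed"}})
metadata = check_extractor_cls(operator).extract_on_complete(task_instance=None)
```

Because the classes are defined inside the function, each call produces a fresh set of check extractors bound to whichever database extractor was passed in.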

In this function, the task instance is used to find the correct extractor that will be the superclass of the BaseSqlCheckExtractor. To do this, the function uses a set of hard-coded operator names whose extractors will dynamically inherit from the found superclass. Currently, this list is the set of SQLCheckOperators, which corresponds to a dictionary of extractor keys and conn_type values. The given task’s class name is checked against the set of operator names, and if it is in the set, a loop compares each existing extractor’s corresponding conn_type from the aforementioned dictionary to the given task instance’s conn_type retrieved from Airflow connections. If there’s a match, the get_check_extractors() function is called with the matched extractor, creating all the check extractors with the correct superclass.
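The lookup described here might look roughly like the sketch below. Everything in it is a stand-in: the operator and extractor classes, the CONNECTIONS dictionary that replaces the Airflow connection lookup, and the choice to return an empty list when nothing matches are assumptions made for illustration, and the real function may differ in its details.

```python
class PostgresLikeExtractor:
    """Hypothetical stand-in for an existing SQL database extractor."""


class SnowflakeLikeExtractor:
    """Hypothetical stand-in for another SQL database extractor."""


# Hard-coded names of the operators whose extractors inherit dynamically.
CHECK_OPERATOR_NAMES = {"ExampleColumnCheckOperator"}

# Dictionary of extractor keys and conn_type values.
EXTRACTOR_CONN_TYPES = {
    PostgresLikeExtractor: "postgres",
    SnowflakeLikeExtractor: "snowflake",
}

# Stand-in for Airflow connections: conn_id -> conn_type.
CONNECTIONS = {"my_pg": "postgres", "my_sf": "snowflake"}


def get_check_extractors(super_):
    """Define the check extractors on top of the matched extractor."""

    class ExampleColumnCheckExtractor(super_):
        pass

    return [ExampleColumnCheckExtractor]


def instantiate_abstract_extractors(task):
    # Only the known check operators get dynamically built extractors.
    if type(task).__name__ not in CHECK_OPERATOR_NAMES:
        return []
    task_conn_type = CONNECTIONS[task.conn_id]
    # Find the database extractor whose conn_type matches the task's.
    for extractor_cls, conn_type in EXTRACTOR_CONN_TYPES.items():
        if conn_type == task_conn_type:
            return get_check_extractors(extractor_cls)
    return []  # assumption: no match means no check extractors


class ExampleColumnCheckOperator:
    """Hypothetical check operator stand-in."""

    def __init__(self, conn_id):
        self.conn_id = conn_id


class ExampleOtherOperator:
    """Hypothetical operator that is not in the check operator set."""

    def __init__(self, conn_id):
        self.conn_id = conn_id


pg_extractors = instantiate_abstract_extractors(ExampleColumnCheckOperator("my_pg"))
sf_extractors = instantiate_abstract_extractors(ExampleColumnCheckOperator("my_sf"))
other_extractors = instantiate_abstract_extractors(ExampleOtherOperator("my_pg"))
```

The same check operator ends up with a Postgres-backed extractor for one connection and a Snowflake-backed extractor for another, which is the point of deferring the choice of superclass until the task is known.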

The SQLCheckExtractors rely only on the extract_on_complete() method, as the values needed from the operators, i.e. the results of the query and the success or failure of the checks, are available only after the operator completes.