Damien Hawes (damien.hawes@booking.com)
2023-12-11 11:30:58

@Damien Hawes has joined the channel

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2023-12-11 11:31:09

@Paweł Leszczyński has joined the channel

Mattia Bertorello (mattia.bertorello@booking.com)
2023-12-11 11:31:09

@Mattia Bertorello has joined the channel

Damien Hawes (damien.hawes@booking.com)
2023-12-11 11:32:59

@Paweł Leszczyński - meet @Mattia Bertorello - one of my team members. He will be helping us to support Scala 2.13 with the Spark integration. For clarity, I am going to share the messages we exchanged earlier today in order to get him up to speed.

Damien Hawes (damien.hawes@booking.com)
2023-12-11 11:36:15

> Damien Hawes: Yeah - the cross-building of the spark integration isn't fun. I'm doing a toy project at the moment, to understand how to do it with Gradle. So far I can get a regular project to publish to my local maven repository with Scala 2.12 and Scala 2.13
> Paweł Leszczyński: yeah, i think we should have extra scalaVersion param as we have sparkVersion and publish two artifacts with scala suffix at the end
> Damien Hawes: Yup. The standard Scala suffixes. Though, I'm trying to wrap my head around how we currently get the entire integration to "come together" so to speak. I do have a concern though: if we publish two artifacts, what happens when someone runs a Spark 3.1 pipeline for example which only supports Scala 2.12 but we have Scala 2.13 code in there as well because of the way we "combine" all the sub-projects into 1 artifact
> Paweł Leszczyński: in circle ci there is a matrix of params and we should be able somehow to prevent publishing spark2 + scala2.13. that's the version matrix file https://github.com/OpenLineage/OpenLineage/blob/main/.circleci/workflows/openlineage-spark.yml
> Damien Hawes: So far I have this in my settings.gradle.kts of my toy project:

```
rootProject.name = "openlineage-spark"

val scalaBinaryVersion = settings.extra["scalaVersion"]
    .toString()
    .split(".")
    .take(2)
    .joinToString(".")

include("openlineage-spark-core")

if (scalaBinaryVersion == "2.12") {
    include("openlineage-spark-spark30")
    include("openlineage-spark-spark31")
    include("openlineage-spark-spark32")
    include("openlineage-spark-spark33")
    include("openlineage-spark-spark34")
    include("openlineage-spark-spark35")
}

if (scalaBinaryVersion == "2.13") {
    include("openlineage-spark-spark32")
    include("openlineage-spark-spark33")
    include("openlineage-spark-spark34")
    include("openlineage-spark-spark35")
}
```

> Damien Hawes: Then in my gradle.properties I have this:

```
# This is used as the default Scala version
# Suggested versions of Scala to pick from are: 2.11.12, 2.12.18, 2.13.12
scalaVersion=2.12.18

# This is used as the default Spark version
sparkVersion=3.2.1
```
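For anyone who reads Java more easily than the Gradle Kotlin DSL, here is a minimal sketch of the same selection logic: trim the full Scala version to its binary version, then pick the sub-projects to include. The class and method names (`ModuleSelection`, `scalaBinaryVersion`, `modulesFor`) are made up for illustration; only the module names come from the settings file quoted above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper mirroring the toy settings.gradle.kts; not part of OpenLineage.
public class ModuleSelection {

    // "2.12.18" -> "2.12": keep only the first two version components.
    public static String scalaBinaryVersion(String scalaVersion) {
        return Arrays.stream(scalaVersion.split("\\."))
                .limit(2)
                .collect(Collectors.joining("."));
    }

    // Sub-projects that would be included for the given full Scala version.
    public static List<String> modulesFor(String scalaVersion) {
        String binary = scalaBinaryVersion(scalaVersion);
        List<String> modules = new ArrayList<>();
        modules.add("openlineage-spark-core");
        if (binary.equals("2.12")) {
            // Spark 3.0 and 3.1 were only released for Scala 2.12.
            modules.add("openlineage-spark-spark30");
            modules.add("openlineage-spark-spark31");
        }
        if (binary.equals("2.12") || binary.equals("2.13")) {
            // Spark 3.2 and later exist for both Scala 2.12 and 2.13.
            modules.add("openlineage-spark-spark32");
            modules.add("openlineage-spark-spark33");
            modules.add("openlineage-spark-spark34");
            modules.add("openlineage-spark-spark35");
        }
        return modules;
    }

    public static void main(String[] args) {
        System.out.println(modulesFor("2.12.18"));
        System.out.println(modulesFor("2.13.12"));
    }
}
```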

Maciej Obuchowski (maciej.obuchowski@getindata.com)
2023-12-12 02:52:22

@Maciej Obuchowski has joined the channel

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2023-12-12 02:52:39

👋

Damien Hawes (damien.hawes@booking.com)
2023-12-12 08:47:23

Hello. I've been trying to balance developer experience against publishing experience and user experience.

There are several approaches we could take.

Approach #1

Produce and publish an artefact per version of Spark x Scala. Given the table below, this would mean Spark 3.2.x, 3.3.x, 3.4.x, and 3.5.x would see two artefacts produced, something like io.openlineage:openlineage_spark_3.2_2.12:$VERSION & io.openlineage:openlineage_spark_3.2_2.13:$VERSION

| Spark Version | Scala 2.10 | Scala 2.11 | Scala 2.12 | Scala 2.13 |
+---------------+------------+------------+------------+------------+
| 2.2.x         |     ✓      |     ✓      |            |            |
+---------------+------------+------------+------------+------------+
| 2.3.x         |            |     ✓      |            |            |
+---------------+------------+------------+------------+------------+
| 2.4.x         |            |     ✓      |     ✓      |            |
+---------------+------------+------------+------------+------------+
| 3.0.x         |            |            |     ✓      |            |
+---------------+------------+------------+------------+------------+
| 3.1.x         |            |            |     ✓      |            |
+---------------+------------+------------+------------+------------+
| 3.2.x         |            |            |     ✓      |     ✓      |
+---------------+------------+------------+------------+------------+
| 3.3.x         |            |            |     ✓      |     ✓      |
+---------------+------------+------------+------------+------------+
| 3.4.x         |            |            |     ✓      |     ✓      |
+---------------+------------+------------+------------+------------+
| 3.5.x         |            |            |     ✓      |     ✓      |
+---------------+------------+------------+------------+------------+

Benefits:

  1. Very clear boundaries of the Spark and Scala version the artefact supports.
  2. We probably will not run into Scala binary incompatibility issues.
  3. We don't include code from other versions of Spark into the published JAR.

Drawbacks:

  1. The developer experience becomes a bit more difficult, as it becomes necessary to edit the root gradle.properties file to select the correct versions.
  2. We need to add integration tests.
  3. We need to conditionally compile things.
  4. There are more artefacts to publish.
  5. We have to educate users about this, that they need to carefully select which one to go for.

Approach #2

Produce an artefact per Scala major version. This means, something like: Scala 2.12 would include support for Spark 2.4, Spark 3.0, Spark 3.1, Spark 3.2, Spark 3.3, Spark 3.4, and Spark 3.5.

Benefits:

  1. Two artefacts are published.
  2. The developer experience is kept roughly the same: activating Scala 2.13 would deactivate 3.1 and older from being considered in the Gradle build.

Drawbacks:

  1. We may end up including Scala 2.12 code with Scala 2.13 artefacts and vice versa.
  2. We need to add integration tests.
  3. We need to conditionally compile things.
  4. More artefacts to publish.
  5. We have to educate users about this, that they need to carefully select which one to go for.
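To make the difference in artefact count concrete, here is a rough sketch that enumerates the coordinates each approach would produce. It assumes only the Scala 2.12 and 2.13 columns of the table for Spark 2.4 and later; the Approach #2 naming pattern, the `1.0.0` version, and the class name `ArtifactMatrix` are placeholders rather than agreed conventions.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

// Hypothetical enumeration of artefact coordinates; names and version are placeholders.
public class ArtifactMatrix {

    // Spark minor version -> Scala binary versions (assumption: 2.12/2.13 columns only, Spark 2.4+).
    public static Map<String, List<String>> supported() {
        Map<String, List<String>> matrix = new LinkedHashMap<>();
        matrix.put("2.4", Arrays.asList("2.12"));
        matrix.put("3.0", Arrays.asList("2.12"));
        matrix.put("3.1", Arrays.asList("2.12"));
        for (String spark : Arrays.asList("3.2", "3.3", "3.4", "3.5")) {
            matrix.put(spark, Arrays.asList("2.12", "2.13"));
        }
        return matrix;
    }

    // Approach #1: one artefact per (Spark, Scala) pair.
    public static List<String> perSparkAndScala(String version) {
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, List<String>> entry : supported().entrySet()) {
            for (String scala : entry.getValue()) {
                out.add("io.openlineage:openlineage_spark_" + entry.getKey() + "_" + scala + ":" + version);
            }
        }
        return out;
    }

    // Approach #2: one artefact per Scala binary version.
    public static List<String> perScala(String version) {
        TreeSet<String> scalaVersions = new TreeSet<>();
        for (List<String> versions : supported().values()) {
            scalaVersions.addAll(versions);
        }
        List<String> out = new ArrayList<>();
        for (String scala : scalaVersions) {
            out.add("io.openlineage:openlineage-spark_" + scala + ":" + version);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println("Approach #1: " + perSparkAndScala("1.0.0").size() + " artefacts");
        System.out.println("Approach #2: " + perScala("1.0.0").size() + " artefacts");
    }
}
```

Under these assumptions Approach #1 yields 11 artefacts and Approach #2 yields 2.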

Now there is another elephant in the room: switching to Scala 2.13 is not an easy feat. According to @Mattia Bertorello it breaks some code; where exactly, I'm not so sure. Perhaps he can shed light on this when he has the time.

What do you folks think? @Paweł Leszczyński @Maciej Obuchowski

Maciej Obuchowski (maciej.obuchowski@getindata.com)
2023-12-12 08:54:16

*Thread Reply:* I would definitely gravitate toward option 2, given that we already have infrastructure built to publish one artifact for all Spark versions we support for a single Scala version.

Could you elaborate on this, as I'm not sure why it would happen:
> We may end up including Scala 2.12 code with Scala 2.13 artefacts and vice versa.

Also:
> More artefacts to publish.

Seems like less, only openlineage-spark_2.12 and openlineage_spark_2.13 - and possibly later versions of Scala, rather than full Spark x Scala matrix 🙂

> According to @Mattia Bertorello it breaks some code, where exactly, I'm not so sure. Perhaps he can shed light on this when he has the time.

Agreed, this is a challenge. My first idea would be to have just two different classes conditionally compiled, similar to what we're doing now with particular Spark versions - although the problem might be that we'll have way too many internal subprojects... not an easy challenge and probably requires research - if it's just few classes then probably okay, if we need to have two versions of pretty much every class it's a no go.

Damien Hawes (damien.hawes@booking.com)
2023-12-12 09:17:22

*Thread Reply:* Yeah, I know the Spline folks did some magic with runtime code generation. But I think the amount of code they generated was small, and used for json serialization purposes.

> Could you elaborate on this, as I'm not sure why would it happen
>> We may end up including Scala 2.12 code with Scala 2.13 artefacts and vice versa.

When I first started playing around with cross-building my toy project, I noticed that Gradle was building all the projects, i.e., there was a JAR being generated for each when I ran ./gradlew clean build even if I ran ./gradlew clean build -PscalaVersion=2.12.

However, I have made some changes to the settings.gradle.kts file (the one you see earlier in the channel), and it seems to be working as expected, i.e., running the same command as specified above, deactivates the unused projects. Though, IntelliJ hates this, and red squiggly lines are displayed on those projects.

Damien Hawes (damien.hawes@booking.com)
2023-12-12 09:24:01

*Thread Reply:* Oh right, now I remember the other concern I had.

core is compiled against a specific version of Spark, for the sake of development. The concern I have, is if that interface (or the objects it relies on) changes. So far, we haven't seen it yet - thankfully.

Mattia Bertorello (mattia.bertorello@booking.com)
2023-12-12 10:20:50

*Thread Reply:* Hi @Maciej Obuchowski I did some work to make it compile with the Scala 2.13 version and see how many changes are necessary. Until now, this version still has some issues and is not compiling.

Then, I considered using this library scala-collection-compat to allow the cross-compilation, but I still need to test it. Most of the changes done in scala 2.13 that impact OpenLineage are:
• JavaConverters were removed
• Use the immutable data type from the collection library
• Some types of issues with the Seq Builder

My next steps are to try to compile these changes in 2.12 and fix the issues that are still there in the 2.13

Here is my code, and it is still in the WIP https://github.com/OpenLineage/OpenLineage/compare/main...mattiabertorello:OpenLineage:feature/support_scala_2_13?expand=1

Maciej Obuchowski (maciej.obuchowski@getindata.com)
2023-12-12 10:25:14

*Thread Reply:*
> core is compiled against a specific version of Spark, for the sake of development. The concern I have, is if that interface (or the objects it relies on) changes. So far, we haven't seen it yet - thankfully.

With every new Spark version we have to do some things anyway to make everything work, as we basically see by having all those directories with version-specific code 🙂 We'd see that in integration test code - and I think Spark is now wary of removing core stuff everyone is using.

@Mattia Bertorello nice work 🙂 Would it be possible to put this behind some wrapper library, to be code-compatible with 2.12 code?

We might still need to release versions compiled for specific version, but sounds like those changes would not require source code changes if we can isolate those.

Mattia Bertorello (mattia.bertorello@booking.com)
2023-12-12 10:32:30

*Thread Reply:*
> Would it be able to put this behind some wrapper library, to be code-compatible with 2.12 code?

Maybe use the already existing class ScalaConversionUtils everywhere to make it easy to change. I will also check for the other changes if a wrapper class is necessary

🙌 Paweł Leszczyński, Maciej Obuchowski
Paweł Leszczyński (pawel.leszczynski@getindata.com)
2023-12-12 11:03:11

*Thread Reply:* Yeah. would be perfect if we could have ScalaConversionUtils that calls either ScalaConversionUtils_2_12 or ScalaConversionUtils_2_13 methods and stands as the only place in the whole codebase to be aware of which scala version is used.
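A minimal sketch of the dispatch idea, with no Scala dependency so it runs standalone. `ConversionBackend`, `Backend212`, `Backend213`, and `select` are hypothetical names; in a real integration the version string would presumably come from something like `scala.util.Properties.versionNumberString()`, and each backend would live in a module compiled against its own Scala version.

```java
// Hypothetical facade: the single place that knows which Scala version is in use.
public class ScalaFacadeSketch {

    // What the rest of the codebase would program against.
    public interface ConversionBackend {
        // Stand-in operation: reports which type the plain `Seq` alias resolves to.
        String seqTypeName();
    }

    // Would be compiled against Scala 2.12 in a real build.
    public static final class Backend212 implements ConversionBackend {
        @Override
        public String seqTypeName() {
            return "scala.collection.Seq";
        }
    }

    // Would be compiled against Scala 2.13 in a real build.
    public static final class Backend213 implements ConversionBackend {
        @Override
        public String seqTypeName() {
            return "scala.collection.immutable.Seq";
        }
    }

    // Picks the backend from a full Scala version string such as "2.12.18".
    public static ConversionBackend select(String scalaVersion) {
        if (scalaVersion.startsWith("2.12.")) {
            return new Backend212();
        }
        if (scalaVersion.startsWith("2.13.")) {
            return new Backend213();
        }
        throw new IllegalArgumentException("Unsupported Scala version: " + scalaVersion);
    }

    public static void main(String[] args) {
        System.out.println(select("2.12.18").seqTypeName());
        System.out.println(select("2.13.12").seqTypeName());
    }
}
```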

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2023-12-12 11:05:31

*Thread Reply:* perhaps we could add empty Seq to ScalaConversionUtils as well

👍 Mattia Bertorello
Damien Hawes (damien.hawes@booking.com)
2023-12-12 10:46:28

@Maciej Obuchowski - how much hate would I get, if I switched from Groovy DSL to Kotlin DSL?

Maciej Obuchowski (maciej.obuchowski@getindata.com)
2023-12-12 11:08:52

*Thread Reply:* you would only get love 😄

Maciej Obuchowski (maciej.obuchowski@getindata.com)
2023-12-12 11:09:45

*Thread Reply:* I definitely like Kotlin one more as well, but Groovy was from the beginning and never felt the urge to make the switch myself 🙂

Damien Hawes (damien.hawes@booking.com)
2023-12-12 11:20:41

*Thread Reply:* The only reason why I'm feeling the urge to change it, is because I did all this exploratory work in Kotlin, and I am loath to change it to Groovy.

Maciej Obuchowski (maciej.obuchowski@getindata.com)
2023-12-12 11:30:10

*Thread Reply:* Go for it - one thing I'd ask is to make change in a separate PR

Damien Hawes (damien.hawes@booking.com)
2023-12-12 11:30:20

*Thread Reply:* Aye

Damien Hawes (damien.hawes@booking.com)
2023-12-12 11:35:13

*Thread Reply:* The plan I have is a 4-phased approach.

  1. Add Scala 2.13 build support (1st PR), but still have the project publishing with Scala 2.12.
  2. Have @Mattia Bertorello use the above to build the project locally against Scala 2.12 and Scala 2.13 (2nd PR)
  3. Modify the project to build and publish Scala 2.12 and Scala 2.13 artefacts (3rd PR)
  4. (Optionally) Migrate to Kotlin DSL

I don't want to convert my stuff to Groovy, but honestly, it'll be the smallest amount of changes, to what promises to be a fairly big PR (#1). Hence, I'll leave the Kotlin DSL migration to last.
Maciej Obuchowski (maciej.obuchowski@getindata.com)
2023-12-12 11:46:08

*Thread Reply:* might be worthy to do Kotlin first then?

Maciej Obuchowski (maciej.obuchowski@getindata.com)
2023-12-12 11:46:29

*Thread Reply:* I mean, convert all the existing Groovy stuff and then just add on top of that

Maciej Obuchowski (maciej.obuchowski@getindata.com)
2023-12-12 11:46:41

*Thread Reply:* But if you think it's easier to do it this way, then sure go for it

Damien Hawes (damien.hawes@booking.com)
2023-12-12 11:47:33

*Thread Reply:* Yeah - though I'm thinking in terms of not conflicting with Mattia too much, as he will have a ton of merge conflicts to resolve.

Damien Hawes (damien.hawes@booking.com)
2023-12-12 11:47:45

*Thread Reply:* Though, maybe it is best to get the pain out of the way sooner.

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2023-12-12 11:07:11

btw guys, I don't know if you read my letter to Santa Claus, but more collaboration on spark code was definitely on the list 😉 thank you for coming and I am super happy to see you working on this 🎅

Damien Hawes (damien.hawes@booking.com)
2023-12-15 10:40:39

An update @Maciej Obuchowski @Paweł Leszczyński - the effort to support Scala 2.13 isn't small. Holy hell.

Damien Hawes (damien.hawes@booking.com)
2023-12-15 10:41:32

*Thread Reply:* The challenging bit is the coupling of the spark32, spark33, spark34 , and spark35 modules to the shared and spark3 modules.

Damien Hawes (damien.hawes@booking.com)
2023-12-15 10:42:56

*Thread Reply:* spark3 cannot be compiled with Scala 2.13, because, well, no spark dependencies were ever released for it with Scala 2.13

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2023-12-18 01:59:48

*Thread Reply:* would it help if we upgraded sparkVersion in spark3 to 3.2 for Scala 2.13? 3.2 should be able to support 2.13

Mattia Bertorello (mattia.bertorello@booking.com)
2023-12-21 05:54:03

*Thread Reply:* Hi Pawel,

I upgraded spark3 project to 3.2 version and also the shared project. Now they can support scala 2.12 and 2.13

If you want to have a look at the changes, there are a lot https://github.com/OpenLineage/OpenLineage/compare/main...mattiabertorello:feature/support_scala_2_13?expand=1

A summary of what I have done is:
• Use the ScalaConversionUtils everywhere to hide the conversions that change between Scala 2.12 and 2.13, and add some methods to avoid the use of inner classes like Seq$ and Map$ outside the class.
• Upgrade the spark3 project to Spark 3.2 by temporarily copying spark32. On this part I would like your suggestion on how to handle it.
• Upgrade the shared project's Spark version from 2.4.8 to 3.2.2, the same as spark3 and spark32.
• Upgrade net.snowflake:spark-snowflake from 2.9.3 to 2.11.0 to support Scala 2.13.
• Change from com.databricks:dbutils-api to com.databricks:databricks-dbutils-scala to support Scala 2.13.
• Upgrade com.google.cloud.spark:spark-bigquery-with-dependencies from 0.26.0 to 0.29.0 because the new version supports Scala 2.13.
• In the Gradle files, support an easy change between Scala versions with a property file.
• In the Gradle app project and in every project, support a dynamic Scala version through a variable.

Now, I need some help understanding how to move on to make the contribution, and to know if this error existed before:
Scala module 2.12.3 requires Jackson Databind version >= 2.12.0 and < 2.13.0
Is it normal or not? I see it when I run the tests, and it also happens when I run gradle build on main.

Now I need to understand what the next steps are:
• Which tests do I still need to do to say that Scala 2.13 is working? If I can run the CI, it would be helpful.
• How can I start opening PRs to avoid merging everything in a single PR? What about starting with a PR to use the ScalaConversionUtils everywhere?
• Then, the difficult part: continue working on Damien's side to make Gradle generate multiple jars, one per Scala version. In general I still need clarification on how to do it.

I'm happy to hear some suggestions 🙂 Thanks!

👀 Paweł Leszczyński
Paweł Leszczyński (pawel.leszczynski@getindata.com)
2023-12-21 08:20:27

*Thread Reply:* Hi @Mattia Bertorello,

Great news. I've clicked "draft PR" #2329 just to see if ./gradlew clean build works fine for different Spark versions. We can close this PR later. Some of the integration tests require extra credentials (like access to big query), so they need to be triggered by us. Just to mention, so that you don't worry once they fail. CI starts automatically on new commit into the PR.

I really like your approach and I think we can already go with the separate PRs and steps: (1) use ScalaConversionUtils (2) make scala version parameterized and stored in gradle.properties although still equal to 2.12

> Scala module 2.12.3 requires Jackson Databind version >= 2.12.0 and < 2.13.0

Not sure of this. We do use jackson and databind in openlineage-java and shadow them within the package. However, we're using 2.15.3 which is quite recent. That's the recent build on main in CI -> https://app.circleci.com/pipelines/github/OpenLineage/OpenLineage/8702/workflows/f1aed08c-8333-4460-a08b-1ad746cb046b can you find the same log there?

Can we keep building shared conditionally and use spark 2.4.8 for scala 2.12 while 3.2 for scala 2.13. that's an extra complexity which, on the other hand, assures the content of the submodule can be really shared among different spark versions. although a single jar is published at the end, this makes sure our code compiles with different versions. i think the same, as with shared , can be done with spark3.

The scope of this work is quite big, so I would be happy to work iteratively on the PRs and merge them even though still working on final solution.

Maciej Obuchowski (maciej.obuchowski@getindata.com)
2023-12-21 12:55:30

*Thread Reply:*
> Upgrade shared project spark version from 2.4.8 to 3.2.2, same of spark3 and spark32

This effectively means moving minimum supported Spark version to 3.2.2, right?

Mattia Bertorello (mattia.bertorello@booking.com)
2023-12-22 11:48:19

*Thread Reply:* I still need to check if the 2.4.8 version still works since I didn't touch it. But I ran some tests to avoid changing the version in the shared project, and I get this error.

```
io.openlineage.spark.agent.lifecycle.LibraryTest

Test testRdd(Path, SparkSession) FAILED (2.3s)

java.lang.NoSuchMethodError: org.apache.spark.rdd.RDD.dependencies()Lscala/collection/Seq;
    at io.openlineage.spark.agent.lifecycle.Rdds.flattenRDDs(Rdds.java:34)
```

This happens only when I run the test with Scala 2.13, after updating all the spark3 and 3x projects to 3.2+; otherwise the same code runs with no issues with Scala 2.12.

I still need to understand why this is happening. I know that it's not great ditching the support for the 2.4 version

Mattia Bertorello (mattia.bertorello@booking.com)
2024-01-09 10:47:54

*Thread Reply:* Hi team!

Could you have a look at these PRs? https://github.com/OpenLineage/OpenLineage/pull/2357/ https://github.com/OpenLineage/OpenLineage/pull/2359 https://github.com/OpenLineage/OpenLineage/pull/2360

They are part of the effort to support Scala 2.13; for the moment, none of them changes any logic, just some code refactoring and a library update. We're still working on the Gradle side, which has some challenges.

Thanks!

👀 Maciej Obuchowski
Vladimir Fedotov (vladimir.fedotov@booking.com)
2023-12-18 04:45:08

@Vladimir Fedotov has joined the channel

Harel Shein (harel.shein@gmail.com)
2023-12-19 18:54:20

@Harel Shein has joined the channel

Teemu Mattelmäki (teemu.mattelmaki@teradata.com)
2024-01-05 05:34:59

@Teemu Mattelmäki has joined the channel

Teemu Mattelmäki (teemu.mattelmaki@teradata.com)
2024-01-05 05:34:59

Hello! I tried to find out whether OL Spark column level lineage is dependent on the Databricks implementation. According to this link it feels that the answer is YES. https://openlineage.io/docs/integrations/spark/quickstart_databricks Could you confirm that dependency?

Maciej Obuchowski (maciej.obuchowski@getindata.com)
2024-01-05 08:00:03

*Thread Reply:* What do you mean by dependent on Databricks implementation?

Databricks provides their own, closed source versions of some LogicalPlan for which visitors generally need to be implemented by reflection or by checking that we have the necessary classes, like here - but that does not mean integration for OSS Spark depends on some Databricks dependency

🙌 Paweł Leszczyński, Teemu Mattelmäki, Harel Shein
Teemu Mattelmäki (teemu.mattelmaki@teradata.com)
2024-01-05 08:59:18

*Thread Reply:* Hello @Maciej Obuchowski! I have a customer that has an on-prem Hadoop environment with Spark & Hive. We want to establish column level lineage on that environment. We would need to install OpenLineage (OL) Spark Agent. Does OpenLineage support this type of environment or does it only support Spark on Databricks?

Maciej Obuchowski (maciej.obuchowski@getindata.com)
2024-01-05 09:16:36

*Thread Reply:*
> Does OpenLineage support this type of environment

yes, you can try it in local notebook for example 🙂

Maciej Obuchowski (maciej.obuchowski@getindata.com)
2024-01-05 09:17:04

*Thread Reply:* https://openlineage.io/docs/integrations/spark/quickstart_local

Teemu Mattelmäki (teemu.mattelmaki@teradata.com)
2024-01-06 03:22:44

*Thread Reply:* Thanks! We'll try this option.

Damien Hawes (damien.hawes@booking.com)
2024-01-10 05:16:48

@Maciej Obuchowski @Paweł Leszczyński - Happy New Year. Just a note, we stumbled into a bit of a roadblock with testing of the changes for Scala 2.13. Notably: Bitnami only publishes Spark images for Scala 2.12. This means the CI/CD pipeline will fail under Scala 2.13 conditions.

For example:

```SparkContainerIntegrationTest > testCreateTable() STANDARD_ERROR

24/01/10 09:54:01 ERROR Utils: uncaught error in thread spark-listener-group-shared, stopping SparkContext
java.lang.NoSuchMethodError: 'scala.collection.immutable.Seq org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.map(scala.Function1)'
    at io.openlineage.spark.agent.lifecycle.OpenLineageRunEventBuilder.lambda$buildInputDatasets$6(OpenLineageRunEventBuilder.java:342)
    at java.base/java.util.Optional.map(Optional.java:260)
    at io.openlineage.spark.agent.lifecycle.OpenLineageRunEventBuilder.buildInputDatasets(OpenLineageRunEventBuilder.java:340)
    at io.openlineage.spark.agent.lifecycle.OpenLineageRunEventBuilder.populateRun(OpenLineageRunEventBuilder.java:296)
    at io.openlineage.spark.agent.lifecycle.OpenLineageRunEventBuilder.buildRun(OpenLineageRunEventBuilder.java:280)
    at io.openlineage.spark.agent.lifecycle.OpenLineageRunEventBuilder.buildRun(OpenLineageRunEventBuilder.java:229)
    at io.openlineage.spark.agent.lifecycle.SparkSQLExecutionContext.start(SparkSQLExecutionContext.java:87)
    at io.openlineage.spark.agent.OpenLineageSparkListener.lambda$sparkSQLExecStart$0(OpenLineageSparkListener.java:91)
    at java.base/java.util.Optional.ifPresent(Optional.java:178)
    at io.openlineage.spark.agent.OpenLineageSparkListener.sparkSQLExecStart(OpenLineageSparkListener.java:91)
    at io.openlineage.spark.agent.OpenLineageSparkListener.onOtherEvent(OpenLineageSparkListener.java:82)
24/01/10 09:54:01 INFO SparkContext: SparkContext is stopping with exitCode 0.

```

Damien Hawes (damien.hawes@booking.com)
2024-01-10 05:17:46

*Thread Reply:* This is because of a breaking change between Scala 2.12 and Scala 2.13 in the Scala Collections API.

Damien Hawes (damien.hawes@booking.com)
2024-01-10 05:18:22

*Thread Reply:* In Scala 2.12, Seq is an alias for scala.collection.Seq and in 2.13, Seq is an alias for scala.collection.immutable.Seq

Damien Hawes (damien.hawes@booking.com)
2024-01-10 05:23:59

*Thread Reply:* The question becomes, does OL (as an org), have a way to overcome this challenge? i.e., custom docker images?

Maciej Obuchowski (maciej.obuchowski@getindata.com)
2024-01-10 05:59:13

*Thread Reply:* even official Spark images don't have 2.13 version 😞 https://hub.docker.com/_/spark

Maciej Obuchowski (maciej.obuchowski@getindata.com)
2024-01-10 06:06:20

*Thread Reply:* as for Seq issue, maybe the solution would be to have something similar to the visitor factories we have: provide two factories, make integration choose at runtime which one to use, and initialize Seq there?

Mattia Bertorello (mattia.bertorello@booking.com)
2024-01-10 06:24:12

*Thread Reply:* It seems we need to build a Docker image for Spark Scala 2.13

Maciej Obuchowski (maciej.obuchowski@getindata.com)
2024-01-10 06:43:04

*Thread Reply:* there are even some examples but they mostly focus on pure Spark, not python one https://github.com/xebia-functional/spark-scala3-example/blob/main/docker/Dockerfile

Maciej Obuchowski (maciej.obuchowski@getindata.com)
2024-01-10 06:43:27

*Thread Reply:* I think people who run PySpark generally don't care about scala version 🤔

Damien Hawes (damien.hawes@booking.com)
2024-01-10 06:43:39

*Thread Reply:* Indeed.

Damien Hawes (damien.hawes@booking.com)
2024-01-10 06:44:50

*Thread Reply:* The problem (for us) is that Scala 2.12's Jackson module has a security vulnerability in it, that was flagged. So our Data Platform team(s) had to mitigate. The mitigation strategy involved migrating to Scala 2.13.

Maciej Obuchowski (maciej.obuchowski@getindata.com)
2024-01-10 06:53:04

*Thread Reply:* looking at https://github.com/bitnami/containers/blob/main/bitnami/spark/3.5/debian-11/Dockerfile there does not seem to be very simple way to just switch it to 2.13

Damien Hawes (damien.hawes@booking.com)
2024-01-10 06:54:57

*Thread Reply:* Nope. You have to download the Spark binaries compiled with 2.13, and build an entirely new docker image

Damien Hawes (damien.hawes@booking.com)
2024-01-10 06:55:20

*Thread Reply:* or at least, download the binaries and overwrite everything at runtime

Maciej Obuchowski (maciej.obuchowski@getindata.com)
2024-01-10 06:56:38

*Thread Reply:* tbh I can't even find where they specify Scala 2.12

Damien Hawes (damien.hawes@booking.com)
2024-01-10 06:56:54

*Thread Reply:* They don't. It's the default version for Spark 3.x+

Damien Hawes (damien.hawes@booking.com)
2024-01-10 06:57:12

*Thread Reply:* You have to explicitly select Scala 2.13 version from Spark's website

Maciej Obuchowski (maciej.obuchowski@getindata.com)
2024-01-10 06:57:14

*Thread Reply:* maybe doing it from scratch is only option? but I don't know if it's that easy; I feel if it was easy to build and maintain there would be existing image

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-01-11 02:34:39

*Thread Reply:* This may be silly, but... what if we just turned off docker tests for scala 2.13? There are a lot of non-docker integration tests and I don't feel we can fix the whole world.

Maciej Obuchowski (maciej.obuchowski@getindata.com)
2024-01-11 05:23:35

*Thread Reply:* It will be very easy to break it later then, even if we test it manually at the start 🙂

Mattia Bertorello (mattia.bertorello@booking.com)
2024-01-11 05:25:50

*Thread Reply:* What about using the custom docker image suggested here https://openlineage.slack.com/archives/C06A9EAUWUQ/p1704886984670229?thread_ts=1704881808.866719&cid=C06A9EAUWUQ only for scala 2.13, built at run time? Not super efficient, but better than nothing 🙂
new ImageFromDockerfile()
    .withFileFromPath("Dockerfile", Paths.get("./spark/docker/scala213/Dockerfile"))
    .withBuildArg("SPARK_VERSION", System.getProperty("spark.version")))
}
Maciej Obuchowski (maciej.obuchowski@getindata.com)
2024-01-11 05:27:04

*Thread Reply:* Yeah might be good for now. I think we should see how long it takes

Mattia Bertorello (mattia.bertorello@booking.com)
2024-01-11 05:29:29

*Thread Reply:* For the moment we run it locally to test the changes. I will open a PR when we have all the stuff running.

👍 Maciej Obuchowski
Damien Hawes (damien.hawes@booking.com)
2024-01-10 07:11:15

@Maciej Obuchowski - starting a new thread here:

Regarding the factories comment:

> as for Seq issue, maybe the solution would be to have something similar to the visitor factories we have: provide two factories, make integration choose at runtime which one to use, and initialize Seq there?

I'm unsure about your intended use of factories and their placement in the call stack. To illustrate the issue at hand, I developed a simple project. Here's an overview:

app project:

```
public class Application {
    public static void main(String[] args) {
        Demonstration demonstration = new Demonstration();
        java.util.List<String> items = demonstration.items();
        items.forEach(System.out::println);
    }
}

public class Demonstration {
    public java.util.List<String> items() {
        return convert(Provider.getStrings());
    }

    public java.util.List<String> convert(scala.collection.Seq<String> strings) {
        return scala.collection.JavaConverters.seqAsJavaList(strings);
    }

    public java.util.List<String> convert(scala.collection.immutable.Seq<String> strings) {
        return scala.collection.JavaConverters.seqAsJavaList(strings);
    }
}
```

**thing_2.12 & thing_2.13 projects**

```
object Provider {
  def getStrings(): Seq[String] = Seq("a", "b", "c")
}
```

Explanation:

• Compilation Against Scala 2.12 (thing_2.12): When compiling the 'app' project against Scala 2.12, the Java compiler recognizes that it should use the method java.util.List<String> convert(scala.collection.Seq<String> strings).
• Compilation Against Scala 2.13 (thing_2.13): Conversely, when compiling against Scala 2.13, the compiler selects java.util.List<String> convert(scala.collection.immutable.Seq<String> strings) for dispatch.

Runtime Behavior:

• Matching Versions: Running the 'app' code compiled against thing_2.12 with the thing_2.12 jar in the classpath functions correctly. The same is true when pairing app compiled against thing_2.13 with the thing_2.13 jar.
• Mismatched Versions: Issues arise when there's a mismatch in versions. For instance, using the app compiled against thing_2.12 but providing the thing_2.13 jar in the classpath leads to problems, and vice versa. This occurs because the JVM expects the method matching the compilation version, which it fails to find.

Expanding on the Mismatched Versions topic:

Compile-Time Binding in Java:

• In Java, method overloading resolution is a compile-time decision. The compiler determines which method to call based on the method signatures and the types of the arguments as they are known at compile time.
• In our case, the convert method is overloaded: one version takes a scala.collection.Seq<String>, and the other takes a scala.collection.immutable.Seq<String>.
• When you compile against Scala 2.12, scala.collection.Seq is the prevalent sequence type. The Java compiler binds calls to convert to the method accepting scala.collection.Seq.
• Conversely, compiling against Scala 2.13, where scala.collection.immutable.Seq is the base type, leads the Java compiler to bind to the method accepting scala.collection.immutable.Seq.

Runtime Behavior and Classpath:

• At runtime, the Java Virtual Machine (JVM) expects to find the same class types and method signatures that were present during compilation.
• If the runtime environment (classpaths and libraries) matches the compile-time environment, the application runs smoothly.
• However, if there's a mismatch – for instance, the application is compiled against Scala 2.12 but run with Scala 2.13 in the classpath – the JVM may not find the expected method signatures.
• This mismatch can lead to a NoSuchMethodError or similar runtime exceptions. The JVM looks for convert(scala.collection.Seq<String>) but only finds convert(scala.collection.immutable.Seq<String>), or vice versa.
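
A minimal Java sketch of the compile-time binding described above. It does not use Scala at all: java.util.Collection and java.util.List stand in for scala.collection.Seq and scala.collection.immutable.Seq, and the class name OverloadBindingDemo is made up for illustration. The point it shows is that the overload is chosen from the declared type of the argument, not from the object passed at runtime.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

public class OverloadBindingDemo {
    // Stand-in for convert(scala.collection.Seq<String>) in the discussion.
    public static String convert(Collection<String> items) {
        return "convert(Collection)";
    }

    // Stand-in for convert(scala.collection.immutable.Seq<String>).
    public static String convert(List<String> items) {
        return "convert(List)";
    }

    public static void main(String[] args) {
        List<String> asList = new ArrayList<>();
        Collection<String> asCollection = asList; // same object, wider declared type

        // javac selects the overload from the declared (static) type of the
        // argument and records that exact signature in the bytecode.
        System.out.println(convert(asList));       // prints convert(List)
        System.out.println(convert(asCollection)); // prints convert(Collection)
    }
}
```

Because the chosen signature is baked into the compiled class, swapping the jar on the classpath for one that only offers the other signature cannot be repaired at runtime, which is where the NoSuchMethodError comes from.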

👀 Maciej Obuchowski
Maciej Obuchowski (maciej.obuchowski@getindata.com)
2024-01-10 07:32:35

*Thread Reply:* Thanks for the explanation!

Maciej Obuchowski (maciej.obuchowski@getindata.com)
2024-01-10 07:32:48

*Thread Reply:* Yeah, I thought only initialization is an issue 🙂

Maciej Obuchowski (maciej.obuchowski@getindata.com)
2024-01-10 07:40:02

*Thread Reply:* I can't think of better solution than reflection-based one right now

Maciej Obuchowski (maciej.obuchowski@getindata.com)
2024-01-10 07:58:02

*Thread Reply:* @Damien Hawes Would something like this work? We have an interface: public interface SeqConverter { <T> scala.collection.Seq<T> convert(scala.collection.Seq<T> items); <T> scala.collection.Seq<T> convert(scala.collection.immutable.Seq<T> items); } and implementations for 2.12 and 2.13 in separate modules compiled with the respective Scala versions, just as we compile modules with the respective Spark versions now. Example: ```public class Seq212Converter implements SeqConverter {
  public scala.collection.Seq convert(scala.collection.Seq items) {
    return items;
  }

  public scala.collection.Seq convert(scala.collection.immutable.Seq items) {
    // Whatever that conversion would look like
    return scala.collection.immutable.Seq$(items);
  }
}

// then naively call Seq methods:
public static Object callWithSeq(Object clazz, String method, scala.collection.Seq seq)
    throws Exception {
  String Scala212Converter = "io.openlineage.spark.agent.scala212.Seq212Converter";
  String Scala213Converter = "io.openlineage.spark.agent.scala213.Scala213Converter";

  // Pick the converter that was compiled against the Scala version found at runtime.
  if (scala.util.Properties$.MODULE$.versionNumberString().startsWith("2.12")) {
    SeqConverter converter = (SeqConverter) Class.forName(Scala212Converter).newInstance();
    return MethodUtils.invokeMethod(clazz, method, converter.convert(seq));
  } else {
    SeqConverter converter = (SeqConverter) Class.forName(Scala213Converter).newInstance();
    return MethodUtils.invokeMethod(clazz, method, converter.convert(seq));
  }
}``` The last part is definitely crude, as it only lets you call a method with a single Seq argument, but it's just for the sake of this discussion 🙂
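
A self-contained sketch of the version-dispatched loading idea from the message above, written so it runs without Scala or Spark on the classpath. Everything here is hypothetical: the ListConverter interface, the two nested implementations, and passing the Scala version in as a plain string are assumptions made for illustration, not OpenLineage code.

```java
import java.util.Arrays;
import java.util.List;

public class ConverterLoaderDemo {
    /** Hypothetical stand-in for the proposed SeqConverter interface. */
    public interface ListConverter {
        List<String> convert(Object items);
    }

    /** Would live in a module compiled against Scala 2.12. */
    public static class Converter212 implements ListConverter {
        @Override
        public List<String> convert(Object items) {
            return Arrays.asList("2.12", String.valueOf(items));
        }
    }

    /** Would live in a module compiled against Scala 2.13. */
    public static class Converter213 implements ListConverter {
        @Override
        public List<String> convert(Object items) {
            return Arrays.asList("2.13", String.valueOf(items));
        }
    }

    /** Maps a Scala version string to the binary name of the implementation. */
    public static String implementationFor(String scalaVersion) {
        String suffix = scalaVersion.startsWith("2.12") ? "Converter212" : "Converter213";
        return ConverterLoaderDemo.class.getName() + "$" + suffix;
    }

    /** Loads the implementation by name, so callers never reference it statically. */
    public static ListConverter load(String scalaVersion) throws ReflectiveOperationException {
        Class<?> clazz = Class.forName(implementationFor(scalaVersion));
        return (ListConverter) clazz.getDeclaredConstructor().newInstance();
    }

    public static void main(String[] args) throws ReflectiveOperationException {
        System.out.println(load("2.12.18").convert("x"));
        System.out.println(load("2.13.12").convert("x"));
    }
}
```

The calling code only ever names the interface; the version-specific class is resolved from a string, which is what keeps the caller free of references to either Seq type.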

Maciej Obuchowski (maciej.obuchowski@getindata.com)
2024-01-10 07:59:32

*Thread Reply:* @Paweł Leszczyński when you get back take a look at this thread 🙂

👀 Paweł Leszczyński
Maciej Obuchowski (maciej.obuchowski@getindata.com)
2024-01-10 08:02:43

*Thread Reply:* Seq<Seq<Seq<Expression>>> matched = (Seq<Seq<Seq<Expression>>>) mergeRows.getMethod("matchedOutputs").invoke(node);
Seq<Seq<Expression>> notMatched = (Seq<Seq<Expression>>) mergeRows.getMethod("notMatchedOutputs").invoke(node); 🤔

Damien Hawes (damien.hawes@booking.com)
2024-01-10 10:14:27

*Thread Reply:* It could work. However, something to keep in mind: we have to eliminate all references to scala.collection.Seq in import statements and fully-qualified types. Basically it becomes a case of, "Immediately convert sequence to java.util.List".

Damien Hawes (damien.hawes@booking.com)
2024-01-10 10:14:50

*Thread Reply:* Otherwise we will face a combination of runtime and compile time errors.

Damien Hawes (damien.hawes@booking.com)
2024-01-10 10:15:29

*Thread Reply:* That code you linked, is one such place where we will need to change it - somehow.

Damien Hawes (damien.hawes@booking.com)
2024-01-10 10:15:42

*Thread Reply:* That, or we need to port that code to Scala.

Damien Hawes (damien.hawes@booking.com)
2024-01-10 10:16:05

*Thread Reply:* So that we don't need to constantly do a dance between Scala and Java collections.

Maciej Obuchowski (maciej.obuchowski@getindata.com)
2024-01-10 11:37:24

*Thread Reply:* We might go with writing common module in Scala though. It would be really good if we could do that incrementally. Even better if we could have single module for both 2.12 and 2.13

Maciej Obuchowski (maciej.obuchowski@getindata.com)
2024-01-10 11:39:17

*Thread Reply:* We use a lot of Seq and I doubt it's the only place we'll have problems

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-01-11 02:42:37

*Thread Reply:* Would using this https://github.com/scala/scala-collection-compat help?

Damien Hawes (damien.hawes@booking.com)
2024-01-11 03:55:06

*Thread Reply:* As far as I could tell, no. That adds methods to the APIs

Damien Hawes (damien.hawes@booking.com)
2024-01-17 06:50:17

@Maciej Obuchowski @Paweł Leszczyński - spark3 what is its purpose? Is it meant to be common code for all Apache Spark 3.x.y variants?

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-01-17 06:50:36

*Thread Reply:* yes

Damien Hawes (damien.hawes@booking.com)
2024-01-17 06:51:18

*Thread Reply:* I'm asking, because I noticed this situation, spark3 is compiled using Spark 3.1.x, but something like JdbcHandler doesn't exist in Spark 3.0.x

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-01-17 06:52:56

*Thread Reply:* if the classes using JdbcHandler aren't loaded, this shouldn't break the integration. However, we don't test our package with 3.0.x
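
The JVM loads classes lazily, so a class that references a missing type only fails once it is actually loaded and linked. A hedged sketch of making that guard explicit follows; the helper names isClassPresent and chooseHandler and the returned labels are hypothetical, and this is not how the integration is known to do it.

```java
public class OptionalClassDemo {
    /** Returns true if the named class can be found, without initializing it. */
    public static boolean isClassPresent(String className) {
        try {
            Class.forName(className, false, OptionalClassDemo.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    /** Picks a handler label only when the class it depends on is available. */
    public static String chooseHandler(String requiredClass) {
        return isClassPresent(requiredClass) ? "jdbc-handler" : "no-op";
    }

    public static void main(String[] args) {
        // java.util.List is always present; the second name is deliberately bogus.
        System.out.println(chooseHandler("java.util.List"));
        System.out.println(chooseHandler("org.example.DoesNotExist"));
    }
}
```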

Damien Hawes (damien.hawes@booking.com)
2024-01-18 09:50:41

@Maciej Obuchowski @Paweł Leszczyński - right, the first prep PR to support Scala 2.13 is up. I'm breaking my changes up into smaller PRs to make them more consumable.

You'll find it here:

https://github.com/OpenLineage/OpenLineage/pull/2376

👀 Paweł Leszczyński, Maciej Obuchowski
🔥 Maciej Obuchowski, Harel Shein
Damien Hawes (damien.hawes@booking.com)
2024-01-18 10:13:37

*Thread Reply:* Ah, right. I can't run the integration tests.

Damien Hawes (damien.hawes@booking.com)
2024-01-18 10:14:00

*Thread Reply:* My heart sank a bit, when i saw the integration tests fail

Damien Hawes (damien.hawes@booking.com)
2024-01-18 10:14:21

*Thread Reply:* I was like, "How?! I didn't touch the spark code!"

Maciej Obuchowski (maciej.obuchowski@getindata.com)
2024-01-18 10:21:57

*Thread Reply:* pushed it 🙂

Damien Hawes (damien.hawes@booking.com)
2024-01-18 10:28:51

*Thread Reply:* Sorry, pushed what?

Maciej Obuchowski (maciej.obuchowski@getindata.com)
2024-01-18 10:32:44

*Thread Reply:* We have a script to push external contributions to some origin branch, GH CI does apparently look at commit IDs instead of associating CI runs with branches so after this finishes: https://app.circleci.com/pipelines/github/OpenLineage/OpenLineage/8924/workflows/efdcc509-cc79-4158-a925-50a59733d446 it will be shown as CI results for your branch

Maciej Obuchowski (maciej.obuchowski@getindata.com)
2024-01-18 10:33:12

*Thread Reply:* Anyway, we'll have a different solution to this problem soon: https://github.com/OpenLineage/OpenLineage/pull/2374

Damien Hawes (damien.hawes@booking.com)
2024-01-18 10:53:47

*Thread Reply:* Perfect

Damien Hawes (damien.hawes@booking.com)
2024-01-18 10:54:02

*Thread Reply:* I see the tests went through

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-01-19 02:20:39

*Thread Reply:* It looks good to me and I approved PR. I would leave it unmerged for a moment to let Maciej review it if he likes to. We can merge it later this day.

BTW that's a really great improvement to the codebase @Damien Hawes. Thanks for preparing this.

Maciej Obuchowski (maciej.obuchowski@getindata.com)
2024-01-19 05:33:20

*Thread Reply:* Great PR!

Maciej Obuchowski (maciej.obuchowski@getindata.com)
2024-01-19 05:34:44

*Thread Reply:* I've :gh_merged: it

Damien Hawes (damien.hawes@booking.com)
2024-01-19 05:39:05

*Thread Reply:* Thanks!

Arnab Bhattacharyya (arnab.bhattacharyya@booking.com)
2024-01-19 04:37:18

@Arnab Bhattacharyya has joined the channel

👋 Damien Hawes, Maciej Obuchowski, Paweł Leszczyński
Damien Hawes (damien.hawes@booking.com)
2024-01-19 05:30:38

@Maciej Obuchowski @Paweł Leszczyński - right the next PR is up for review: https://github.com/OpenLineage/OpenLineage/pull/2377

It builds off the previous one, so we see a lot more changes than what is strictly necessary. Once the previous one is merged, I'll rebase off of main.

Damien Hawes (damien.hawes@booking.com)
2024-01-19 06:17:32

*Thread Reply:* @Maciej Obuchowski - I see that the CI is failing because of environment variables not being set. Is there any way to retry the jobs to see if this is transient?

Damien Hawes (damien.hawes@booking.com)
2024-01-19 06:18:09

*Thread Reply:* ```ERROR: Missing environment variable {i}

Exited with code exit status 1```

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-01-19 06:36:20
Maciej Obuchowski (maciej.obuchowski@getindata.com)
2024-01-19 06:38:09
Damien Hawes (damien.hawes@booking.com)
2024-01-19 07:25:30

*Thread Reply:* Awesome

Damien Hawes (damien.hawes@booking.com)
2024-01-19 07:25:34

*Thread Reply:* Seems like it went through

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-01-19 09:56:03

*Thread Reply:* :gh_merged:

Damien Hawes (damien.hawes@booking.com)
2024-01-19 10:05:19

*Thread Reply:* Awesome!

Damien Hawes (damien.hawes@booking.com)
2024-01-22 05:20:34

@Paweł Leszczyński - thanks for re-running the CI/CD

Damien Hawes (damien.hawes@booking.com)
2024-01-22 06:04:42

Thanks! @Paweł Leszczyński

Damien Hawes (damien.hawes@booking.com)
2024-01-22 06:53:30

@Paweł Leszczyński @Maciej Obuchowski - this one is ready: https://github.com/OpenLineage/OpenLineage/pull/2384

Damien Hawes (damien.hawes@booking.com)
2024-01-22 10:24:31

*Thread Reply:* @Paweł Leszczyński - responded to your comments.

👀 Paweł Leszczyński
Damien Hawes (damien.hawes@booking.com)
2024-01-22 06:53:53

Oh, I see the integration tests are failing with that environment variable thing

Damien Hawes (damien.hawes@booking.com)
2024-01-22 07:41:39

Done- the above PR is ready for review

👍 Maciej Obuchowski
Damien Hawes (damien.hawes@booking.com)
2024-01-22 12:40:45

Any idea why the CI/CD pipelines keep failing with that "No environment variable" issue?

Harel Shein (harel.shein@gmail.com)
2024-01-22 13:13:41

*Thread Reply:* hmmm... these come from the integration-tests context. which are available for all members of OpenLineage org. so perhaps you need to be a member for your build to have access to that context? @Jakub Dardziński might know more here

Harel Shein (harel.shein@gmail.com)
2024-01-22 13:14:47

*Thread Reply:* I know we didn't merge your PR yet @Jakub Dardziński

Jakub Dardziński (jakub.dardzinski@getindata.com)
2024-01-22 13:14:48

*Thread Reply:* well, so for that one I have this PR: https://github.com/OpenLineage/OpenLineage/pull/2374

it is indeed an issue that should be resolved with that

Harel Shein (harel.shein@gmail.com)
2024-01-22 13:15:22

*Thread Reply:* ok, great. let's test this out after this is merged then 🙂

Jakub Dardziński (jakub.dardzinski@getindata.com)
2024-01-22 13:15:40

*Thread Reply:* I was about to merge it today, forgot to do that after rebasing :face_palm:

😅 Harel Shein, Paweł Leszczyński
Jakub Dardziński (jakub.dardzinski@getindata.com)
2024-01-22 13:13:45

@Jakub Dardziński has joined the channel

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-01-23 03:38:09

@Damien Hawes https://github.com/OpenLineage/OpenLineage/pull/2384#discussion_r1462874134 -> please let me know what you think about that

Damien Hawes (damien.hawes@booking.com)
2024-01-23 03:40:03

*Thread Reply:* The problem exists in the MatchesMapRecursively class. It happens when k == "schema" . The code expects the type of target.get(k) to be a subtype of java.util.Map, however it is actually a subtype of java.util.ArrayList.

Damien Hawes (damien.hawes@booking.com)
2024-01-23 03:40:33

*Thread Reply:*

    public static Predicate<Map<String, Object>> predicate(
        Map<String, Object> target, Set<String> omittedKeys) {
      return (map) -> {
        if (!map.keySet().containsAll(target.keySet())) {
          log.error("Object keys {} does not match target keys {}", map.keySet(), target.keySet());
          return false;
        }
        for (String k : target.keySet()) {
          if (omittedKeys.contains(k)) {
            continue;
          }
          Object val = map.get(k);
          boolean eq;
          if (val instanceof Map) {
            eq =
                MatchesMapRecursively.predicate((Map<String, Object>) target.get(k), omittedKeys)
                    .test((Map<String, Object>) val);
          } else if (val instanceof List) {
            eq =
                MatchesMapRecursively.predicate((List<Object>) target.get(k), omittedKeys)
                    .test((List<Object>) val);
          } else if (PRODUCER_WITH_UNDERSCORE.equals(k) || PRODUCER.equals(k)) {
            eq = Versions.OPEN_LINEAGE_PRODUCER_URI.toString().equals(val);
          } else if (val == null) {
            eq = true;
          } else {
            eq = val.equals(target.get(k));
          }
          if (!eq) {
            log.error(
                "For key {} - passed object {} does not match target object {}",
                k,
                map.get(k),
                target.get(k));
            return false;
          }
        }
        return true;
      };
    }

Damien Hawes (damien.hawes@booking.com)
2024-01-23 03:41:33

*Thread Reply:*

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-01-23 03:48:00

*Thread Reply:* it looks like, after changing bigquery connector version, some of the properties of BigQueryRelation changed their type. However, the test relies on serialized json https://github.com/OpenLineage/OpenLineage/blob/1.8.0/integration/spark/shared/src/test/resources/test_data/serde/bigqueryrelation-node.json

Damien Hawes (damien.hawes@booking.com)
2024-01-23 03:48:34

*Thread Reply:* So schema was always a list?

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-01-23 03:48:35

*Thread Reply:* and this json may contain somewhere a list which is expected to be a map within new library

Damien Hawes (damien.hawes@booking.com)
2024-01-23 03:48:48

*Thread Reply:* it failed on the key "schema"

Damien Hawes (damien.hawes@booking.com)
2024-01-23 03:49:49

*Thread Reply:* Though, I just fixed the test.

🙌 Paweł Leszczyński
Damien Hawes (damien.hawes@booking.com)
2024-01-23 03:50:09

*Thread Reply:*

    Object val = map.get(k);
    Object targetVal = target.get(k);
    boolean eq;
    if (val instanceof Map) {
      if (targetVal instanceof Map) {
        Predicate<Map<String, Object>> predicate =
            predicate((Map<String, Object>) targetVal, omittedKeys);
        return predicate.test((Map<String, Object>) val);
      } else if (targetVal instanceof List) {
        Predicate<List<Object>> predicate = predicate((List<Object>) targetVal, omittedKeys);
        return predicate.test((List<Object>) val);
      } else {
        eq = false;
      }
    }

Damien Hawes (damien.hawes@booking.com)
2024-01-23 03:50:44

*Thread Reply:* For Scala 2.12 at least

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-01-23 03:50:53

*Thread Reply:* great

Damien Hawes (damien.hawes@booking.com)
2024-01-23 03:56:33

*Thread Reply:* OK. Here's a mess-with-your-head moment.

Damien Hawes (damien.hawes@booking.com)
2024-01-23 03:56:45

*Thread Reply:* The above change works.

Damien Hawes (damien.hawes@booking.com)
2024-01-23 03:57:02

*Thread Reply:* If you collapse it, into eq = predicate(...).test(...);

Damien Hawes (damien.hawes@booking.com)
2024-01-23 03:57:05

*Thread Reply:* the test fails

Damien Hawes (damien.hawes@booking.com)
2024-01-23 03:59:14

*Thread Reply:* Ah

Damien Hawes (damien.hawes@booking.com)
2024-01-23 03:59:16

*Thread Reply:* Nevermind

Damien Hawes (damien.hawes@booking.com)
2024-01-23 03:59:21

*Thread Reply:* I see I have changed the behaviour of the test
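
A simplified, standalone sketch of a recursive matcher that checks the container type of both sides before casting, so a Map on one side and a List on the other yields false instead of a ClassCastException, and no branch returns early from the key loop. The class and method names are hypothetical, and it leaves out the producer and null special cases of the real MatchesMapRecursively.

```java
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

public class RecursiveMatchDemo {
    /** Returns true when every non-omitted part of target is found in actual. */
    public static boolean matches(Object target, Object actual, Set<String> omittedKeys) {
        if (target instanceof Map && actual instanceof Map) {
            Map<?, ?> targetMap = (Map<?, ?>) target;
            Map<?, ?> actualMap = (Map<?, ?>) actual;
            if (!actualMap.keySet().containsAll(targetMap.keySet())) {
                return false;
            }
            for (Map.Entry<?, ?> entry : targetMap.entrySet()) {
                if (omittedKeys.contains(entry.getKey())) {
                    continue; // skip this key, keep checking the others
                }
                if (!matches(entry.getValue(), actualMap.get(entry.getKey()), omittedKeys)) {
                    return false;
                }
            }
            return true;
        }
        if (target instanceof List && actual instanceof List) {
            List<?> targetList = (List<?>) target;
            List<?> actualList = (List<?>) actual;
            if (targetList.size() != actualList.size()) {
                return false;
            }
            for (int i = 0; i < targetList.size(); i++) {
                if (!matches(targetList.get(i), actualList.get(i), omittedKeys)) {
                    return false;
                }
            }
            return true;
        }
        // One side is a container and the other is not (or a different kind).
        if (target instanceof Map || target instanceof List
                || actual instanceof Map || actual instanceof List) {
            return false;
        }
        return Objects.equals(target, actual);
    }
}
```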

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-01-23 05:07:17

*Thread Reply:* in case you find it difficult or time consuming, you can remove this test i think. i don't consider this test in that form important

Damien Hawes (damien.hawes@booking.com)
2024-01-23 05:08:14

*Thread Reply:* It's just a bit tough, because I don't know what the bigquery stuff is supposed to output.

Damien Hawes (damien.hawes@booking.com)
2024-01-23 05:08:28

*Thread Reply:* So I don't know what the expected behaviour is.

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-01-23 05:08:46

*Thread Reply:* i would try removing schema from json file and if it does not help we can remove the test

Damien Hawes (damien.hawes@booking.com)
2024-01-23 05:08:56

*Thread Reply:* I tried a different trick

Damien Hawes (damien.hawes@booking.com)
2024-01-23 05:09:00

*Thread Reply:* Namely

Damien Hawes (damien.hawes@booking.com)
2024-01-23 05:09:03

*Thread Reply:* Flatten the dictionary

Damien Hawes (damien.hawes@booking.com)
2024-01-23 05:09:13

*Thread Reply:* Yeah ...

Damien Hawes (damien.hawes@booking.com)
2024-01-23 05:09:28

*Thread Reply:* The actual results have a lot more properties than the expected

Damien Hawes (damien.hawes@booking.com)
2024-01-23 05:10:00

*Thread Reply:*

Damien Hawes (damien.hawes@booking.com)
2024-01-23 05:21:38

*Thread Reply:* @Paweł Leszczyński - I think a better approach would be:

  1. Keep the test disabled
  2. Explain why it is disabled (more than what I've done so far)
  3. Create an issue to fix the test, so it isn't lost to time.
Damien Hawes (damien.hawes@booking.com)
2024-01-23 05:22:09

*Thread Reply:* What do you think?

Maciej Obuchowski (maciej.obuchowski@getindata.com)
2024-01-23 05:25:45

*Thread Reply:* > It's just a bit tough, because I don't know what the bigquery stuff is supposed to output.

Last time I bumped the connector version I just serialized the bigquery node and partially replaced what was there previously... I think the test is bad conceptually because we depend on some internal structure of an outside connector that we don't really care about

Maciej Obuchowski (maciej.obuchowski@getindata.com)
2024-01-23 05:26:13

*Thread Reply:* I think we should remove it, and if we want a test to keep serialization stable we could try to serialize some fake node that we ourselves provide

Damien Hawes (damien.hawes@booking.com)
2024-01-23 05:26:18

*Thread Reply:* Yeah, that's what I found I was doing with my flattener

Maciej Obuchowski (maciej.obuchowski@getindata.com)
2024-01-23 05:26:23

*Thread Reply:* but I think this is outside of the scope of this PR 🙂

Damien Hawes (damien.hawes@booking.com)
2024-01-23 05:26:29

*Thread Reply:* I was just adding an ever increasing amount of keys to omit

Damien Hawes (damien.hawes@booking.com)
2024-01-23 05:26:39

*Thread Reply:* and I just thought to myself, "If I am doing this, I'm doing something wrong"

😄 Maciej Obuchowski
Damien Hawes (damien.hawes@booking.com)
2024-01-23 05:27:19

*Thread Reply:* OK.

Damien Hawes (damien.hawes@booking.com)
2024-01-23 05:27:25

*Thread Reply:* I'll remove the test then.

👍 Maciej Obuchowski
Damien Hawes (damien.hawes@booking.com)
2024-01-23 05:28:59

*Thread Reply:* Just to clarify, you're arguing that the whole test of the LogicalPlanSerializer is flawed?

Damien Hawes (damien.hawes@booking.com)
2024-01-23 05:29:23

*Thread Reply:* i.e., we're serialising spark internal structures that could change without warning

Maciej Obuchowski (maciej.obuchowski@getindata.com)
2024-01-23 05:31:25

*Thread Reply:* I think tests like testSerializeSlicesExcessivePayload are good - we provide our own implementation of LogicalPlan and we test the structure

Maciej Obuchowski (maciej.obuchowski@getindata.com)
2024-01-23 05:32:06

*Thread Reply:* Even something like testSerializeInsertIntoHadoopPlan is less problematic as the InsertIntoHadoopFsRelationCommand is way more stable

Maciej Obuchowski (maciej.obuchowski@getindata.com)
2024-01-23 05:32:13

*Thread Reply:* but BQ connector isn't...

Damien Hawes (damien.hawes@booking.com)
2024-01-23 05:55:20

*Thread Reply:* OK

Damien Hawes (damien.hawes@booking.com)
2024-01-23 05:55:25

*Thread Reply:* Changes pushed to all branches

Damien Hawes (damien.hawes@booking.com)
2024-01-23 06:29:55

*Thread Reply:* 🎉 'shared' was merged, thanks @Paweł Leszczyński

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-01-23 06:30:16

*Thread Reply:* :medal: awesome

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-01-23 06:30:26

*Thread Reply:* could u rebase other one on the list?

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-01-23 06:30:39

*Thread Reply:* 2385 has some conflict

Damien Hawes (damien.hawes@booking.com)
2024-01-23 06:30:39

*Thread Reply:* It's odd that there are conflicts

Damien Hawes (damien.hawes@booking.com)
2024-01-23 06:30:50

*Thread Reply:* Because I had telescoping branches

Damien Hawes (damien.hawes@booking.com)
2024-01-23 06:30:56

*Thread Reply:* I created spark3 from shared

Damien Hawes (damien.hawes@booking.com)
2024-01-23 06:31:00

*Thread Reply:* spark31 from spark3, etc

Damien Hawes (damien.hawes@booking.com)
2024-01-23 06:31:10

*Thread Reply:* But sure, I'll rebase

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-01-23 06:37:58

*Thread Reply:* what if you rebased spark35 '2390' ?

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-01-23 06:38:09

*Thread Reply:* perhaps it would be doable to merge it in one-go

Damien Hawes (damien.hawes@booking.com)
2024-01-23 06:38:40

*Thread Reply:* That last one is 2391

Damien Hawes (damien.hawes@booking.com)
2024-01-23 06:38:44

*Thread Reply:* the 'spark2'

Damien Hawes (damien.hawes@booking.com)
2024-01-23 06:38:59

*Thread Reply:* I'm seeing a failure on spark32

Damien Hawes (damien.hawes@booking.com)
2024-01-23 06:39:13

*Thread Reply:* Something to do with Jackson

Damien Hawes (damien.hawes@booking.com)
2024-01-23 06:39:33

*Thread Reply:* java.lang.ExceptionInInitializerError
    at io.openlineage.spark.agent.lifecycle.LibraryTest.testRdd(LibraryTest.java:112)
Caused by: com.fasterxml.jackson.databind.JsonMappingException: Scala module 2.13.2 requires Jackson Databind version >= 2.13.0 and < 2.14.0 - Found jackson-databind version 2.15.3
    at io.openlineage.spark.agent.lifecycle.LibraryTest.testRdd(LibraryTest.java:112)
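
The rule quoted in that error message (">= 2.13.0 and < 2.14.0" for Scala module 2.13.2) amounts to requiring the same major.minor line on both artifacts. A small sketch of that reading follows; it is an illustration of the rule as stated in the message, not the actual check inside the Jackson Scala module, and the class and method names are made up.

```java
public class JacksonVersionCheckDemo {
    /** Parses the leading "major.minor" of a version such as "2.15.3" or "2.13.4.1". */
    static int[] majorMinor(String version) {
        String[] parts = version.split("\\.");
        return new int[] {Integer.parseInt(parts[0]), Integer.parseInt(parts[1])};
    }

    /** True when databind is on the same major.minor line as the Scala module. */
    public static boolean isCompatible(String scalaModuleVersion, String databindVersion) {
        int[] module = majorMinor(scalaModuleVersion);
        int[] databind = majorMinor(databindVersion);
        return module[0] == databind[0] && module[1] == databind[1];
    }

    public static void main(String[] args) {
        System.out.println(isCompatible("2.13.2", "2.15.3"));   // prints false
        System.out.println(isCompatible("2.13.2", "2.13.4.1")); // prints true
    }
}
```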

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-01-23 06:39:41

*Thread Reply:* ok, i approved 2385 and triggered integration tests on that

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-01-23 06:40:38

*Thread Reply:* jackson things are always a problem

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-01-23 06:41:01

*Thread Reply:* we do have jackson packaged and shaded within openlineage-java

Damien Hawes (damien.hawes@booking.com)
2024-01-23 06:41:12

*Thread Reply:* Yeah - I'm trying to bisect where exactly things went wrong

Damien Hawes (damien.hawes@booking.com)
2024-01-23 06:41:27

*Thread Reply:* i.e., what changes did I make in spark32, that cause these jackson errors

Damien Hawes (damien.hawes@booking.com)
2024-01-23 06:42:21

*Thread Reply:* And I see 'spark31' is failing during integrationTests when run under spark 3.3.3

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-01-23 06:42:44

*Thread Reply:* sometimes rerun from failed simply helps

Damien Hawes (damien.hawes@booking.com)
2024-01-23 06:43:07

*Thread Reply:* I can't rerun

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-01-23 06:43:20

*Thread Reply:* integration tests depend on external resources like s3 buckets / big query tables and if you run multiple instances they can fail

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-01-23 06:43:51

*Thread Reply:* i can rerun it but anyway we will have to rebase each PR one by one and run tests again

Damien Hawes (damien.hawes@booking.com)
2024-01-23 06:44:04

*Thread Reply:* Indeed.

Damien Hawes (damien.hawes@booking.com)
2024-01-23 06:50:19

*Thread Reply:* Ah, I see the approval step now.

Damien Hawes (damien.hawes@booking.com)
2024-01-23 06:56:29

*Thread Reply:* > we do have jackson packaged and shaded within openlineage-java

Hmm, when I dump the dependencies, I see raw Jackson dependencies. Not shaded ones.

Damien Hawes (damien.hawes@booking.com)
2024-01-23 06:57:41

*Thread Reply:* (The 2.13.2 migration is because I force Jackson to 2.13.2)

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-01-23 07:09:11

*Thread Reply:* sry, we relocate jackson within openlineage-spark https://github.com/OpenLineage/OpenLineage/blob/main/integration/spark/build.gradle#L153

Maciej Obuchowski (maciej.obuchowski@getindata.com)
2024-01-23 07:18:56

*Thread Reply:* To expand on it more, there are two jacksons - one is version provided/required by particular Spark version, and other one used by our integration to serialize OpenLineage events. The second one needs to be shaded

Damien Hawes (damien.hawes@booking.com)
2024-01-23 07:19:56

*Thread Reply:* Hmm - it might be worth creating a module that has jackson shaded already

Damien Hawes (damien.hawes@booking.com)
2024-01-23 07:20:01

*Thread Reply:* ah

Damien Hawes (damien.hawes@booking.com)
2024-01-23 07:20:05

*Thread Reply:* but that won't work

Damien Hawes (damien.hawes@booking.com)
2024-01-23 07:20:11

*Thread Reply:* because of the openlineage-java thing

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-01-23 07:20:27

*Thread Reply:* https://github.com/OpenLineage/OpenLineage/pull/2386 -> @Damien Hawes could u rebase it?

Damien Hawes (damien.hawes@booking.com)
2024-01-23 07:21:31

*Thread Reply:* Done

👍 Paweł Leszczyński
Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-01-23 07:21:38

*Thread Reply:* i think there was some jackson related hack that is necessary for serializing logical plan bcz we need to serialize logical plans with jackson of the spark

Damien Hawes (damien.hawes@booking.com)
2024-01-23 07:21:55

*Thread Reply:* Yeah, the LogicalPlanSerializer

Damien Hawes (damien.hawes@booking.com)
2024-01-23 07:22:01

*Thread Reply:* does some reflection things

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-01-23 07:22:51

*Thread Reply:* if we want to serialize plan node in a given scala version, we need to run it with jackson of the same scala version

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-01-23 07:23:18

*Thread Reply:* but jackson attached to spark is missing jackson-databind (if i remember it right)

Damien Hawes (damien.hawes@booking.com)
2024-01-23 05:58:31

@Jakub Dardziński - ~how does the CI/CD pipeline now work? I see that the "run steps" aren't present when I push. Is this intended?~ Nevermind, it seems it just took a while before Circle CI spun them up and GitHub showed them as running.

Jakub Dardziński (jakub.dardzinski@getindata.com)
2024-01-23 06:10:11

*Thread Reply:* the change is that now integration tests from forked repos require approval

Damien Hawes (damien.hawes@booking.com)
2024-01-23 09:59:37

I don't understand. I ran the failing integration test of spark31 locally, it failed. I ran it again, it passed.

Maciej Obuchowski (maciej.obuchowski@getindata.com)
2024-01-23 10:09:45

*Thread Reply:* it's possible that spark has already cleared the QueryExecution from the executionIdToQueryExecution in the SQLExecution class if the job runs very fast

Maciej Obuchowski (maciej.obuchowski@getindata.com)
2024-01-23 10:10:02

*Thread Reply:* (if the reason for failure was that START event was not sent)
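
A toy model of the race hypothesised above: a registry that forgets an execution as soon as it finishes, and a listener that looks it up afterwards. This is not Spark's SQLExecution code; the class ExecutionRegistryDemo, its methods, and the "START"/"SKIPPED" labels are all hypothetical and only illustrate why a very fast job could leave nothing to look up.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

public class ExecutionRegistryDemo {
    // Hypothetical stand-in for a map from execution id to query execution.
    private final Map<Long, String> executions = new ConcurrentHashMap<>();

    public void start(long id, String plan) {
        executions.put(id, plan);
    }

    public void finish(long id) {
        executions.remove(id); // entry disappears as soon as the job ends
    }

    public Optional<String> lookup(long id) {
        return Optional.ofNullable(executions.get(id));
    }

    /** What a listener would emit if it handled the start event at this moment. */
    public String onStart(long id) {
        return lookup(id).map(plan -> "START " + plan).orElse("SKIPPED");
    }

    public static void main(String[] args) {
        ExecutionRegistryDemo registry = new ExecutionRegistryDemo();
        registry.start(1L, "plan");
        System.out.println(registry.onStart(1L)); // listener ran in time
        registry.finish(1L);
        System.out.println(registry.onStart(1L)); // listener ran after cleanup
    }
}
```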

Damien Hawes (damien.hawes@booking.com)
2024-01-23 10:17:52

*Thread Reply:* Any idea how I debug the integration tests via intellij?

Damien Hawes (damien.hawes@booking.com)
2024-01-23 10:18:07

*Thread Reply:* Each time I try and run the ColumnLineageIntegrationTest it only runs the unit tests.

Damien Hawes (damien.hawes@booking.com)
2024-01-23 10:18:28

*Thread Reply:* I've told intellij to use the integrationTest task, but it's like, "nope"

Damien Hawes (damien.hawes@booking.com)
2024-01-23 10:18:44

*Thread Reply:* and just runs the unit tests any way

Maciej Obuchowski (maciej.obuchowski@getindata.com)
2024-01-23 10:18:58

*Thread Reply:* Can you check if it's integrationTest in run configurations?

Damien Hawes (damien.hawes@booking.com)
2024-01-23 10:19:22

*Thread Reply:* This is what it's running:

:app:integrationTest --tests "io.openlineage.spark.agent.ColumnLineageIntegrationTest" -Pspark.version=3.3.1

Damien Hawes (damien.hawes@booking.com)
2024-01-23 09:59:46

Send help.

😂 Harel Shein, Paweł Leszczyński, Maciej Obuchowski
Damien Hawes (damien.hawes@booking.com)
2024-01-23 10:08:04

OK - I found one:

all > io.openlineage.spark.agent > ColumnLineageIntegrationTest
columnLevelLineageSingleDestinationTest(
Damien Hawes (damien.hawes@booking.com)
2024-01-23 10:08:54
jayant joshi (itsjayantjoshi@gmail.com)
2024-01-24 01:10:44

@jayant joshi has joined the channel

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-01-24 02:41:46
Damien Hawes (damien.hawes@booking.com)
2024-01-24 03:18:25

*Thread Reply:* Hmm

Damien Hawes (damien.hawes@booking.com)
2024-01-24 03:18:30

*Thread Reply:* Good hypothesis

Damien Hawes (damien.hawes@booking.com)
2024-01-24 03:36:38

*Thread Reply:* If I run ./gradlew app:test -Pspark.version=3.3.1

Damien Hawes (damien.hawes@booking.com)
2024-01-24 03:36:52

*Thread Reply:* I get this failure:

Execution failed for task ':app:compileJava'.
> Could not resolve all files for configuration ':app:compileClasspath'.
   > Could not resolve com.fasterxml.jackson:jackson-bom:2.13.4.1.
     Required by:
         project :app > org.apache.spark:spark-core_2.12:3.3.1 > com.fasterxml.jackson.core:jackson-databind:2.13.4.1
      > Could not resolve com.fasterxml.jackson:jackson-bom:2.13.4.1.
         > Could not parse POM https://astronomer.jfrog.io/artifactory/maven-public-libs-snapshot/com/fasterxml/jackson/jackson-bom/2.13.4.1/jackson-bom-2.13.4.1.pom
            > Already seen doctype.
   > Could not resolve com.fasterxml.jackson:jackson-bom:2.13.4.1.
     Required by:
         project :app > org.apache.spark:spark-core_2.12:3.3.1 > org.apache.avro:avro:1.11.0 > com.fasterxml.jackson.core:jackson-core:2.13.4
         project :app > org.apache.spark:spark-core_2.12:3.3.1 > org.apache.spark:spark-kvstore_2.12:3.3.1 > com.fasterxml.jackson.core:jackson-annotations:2.13.4
      > Could not resolve com.fasterxml.jackson:jackson-bom:2.13.4.1.
         > Could not parse POM https://astronomer.jfrog.io/artifactory/maven-public-libs-snapshot/com/fasterxml/jackson/jackson-bom/2.13.4.1/jackson-bom-2.13.4.1.pom
            > Already seen doctype.

Damien Hawes (damien.hawes@booking.com)
2024-01-24 03:37:42

*Thread Reply:* spark-core requires 2.13.4.1

Damien Hawes (damien.hawes@booking.com)
2024-01-24 03:38:06

*Thread Reply:* Suggestion: bump the spark versions in the CI/CD configuration to the latest patch versions

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-01-24 03:40:37
Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-01-24 03:40:45

*Thread Reply:* why do we have 3.3.1 ?

Damien Hawes (damien.hawes@booking.com)
2024-01-24 03:41:23

*Thread Reply:* We might not

Damien Hawes (damien.hawes@booking.com)
2024-01-24 03:41:28

*Thread Reply:* I had a look at the circle ci

Damien Hawes (damien.hawes@booking.com)
2024-01-24 03:41:31

*Thread Reply:* and I saw 3.3.3

Damien Hawes (damien.hawes@booking.com)
2024-01-24 03:41:39

*Thread Reply:* but I could have sworn I saw 3.3.1 in one of the tests

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-01-24 03:42:35

*Thread Reply:* 3.3.1 is in spark33

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-01-24 03:42:55

*Thread Reply:* and the module has some tests

Damien Hawes (damien.hawes@booking.com)
2024-01-24 03:49:28

*Thread Reply:* OK

Damien Hawes (damien.hawes@booking.com)
2024-01-24 03:49:33

*Thread Reply:* I'm going to push to spark35

Damien Hawes (damien.hawes@booking.com)
2024-01-24 03:49:40

*Thread Reply:* with changes to circle ci

Damien Hawes (damien.hawes@booking.com)
2024-01-24 03:49:51

*Thread Reply:* and also updating the gradle.properties for all branches that came before it

Damien Hawes (damien.hawes@booking.com)
2024-01-24 03:50:22

*Thread Reply:* along with removing the jackson override in app/build.gradle

Damien Hawes (damien.hawes@booking.com)
2024-01-24 03:50:41

*Thread Reply:* and if that passes, we can merge spark35 in, and take everything else with. Does that sound OK, @Paweł Leszczyński?

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-01-24 03:50:55

*Thread Reply:* yes

Damien Hawes (damien.hawes@booking.com)
2024-01-24 03:55:31

*Thread Reply:* OK

Damien Hawes (damien.hawes@booking.com)
2024-01-24 03:55:34

*Thread Reply:* Pushed changes for 35

Damien Hawes (damien.hawes@booking.com)
2024-01-24 04:25:03

*Thread Reply:* Seems bitnami doesn't produce a 2.4.8 image

Damien Hawes (damien.hawes@booking.com)
2024-01-24 04:25:07

*Thread Reply:* Last is 2.4.6

Damien Hawes (damien.hawes@booking.com)
2024-01-24 04:25:12

*Thread Reply:* Reverting to 2.4.6

Damien Hawes (damien.hawes@booking.com)
2024-01-24 04:43:10

*Thread Reply:* @Paweł Leszczyński - could you approve the CI/CD pipeline for the spark integration test tasks?

👀 Paweł Leszczyński
Damien Hawes (damien.hawes@booking.com)
2024-01-24 04:43:16

*Thread Reply:* https://github.com/OpenLineage/OpenLineage/pull/2390

Damien Hawes (damien.hawes@booking.com)
2024-01-24 04:59:41

*Thread Reply:* > GoogleCloudIntegrationTest

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-01-24 05:00:23

*Thread Reply:* i know this

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-01-24 05:00:45

*Thread Reply:* it's @Maciej Obuchowski stuff :face_palm:

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-01-24 05:02:24

*Thread Reply:* let me work first on upgrading spark branches to latest patch versions

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-01-24 05:02:32

*Thread Reply:* will come back to you

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-01-24 05:47:38

*Thread Reply:* :gh_merged:

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-01-24 05:55:47

@Damien Hawes are the PRs for spark31-34 still valid? Can you rebase spark2?

Damien Hawes (damien.hawes@booking.com)
2024-01-24 06:46:52

*Thread Reply:* No - those PRs can be closed, because spark35 built on top of 34, which built on top of 33, etc.

Damien Hawes (damien.hawes@booking.com)
2024-01-24 06:48:41

*Thread Reply:* Rebasing spark2 now

Damien Hawes (damien.hawes@booking.com)
2024-01-24 06:50:49

*Thread Reply:* OK. PRs are closed, with a link to 2390

Damien Hawes (damien.hawes@booking.com)
2024-01-24 06:56:14

*Thread Reply:* @Paweł Leszczyński - spark2 has been rebased and pushed.

👍 Paweł Leszczyński
Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-01-24 07:09:11

*Thread Reply:* > Task :app:compileTestJava FAILED

Damien Hawes (damien.hawes@booking.com)
2024-01-24 07:09:29

*Thread Reply:* 🤨

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-01-24 07:09:46

*Thread Reply:* i expected it to fail but it's kind of early

Damien Hawes (damien.hawes@booking.com)
2024-01-24 07:10:47

*Thread Reply:* Running it locally.

Damien Hawes (damien.hawes@booking.com)
2024-01-24 07:11:03

*Thread Reply:* Interesting.

Damien Hawes (damien.hawes@booking.com)
2024-01-24 07:11:32

*Thread Reply:* > Task :spark2:compileTestJava FAILED
OptimizedCreateHiveTableAsSelectCommandVisitorTest.java:36: error: cannot find symbol
import org.apache.spark.sql.hive.execution.OptimizedCreateHiveTableAsSelectCommand;
                                          ^
  symbol:   class OptimizedCreateHiveTableAsSelectCommand
  location: package org.apache.spark.sql.hive.execution
1 error

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-01-24 07:11:52

*Thread Reply:* perhaps it's in 2.4.8

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-01-24 07:11:57

*Thread Reply:* was this changed?

Damien Hawes (damien.hawes@booking.com)
2024-01-24 07:12:10

*Thread Reply:* I reverted back to 2.4.6

Damien Hawes (damien.hawes@booking.com)
2024-01-24 07:12:15

*Thread Reply:* To match the spark image

Damien Hawes (damien.hawes@booking.com)
2024-01-24 07:12:21

*Thread Reply:* It was always 2.4.6

👍 Paweł Leszczyński
Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-01-24 07:13:17

*Thread Reply:* wasn't it compiled with 2.4.8?

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-01-24 07:13:36

*Thread Reply:* sparkVersion = '2.4.8' it was

Damien Hawes (damien.hawes@booking.com)
2024-01-24 07:13:46

*Thread Reply:* lol

Damien Hawes (damien.hawes@booking.com)
2024-01-24 07:13:53

*Thread Reply:* But integration test sits at 2.4.6

Damien Hawes (damien.hawes@booking.com)
2024-01-24 07:13:54

*Thread Reply:* Nice

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-01-24 07:14:00

*Thread Reply:* we compiled it with 2.4.8 but run integration tests based on 2.4.6

Damien Hawes (damien.hawes@booking.com)
2024-01-24 07:14:22

*Thread Reply:* Re-running with 2.4.8

Damien Hawes (damien.hawes@booking.com)
2024-01-24 07:15:07

*Thread Reply:* Yeah

Damien Hawes (damien.hawes@booking.com)
2024-01-24 07:15:13

*Thread Reply:* Things going through with 2.4.8

Damien Hawes (damien.hawes@booking.com)
2024-01-24 07:15:17

*Thread Reply:* Look at that

Damien Hawes (damien.hawes@booking.com)
2024-01-24 07:15:28

*Thread Reply:* Apache Spark introducing new features in patch versions

Damien Hawes (damien.hawes@booking.com)
2024-01-24 07:20:15

*Thread Reply:* @Paweł Leszczyński - pushed the changes, along with changes to the CI process

Damien Hawes (damien.hawes@booking.com)
2024-01-24 07:20:21

*Thread Reply:* Gradle will now use plain logs

Damien Hawes (damien.hawes@booking.com)
2024-01-24 07:20:30

*Thread Reply:* Which means it should be easier to see things

Damien Hawes (damien.hawes@booking.com)
2024-01-24 07:20:44

*Thread Reply:* i.e., I found it difficult to figure out which test is failing

Damien Hawes (damien.hawes@booking.com)
2024-01-24 07:20:48

*Thread Reply:* Unless it was near the bottom

Damien Hawes (damien.hawes@booking.com)
2024-01-24 07:41:35

*Thread Reply:* OK - I need to step out for a bit

Damien Hawes (damien.hawes@booking.com)
2024-01-24 07:41:43

*Thread Reply:* I'll handle this issue when I am back

Damien Hawes (damien.hawes@booking.com)
2024-01-24 07:41:51

*Thread Reply:* app:test fails under spark 2.4.6

Maciej Obuchowski (maciej.obuchowski@getindata.com)
2024-01-24 08:40:30

*Thread Reply:* OptimizedCreateHiveTableAsSelectCommand was added in 2.4.7 or 2.4.8 AFAIK

Maciej Obuchowski (maciej.obuchowski@getindata.com)
2024-01-24 08:43:31

*Thread Reply:* > we compiled it with 2.4.8 but run integration tests based on 2.4.6
The reason was the lack of a Docker image for 2.4.8, right?

Damien Hawes (damien.hawes@booking.com)
2024-01-24 10:51:26

*Thread Reply:* Yes

Damien Hawes (damien.hawes@booking.com)
2024-01-24 11:48:14

*Thread Reply:* OK - I've managed to fix the issue with app:compileTestJava

❤️ Maciej Obuchowski, Paweł Leszczyński
Damien Hawes (damien.hawes@booking.com)
2024-01-24 11:48:22

*Thread Reply:* and the subsequent failures that came afterwards.

Damien Hawes (damien.hawes@booking.com)
2024-01-24 12:34:26

*Thread Reply:* @Paweł Leszczyński - if you're able to, can you approve the integration tests for: https://github.com/OpenLineage/OpenLineage/pull/2391

👀 Paweł Leszczyński
Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-01-25 02:49:37

*Thread Reply:* there is still one test for 2.4.8, spark_kafka, that is not passing:
Caused by: java.lang.NoSuchMethodError: org.apache.spark.internal.Logging.$init$(Lorg/apache/spark/internal/Logging;)V

Damien Hawes (damien.hawes@booking.com)
2024-01-25 08:46:50

*Thread Reply:* Yeah

Damien Hawes (damien.hawes@booking.com)
2024-01-25 08:47:13

*Thread Reply:* with @Mattia Bertorello’s help, we discovered the source of the problem

🚀 Mattia Bertorello
Damien Hawes (damien.hawes@booking.com)
2024-01-25 08:47:26

*Thread Reply:* It's a Scala binary problem.

Damien Hawes (damien.hawes@booking.com)
2024-01-25 08:47:55

*Thread Reply:* Bitnami's image is Scala 2.11 for 2.4.6

Damien Hawes (damien.hawes@booking.com)
2024-01-25 08:49:05

*Thread Reply:* We are pushing org.apache.spark:spark-sql-kafka-0-10_2.12:2.4.6 to the image

Maciej Obuchowski (maciej.obuchowski@getindata.com)
2024-01-25 08:52:55

*Thread Reply:* can we use Scala 2.11 for 2.4? I think most people use 2.11 for it

Maciej Obuchowski (maciej.obuchowski@getindata.com)
2024-01-25 08:53:19

*Thread Reply:* and I would really try to not push anything new there 🙂 if you use 2.4 your priority should be to upgrade

Damien Hawes (damien.hawes@booking.com)
2024-01-25 08:56:48

*Thread Reply:* Yeah - I reverted back to 2.11

👍 Maciej Obuchowski
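
The mismatch discussed in this thread comes down to the Scala suffix on the artifact name: an artifact with the `_2.12` suffix was being added to an image built for Scala 2.11. As a minimal sketch, the suffix can be derived from the full Scala version before the coordinate is built. The class and method names below are hypothetical and are not part of the OpenLineage build.

```java
import java.util.Arrays;
import java.util.stream.Collectors;

// Hypothetical helper, for illustration only.
class ScalaCoordinates {

    /** Reduces a full Scala version such as "2.11.12" to its binary version "2.11". */
    static String scalaBinaryVersion(String scalaVersion) {
        return Arrays.stream(scalaVersion.split("\\."))
                .limit(2)
                .collect(Collectors.joining("."));
    }

    /** Builds a Maven coordinate that carries the standard Scala suffix. */
    static String coordinate(String group, String artifact, String scalaVersion, String version) {
        return group + ":" + artifact + "_" + scalaBinaryVersion(scalaVersion) + ":" + version;
    }

    public static void main(String[] args) {
        // The image's Scala version drives the suffix, not the build's default.
        System.out.println(
                coordinate("org.apache.spark", "spark-sql-kafka-0-10", "2.11.12", "2.4.6"));
    }
}
```

Deriving the suffix from the Scala version of the target image, rather than hard-coding it, is one way to keep the test dependencies and the runtime on the same binary version.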
Damien Hawes (damien.hawes@booking.com)
2024-01-25 10:15:38

*Thread Reply:* OK. Pushed the changes to the branch

Maciej Obuchowski (maciej.obuchowski@getindata.com)
2024-01-25 10:37:10

*Thread Reply:* it failed 😞

Damien Hawes (damien.hawes@booking.com)
2024-01-25 11:56:08

*Thread Reply:* OK - https://github.com/OpenLineage/OpenLineage/pull/2391

Is waiting at the integration test phase

Maciej Obuchowski (maciej.obuchowski@getindata.com)
2024-01-25 12:11:11

*Thread Reply:* approved 🙂

Damien Hawes (damien.hawes@booking.com)
2024-01-25 12:15:50

*Thread Reply:* Btw the airflow integration test is waiting for approval.

👀 Paweł Leszczyński
Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-01-26 02:45:21

*Thread Reply:* :gh_merged:

Maciej Obuchowski (maciej.obuchowski@getindata.com)
2024-01-25 12:11:44

@Damien Hawes is this last required PR? We're thinking about releasing soon to get this feature out fast

Damien Hawes (damien.hawes@booking.com)
2024-01-25 12:13:11

*Thread Reply:* Nope.

Damien Hawes (damien.hawes@booking.com)
2024-01-25 12:13:20

*Thread Reply:* There are a few more things to do. The 'app' module for instance needs to be migrated and that's a tough one, because it has integration tests that need to be 'ported'. For instance, Scala 2.13 variants of the integration tests.

Damien Hawes (damien.hawes@booking.com)
2024-01-25 12:15:28

*Thread Reply:* I don't feel comfortable saying, "Hey, let's release 2.12 and 2.13" without those checks and balances being in place.

👍 Maciej Obuchowski
Damien Hawes (damien.hawes@booking.com)
2024-01-26 05:52:16

@Paweł Leszczyński @Maciej Obuchowski - is it possible for OpenLineage to use the GitHub container registry to host custom Docker images, i.e., the Scala 2.13 variants of Apache Spark?

Maciej Obuchowski (maciej.obuchowski@getindata.com)
2024-01-26 06:00:04
Maciej Obuchowski (maciej.obuchowski@getindata.com)
2024-01-26 06:00:21

*Thread Reply:* I've used quay.io for free hosting of docker images

Damien Hawes (damien.hawes@booking.com)
2024-01-26 06:22:00

*Thread Reply:* > • 500 MB GitHub Packages storage
Wow.

1 spark image will blow that storage limit out of the water.

Damien Hawes (damien.hawes@booking.com)
2024-01-26 06:22:42

*Thread Reply:* Trust me, I know. I've built several Spark images before. They come in at like 1 GB - 1.5 GB per image.

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-01-26 06:29:37

*Thread Reply:* they give 50gb in github enterprise. this would not make any sense...

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-01-26 06:34:03
Damien Hawes (damien.hawes@booking.com)
2024-01-26 06:34:45

*Thread Reply:* Aye - but what's the limit on the size of the packages? 🙂

Maciej Obuchowski (maciej.obuchowski@getindata.com)
2024-01-26 06:38:17

*Thread Reply:* we might be able to use Google's container registry too https://cloud.google.com/artifact-registry

Maciej Obuchowski (maciej.obuchowski@getindata.com)
2024-01-26 06:38:48

*Thread Reply:* although I'm not sure our account has the permissions

Damien Hawes (damien.hawes@booking.com)
2024-01-29 05:24:48

@Paweł Leszczyński @Maciej Obuchowski - what's the possibility of temporarily dropping support for Spark 2.4.x? Reasoning:

We run integration tests against 2.4.x, 3.2.x, 3.3.x, 3.4.x, and 3.5.x. Of them 2.4 is the only one that doesn't support Scala 2.13.

It really complicates the build process of 'app'.

Furthermore, 'shared' is compiled using Spark 3.2.4, and it is the first version to support Scala 2.13.

Maciej Obuchowski (maciej.obuchowski@getindata.com)
2024-01-29 05:42:49

*Thread Reply:* I don't think it's possible for quite a bit of time still. There are people using OL on, for example, EMR 5.x versions, which use 2.4.8 and are still supported.

Maciej Obuchowski (maciej.obuchowski@getindata.com)
2024-01-29 05:43:34

*Thread Reply:* I wish we could tho 🙂

Mattia Bertorello (mattia.bertorello@booking.com)
2024-01-30 10:06:21

Hi team, I opened a PR to start gradually moving the so-called vendor code (Snowflake, BigQuery, Iceberg, Delta, Databricks) into separate projects, using the service loader to load the objects based on the runtime availability of the vendor itself. I started with the easy one, Snowflake, and I already have a branch for Iceberg, which is the one that caused some issues during the Scala 2.13 support work.

I have now found out that a similar effort to separate this code is in progress in another PR, #2272. @Maciej Obuchowski @Paweł Leszczyński Do you know an ETA for PR #2272 being released, and whether it has exactly the same scope as mine? In general, my idea was to avoid changing too much code, to keep the PRs small and move incrementally.

Mattia Bertorello (mattia.bertorello@booking.com)
2024-01-30 10:06:57

*Thread Reply:* Also, I think there is some conflict with @Damien Hawes's work there.

Maciej Obuchowski (maciej.obuchowski@getindata.com)
2024-01-30 12:18:02

*Thread Reply:* I haven't fully reviewed your work, and I think your solution is really nice, but I think there are some significant differences between your approach @Mattia Bertorello and @Paweł Leszczyński’s one.

First, Paweł's interface is segregated from the rest of the OpenLineage-Spark integration, and OpenLineage-Spark depends on it. Second, we're trying to move the responsibility for maintaining lineage for external DataSource implementations to those implementations themselves, and we're getting good responses, for example from the BigQuery connector maintainer: https://github.com/OpenLineage/OpenLineage/issues/2349

On the other hand, I guess Paweł's approach does not really solve your issue, which is Scala compatibility and ease of compilation.

So I think we want to have both Paweł's approach as a public API, and your solution as a stop-gap to enable the Scala 2.13 support as well? I guess to make it more precise - I would prefer to have Paweł's solution as a public API, and yours as an internal, not advertised one - I would prefer vendors to implement support in their own code, rather than provide us with an implementation that would make us responsible for maintaining it.

What do you think?

Mattia Bertorello (mattia.bertorello@booking.com)
2024-01-31 08:21:37

*Thread Reply:* > I would prefer vendors to implement support in their own code, rather than provide us with implementation that would make us responsible for maintaining it.
It makes sense to move the responsibility to them, and the work in Paweł's PR is great for reaching this goal.

Mine was more for the short term, and yes, it can be considered a first step towards that goal. In this way, we will already have all the code from the vendors separated into projects, and the only work is to adapt to the new interface without changing the core.

btw could you trigger the integration tests? Thanks! https://app.circleci.com/pipelines/github/OpenLineage/OpenLineage/9118/workflows/cfb67fa1-db0e-4043-a73e-14de40968534

Maciej Obuchowski (maciej.obuchowski@getindata.com)
2024-01-31 10:15:07

*Thread Reply:* Added a few comments to PR 🙂

:gratitude_thank_you: Mattia Bertorello
Mattia Bertorello (mattia.bertorello@booking.com)
2024-01-31 10:17:49

*Thread Reply:* Let me work on them over the next few days 🙂

Julien Le Dem (julien@apache.org)
2024-01-30 12:11:38

@Julien Le Dem has joined the channel

tati (tatiana.alchueyr@astronomer.io)
2024-01-30 12:13:13

@tati has joined the channel

Damien Hawes (damien.hawes@booking.com)
2024-02-02 08:39:32

@Paweł Leszczyński @Maciej Obuchowski - just curious, how certain are we that Spark 2 classes (CreateTableLikeCommand) are never created in Spark 3.2.x - Spark 3.5.x applications?

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-02-02 08:43:12

*Thread Reply:* there's class io.openlineage.spark3.agent.lifecycle.plan.CreateTableLikeCommandVisitor in spark3

Damien Hawes (damien.hawes@booking.com)
2024-02-02 08:44:52

*Thread Reply:* Hmmm - so if the listener gets a CreateTableLikeCommand how does it know which CreateTableLikeCommandVisitor to delegate to?

Damien Hawes (damien.hawes@booking.com)
2024-02-02 08:44:54

*Thread Reply:* Oh wait.

Damien Hawes (damien.hawes@booking.com)
2024-02-02 08:44:56

*Thread Reply:* The factories.

Maciej Obuchowski (maciej.obuchowski@getindata.com)
2024-02-02 08:47:54

*Thread Reply:* 🙂

Damien Hawes (damien.hawes@booking.com)
2024-02-05 05:26:43

@Paweł Leszczyński - with the PySpark integration tests, does the presence of "RUNNING" events cause the assertions to fail or not?

I'm encountering a failure in the Kafka read write test, and I see additional running events, which makes me wonder what is being asserted, apart from certain properties.

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-02-05 05:27:29

*Thread Reply:* in most cases, we don't emit nor assert running events

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-02-05 05:27:56

*Thread Reply:* it makes more sense to check running for streaming jobs

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-02-05 05:29:46

*Thread Reply:* what's the test you have problem with?

Damien Hawes (damien.hawes@booking.com)
2024-02-05 05:39:36

*Thread Reply:* SparkContainerIntegrationTest#testPysparkKafkaReadWrite

Damien Hawes (damien.hawes@booking.com)
2024-02-05 05:39:49

*Thread Reply:* I removed the facets, to make the JSONs smaller

Damien Hawes (damien.hawes@booking.com)
2024-02-05 05:40:14

*Thread Reply:* and I see that the run events are missing their input and output datasets.

Damien Hawes (damien.hawes@booking.com)
2024-02-05 06:28:31

*Thread Reply:* Finally. I see the exception

Damien Hawes (damien.hawes@booking.com)
2024-02-05 06:30:58

*Thread Reply:* @Paweł Leszczyński have a look:

https://gist.github.com/d-m-h/95a8005063478c4844abd78f4eaf4e47

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-02-05 06:50:34

*Thread Reply:* why doesn't this go into the if (KafkaRelationVisitor.isKafkaSource(command.dataSource())) condition?

Damien Hawes (damien.hawes@booking.com)
2024-02-05 06:51:18

*Thread Reply:* I'm not sure. I tested with the original bitnami spark 3.2.4 scala 2.12

Damien Hawes (damien.hawes@booking.com)
2024-02-05 06:51:22

*Thread Reply:* and I get the same error

Damien Hawes (damien.hawes@booking.com)
2024-02-05 06:53:19

*Thread Reply:* I see that KafkaRelationVisitor is in the uber jar

Damien Hawes (damien.hawes@booking.com)
2024-02-05 06:53:28

*Thread Reply:* jar tvf openlineage-spark-app_2.13-1.9.0-SNAPSHOT.jar | grep Kafka
 11382 Mon Feb 05 11:44:56 CET 2024 io/openlineage/spark/agent/lifecycle/plan/KafkaRelationVisitor.class
  1739 Mon Jan 22 16:48:56 CET 2024 io/openlineage/client/transports/KafkaConfig.class
  1626 Mon Jan 22 16:48:56 CET 2024 io/openlineage/client/transports/KafkaTransportBuilder.class
  3594 Mon Jan 22 16:48:56 CET 2024 io/openlineage/client/transports/KafkaTransport.class

Damien Hawes (damien.hawes@booking.com)
2024-02-05 07:05:30

*Thread Reply:* Found the reason:

public static boolean hasKafkaClasses() {
  log.debug("Checking for Kafka classes");
  try {
    KafkaRelationVisitor.class
        .getClassLoader()
        .loadClass("org.apache.spark.sql.kafka010.KafkaSourceProvider");
    log.debug("Kafka classes found");
    return true;
  } catch (Exception e) {
    log.debug("Kafka classes not found");
    return false;
  }
}

2024-02-05 12:04:11 DEBUG io.openlineage.spark.agent.lifecycle.plan.KafkaRelationVisitor: Checking for Kafka classes
2024-02-05 12:04:11 DEBUG io.openlineage.spark.agent.lifecycle.plan.KafkaRelationVisitor: Kafka classes not found

Damien Hawes (damien.hawes@booking.com)
2024-02-05 07:06:20

*Thread Reply:* It seems when the Kafka jars are loaded via --packages - we can't find the class.

Damien Hawes (damien.hawes@booking.com)
2024-02-05 07:14:50

*Thread Reply:* OK. I changed the check to this:

Thread.currentThread()
    .getContextClassLoader()
    .loadClass("org.apache.spark.sql.kafka010.KafkaSourceProvider");

Damien Hawes (damien.hawes@booking.com)
2024-02-05 07:15:32

*Thread Reply:* I see this:

2024-02-05 12:14:19 DEBUG io.openlineage.spark.agent.lifecycle.plan.KafkaRelationVisitor: Checking for Kafka classes
2024-02-05 12:14:19 DEBUG io.openlineage.spark.agent.lifecycle.plan.KafkaRelationVisitor: Kafka classes found
2024-02-05 12:14:19 DEBUG io.openlineage.spark.agent.lifecycle.plan.KafkaRelationVisitor: Checking for Kafka classes
2024-02-05 12:14:19 DEBUG io.openlineage.spark.agent.lifecycle.plan.KafkaRelationVisitor: Kafka classes found

Damien Hawes (damien.hawes@booking.com)
2024-02-05 07:16:07

*Thread Reply:* However:

java.lang.NoClassDefFoundError: org/apache/spark/sql/kafka010/KafkaSourceProvider at io.openlineage.spark.agent.lifecycle.plan.KafkaRelationVisitor.isKafkaSource(KafkaRelationVisitor.java:72)
java.lang.NoClassDefFoundError: org/apache/spark/sql/kafka010/KafkaSourceProvider
    at io.openlineage.spark.agent.lifecycle.plan.KafkaRelationVisitor.isKafkaSource(KafkaRelationVisitor.java:72)
    at io.openlineage.spark.agent.lifecycle.plan.SaveIntoDataSourceCommandVisitor.apply(SaveIntoDataSourceCommandVisitor.java:97)
    at io.openlineage.spark.agent.lifecycle.plan.SaveIntoDataSourceCommandVisitor.apply(SaveIntoDataSourceCommandVisitor.java:46)

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-02-05 07:17:11

*Thread Reply:* recalling folks from microsoft had similar problem long time ago here -> https://github.com/OpenLineage/OpenLineage/blob/bba64fbddbea3557fc2342a3d2fbd4194b[…]penlineage/spark/agent/lifecycle/plan/KustoRelationVisitor.java

Damien Hawes (damien.hawes@booking.com)
2024-02-05 07:18:11

*Thread Reply:* lmao

Damien Hawes (damien.hawes@booking.com)
2024-02-05 07:18:15

*Thread Reply:* I was about to do just that

Damien Hawes (damien.hawes@booking.com)
2024-02-05 07:20:07

*Thread Reply:* Just for curiosity's sake:

public static boolean isKafkaSource(CreatableRelationProvider provider) {
  log.debug("Checking if provider is KafkaSourceProvider");
  if (!hasKafkaClasses()) {
    return false;
  }
  log.debug(
      "Checking if the provider is a KafkaSourceProvider. {}",
      provider.getClass().getCanonicalName());
  boolean check = provider instanceof KafkaSourceProvider;
  log.debug("Is the provider a KafkaSourceProvider? {}", check);
  return check;
}

Damien Hawes (damien.hawes@booking.com)
2024-02-05 07:20:31

*Thread Reply:*
2024-02-05 12:19:33 DEBUG io.openlineage.spark.agent.lifecycle.plan.KafkaRelationVisitor: Checking if provider is KafkaSourceProvider
2024-02-05 12:19:33 DEBUG io.openlineage.spark.agent.lifecycle.plan.KafkaRelationVisitor: Checking for Kafka classes
2024-02-05 12:19:33 DEBUG io.openlineage.spark.agent.lifecycle.plan.KafkaRelationVisitor: Kafka classes found
2024-02-05 12:19:33 DEBUG io.openlineage.spark.agent.lifecycle.plan.KafkaRelationVisitor: Checking if the provider is a KafkaSourceProvider. org.apache.spark.sql.kafka010.KafkaSourceProvider
2024-02-05 12:19:33 INFO io.openlineage.spark.agent.util.PlanUtils: apply method failed with java.lang.NoClassDefFoundError: org/apache/spark/sql/kafka010/KafkaSourceProvider

Damien Hawes (damien.hawes@booking.com)
2024-02-05 07:20:31

*Thread Reply:* Wild

Damien Hawes (damien.hawes@booking.com)
2024-02-05 07:20:43

*Thread Reply:* The class says that it is that instance

Damien Hawes (damien.hawes@booking.com)
2024-02-05 07:20:55

*Thread Reply:* but the instance check says "no, I'm not dealing with you"
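
A likely reading of the behaviour in this thread: `provider instanceof KafkaSourceProvider` makes the JVM resolve `KafkaSourceProvider` through the class loader that defined the calling class. If that loader cannot see the Kafka jars, resolution fails with `NoClassDefFoundError`, even though the provider object itself was created from a class that another loader had already loaded. One way to sidestep the link-time reference, sketched here with hypothetical names and not taken from the OpenLineage code base, is to compare class names instead:

```java
// Hypothetical helper, for illustration only.
class NameBasedTypeCheck {

    /** True if the object's class, any superclass, or any implemented interface has this name. */
    static boolean isInstanceOfByName(Object obj, String className) {
        return obj != null && hasTypeNamed(obj.getClass(), className);
    }

    private static boolean hasTypeNamed(Class<?> type, String className) {
        if (type == null) {
            return false;
        }
        if (type.getName().equals(className)) {
            return true;
        }
        // Walk interfaces first, then continue up the superclass chain.
        for (Class<?> iface : type.getInterfaces()) {
            if (hasTypeNamed(iface, className)) {
                return true;
            }
        }
        return hasTypeNamed(type.getSuperclass(), className);
    }
}
```

A name comparison ignores which class loader defined the class, so it is a weaker check than `instanceof`; that trade-off is what lets it work across loaders.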

Maciej Obuchowski (maciej.obuchowski@getindata.com)
2024-02-05 07:37:09

*Thread Reply:* Different classloaders?

Damien Hawes (damien.hawes@booking.com)
2024-02-05 07:37:18

*Thread Reply:* Yup

Damien Hawes (damien.hawes@booking.com)
2024-02-05 07:37:32

*Thread Reply:* The main class loader loads the openlineage-spark jar

Damien Hawes (damien.hawes@booking.com)
2024-02-05 07:37:48

*Thread Reply:* and another class loader loads the kafka jars

Damien Hawes (damien.hawes@booking.com)
2024-02-05 07:38:13

*Thread Reply:* Thread.currentThread().getContextClassLoader().loadClass(<name>) does the trick

👍 Maciej Obuchowski
Maciej Obuchowski (maciej.obuchowski@getindata.com)
2024-02-05 07:39:17

*Thread Reply:* maybe we can generalize the solution? ClassloaderUtils.loadClass that tries to load it from both classloaders?

Maciej Obuchowski (maciej.obuchowski@getindata.com)
2024-02-05 07:39:42

*Thread Reply:* The idea is to not make next person debug the same issue with another connector 🙂
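
The generalized helper suggested in this thread could look roughly like the sketch below. The class name `ClassLoaderUtils` and the method `tryLoadClass` are assumptions made for illustration; the thread does not show the final implementation. The sketch tries the thread context class loader first and then the loader that defined the helper itself.

```java
import java.util.Optional;

// Hypothetical utility, for illustration only.
class ClassLoaderUtils {

    /** Tries the context class loader, then this class's own loader; empty if neither finds it. */
    static Optional<Class<?>> tryLoadClass(String className) {
        ClassLoader[] candidates = {
            Thread.currentThread().getContextClassLoader(),
            ClassLoaderUtils.class.getClassLoader()
        };
        for (ClassLoader loader : candidates) {
            if (loader == null) {
                continue; // the context class loader may be unset
            }
            try {
                // 'false' avoids running static initializers just to probe for presence.
                Class<?> found = Class.forName(className, false, loader);
                return Optional.of(found);
            } catch (ClassNotFoundException | LinkageError e) {
                // Not visible through this loader; fall through to the next candidate.
            }
        }
        return Optional.empty();
    }
}
```

Catching `LinkageError` as well as `ClassNotFoundException` matters here because `NoClassDefFoundError` is an `Error`, so a `catch (Exception e)` block would not intercept it.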

Damien Hawes (damien.hawes@booking.com)
2024-02-05 11:11:56

*Thread Reply:* Well - that was annoying.

Damien Hawes (damien.hawes@booking.com)
2024-02-05 11:12:05

*Thread Reply:* But, it seems like the kafka tests pass now

Maciej Obuchowski (maciej.obuchowski@getindata.com)
2024-02-05 11:27:26

@Mattia Bertorello it's failing on spotless 🙂

Mattia Bertorello (mattia.bertorello@booking.com)
2024-02-05 11:29:15

*Thread Reply:* Yeah sorry, I applied some IntelliJ suggestions 😅

Mattia Bertorello (mattia.bertorello@booking.com)
2024-02-05 11:33:43

*Thread Reply:* Regarding the ServiceLoader problem, I implemented a static version with a classic class loader. My main concern is that we cannot ensure loose coupling between the vendor projects and the app/spark3 projects if we put the class name inside those. In this case, we need to be aware of the implementations.

Here is the code:
```
public interface Vendors {
  List<String> VENDORS =
      Arrays.asList("io.openlineage.spark.agent.vendor.snowflake.SnowflakeVendor");

  static Vendors getVendors() {
    ClassLoader cl = Thread.currentThread().getContextClassLoader();

    List<Vendor> vendors =
        VENDORS.stream()
            .map(
                vendorClassName -> {
                  try {
                    Class<?> vendor = cl.loadClass(vendorClassName);
                    return (Vendor) vendor.newInstance();
                  } catch (ClassNotFoundException
                      | InstantiationException
                      | IllegalAccessException e) {
                    return null;
                  }
                })
            .filter(Objects::nonNull)
            .filter(Vendor::isVendorAvailable)
            .collect(Collectors.toList());
```
👍 Maciej Obuchowski
Mattia Bertorello (mattia.bertorello@booking.com)
2024-02-06 06:48:04

*Thread Reply:* @Paweł Leszczyński Do you think this version is better? I can merge it so we can unblock the PR and maybe merge it. @Maciej Obuchowski do you think your comments are resolved, or is there something else to do?

Maciej Obuchowski (maciej.obuchowski@getindata.com)
2024-02-06 06:54:17

*Thread Reply:* I think mine are resolved 🙂

:gratitude_thank_you: Mattia Bertorello
Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-02-06 07:27:40

*Thread Reply:* I lost track of the current status of the ServiceLoader discussion. I found a thumbs-up in GitHub under my comment and then some doubts about using Class.forName. I got confused about which is more recent and which direction is preferred. I am open to discussion. @Mattia Bertorello @Maciej Obuchowski

Mattia Bertorello (mattia.bertorello@booking.com)
2024-02-06 07:30:50

*Thread Reply:* No problem, having the ServiceLoader brings looser coupling between the app project and the others. But if you have already found users who struggle with creating an uber-jar that keeps/merges the META-INF files, it could be better to sacrifice the loose coupling.
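
For context on the trade-off: `java.util.ServiceLoader` finds implementations by reading provider class names from `META-INF/services/<fully-qualified interface name>` entries on the class path. If an uber-jar build drops or overwrites those files instead of merging them, the providers are silently not found. A minimal sketch of the ServiceLoader variant follows; the nested `Vendor` interface is a stand-in with only the one method visible in the thread, and `VendorDiscovery` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;

// Hypothetical sketch, for illustration only.
class VendorDiscovery {

    /** Stand-in for the real Vendor interface; only the method seen in the thread is modelled. */
    interface Vendor {
        boolean isVendorAvailable();
    }

    /** Collects every registered Vendor that reports itself as available. */
    static List<Vendor> discover(ClassLoader loader) {
        List<Vendor> found = new ArrayList<>();
        // Providers come from META-INF/services entries visible to 'loader'.
        for (Vendor vendor : ServiceLoader.load(Vendor.class, loader)) {
            if (vendor.isVendorAvailable()) {
                found.add(vendor);
            }
        }
        return found;
    }
}
```

With no provider file on the class path, `discover` simply returns an empty list, which is the failure mode that is hard to notice after a bad jar merge.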

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-02-06 07:34:19

*Thread Reply:* https://github.com/OpenLineage/OpenLineage/issues/1860 -> this is an example of issue we have in our backlog. there are at least 3 different people involved so I assumed it's not that uncommon

Mattia Bertorello (mattia.bertorello@booking.com)
2024-02-06 07:37:35

*Thread Reply:* ok, makes sense. So I can commit the change above, which doesn't use the service loader and at the same time doesn't need the compiled code to be aware of the implementation, since that happens at runtime and only through the interface. I can add this issue in the comments to justify the solution.

Mattia Bertorello (mattia.bertorello@booking.com)
2024-02-06 07:41:07

*Thread Reply:* BTW I'm working on having JobTypeJobFacet in Spark and then in DBT integration. Have you worked on this before?

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-02-06 07:46:21

*Thread Reply:* I worked on that for Flink

Mattia Bertorello (mattia.bertorello@booking.com)
2024-02-06 07:47:42

*Thread Reply:* Yeah I'm checking that work to replicate on the spark integration 🙂

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-02-06 07:48:33

*Thread Reply:* it's not that super easy to determine if a job is batch or streaming

Mattia Bertorello (mattia.bertorello@booking.com)
2024-02-06 07:48:51

*Thread Reply:* Do we also need it in Spark?

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-02-06 07:49:05

*Thread Reply:* it's possible to have streaming job in spark

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-02-06 07:49:38

*Thread Reply:* and processingType property is required within the facet

Maciej Obuchowski (maciej.obuchowski@getindata.com)
2024-02-06 08:03:30

*Thread Reply:* Don't streaming jobs in Spark have different logical plans?

Mattia Bertorello (mattia.bertorello@booking.com)
2024-02-06 11:28:49

*Thread Reply:* It seems there is an isStreaming() method in the LogicalPlan https://github.com/OpenLineage/OpenLineage/pull/2410 Let me know what you think 🙂

👍 Paweł Leszczyński, Maciej Obuchowski
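
As a rough sketch of the idea in this thread, the required `processingType` value can be derived from a streaming flag. The `BooleanSupplier` below stands in for a call such as `plan.isStreaming()` so that the example runs without Spark on the class path. The string values `"STREAMING"` and `"BATCH"` and the class name are assumptions for illustration and are not taken from the PR.

```java
import java.util.function.BooleanSupplier;

// Hypothetical sketch, for illustration only.
class ProcessingTypeSketch {

    /** Maps a streaming flag to a processing type label (values assumed for this sketch). */
    static String processingType(BooleanSupplier isStreaming) {
        return isStreaming.getAsBoolean() ? "STREAMING" : "BATCH";
    }

    public static void main(String[] args) {
        // In the integration, the supplier would wrap the logical plan's streaming flag.
        System.out.println(processingType(() -> false));
    }
}
```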
Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-02-06 11:29:24

*Thread Reply:* is the extra param in buildRun necessary?

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-02-06 11:29:39

*Thread Reply:* why wouldn't this match in JobBuilder?

Mattia Bertorello (mattia.bertorello@booking.com)
2024-02-06 11:31:15

*Thread Reply:* Not really, I can use the jobBuilder and then extract the facets from there. I started with that implementation, but it wasn't so clean, so I changed my mind

Mattia Bertorello (mattia.bertorello@booking.com)
2024-02-06 11:32:41

*Thread Reply:* But it's no problem to implement it that way. It's just that I need to build the jobBuilder to extract the facets, which is not super great 😅, because the code then uses the jobBuilder to overwrite the facets

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-02-06 11:34:27

*Thread Reply:* yeah, looks like the problem of nested builders

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-02-06 11:39:50

*Thread Reply:* thinking out loud - could we pass jobFacetsBuilder as a param to avoid adding each facet as a param in future?

👍 Mattia Bertorello
Mattia Bertorello (mattia.bertorello@booking.com)
2024-02-06 11:59:14

*Thread Reply:* Done

🎉 Paweł Leszczyński, Maciej Obuchowski
🙌 Paweł Leszczyński, Maciej Obuchowski
Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-02-06 12:05:23

*Thread Reply:* could you add a line into changelog?

Mattia Bertorello (mattia.bertorello@booking.com)
2024-02-06 12:06:31

*Thread Reply:* Sure, but tomorrow, I'm heading home 🙂

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-02-06 12:06:49

*Thread Reply:* sure, giving an approval with a comment to fill changelog

Mattia Bertorello (mattia.bertorello@booking.com)
2024-02-07 04:41:07

*Thread Reply:* Done

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-02-07 02:18:41

@Mattia Bertorello PR 2405 is failing on VendorsTest > testGetVendors() FAILED org.opentest4j.AssertionFailedError at VendorsTest.java:60

Mattia Bertorello (mattia.bertorello@booking.com)
2024-02-07 04:40:48

*Thread Reply:* Sorry, I forgot to fix the tests

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-02-07 05:39:15

*Thread Reply:* :gh_merged:

:gratitude_thank_you: Mattia Bertorello
Damien Hawes (damien.hawes@booking.com)
2024-02-07 03:46:11

@Paweł Leszczyński @Maciej Obuchowski - when you run

./gradlew :app:test --tests "io.openlineage.spark.agent.lifecycle.SparkSQLExecutionContextTest" -Pspark.version=3.2.4

Do these tests fail with an InvocationTargetException, which is caused by a JsonMappingException, which is caused by a StackOverflowError?

Damien Hawes (damien.hawes@booking.com)
2024-02-07 03:58:25

*Thread Reply:* I have this reference chain

Maciej Obuchowski (maciej.obuchowski@getindata.com)
2024-02-07 05:07:04

*Thread Reply:* I got BUILD SUCCESSFUL in 55s on latest main

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-02-07 05:10:59

*Thread Reply:* build successful on my side, did you try rebuilding and republishing to local openlineage-java ?

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-02-07 08:00:19

@Damien Hawes @Mattia Bertorello How far are you from the ultimate goal? Will you need to rewrite all the extensions as vendors, or Snowflake only? Are you going to release multiple artifacts at the end, one each for Scala 2.12 and 2.13? I'm feeling kind of lost about where we are at the moment.

Damien Hawes (damien.hawes@booking.com)
2024-02-07 08:07:42

*Thread Reply:* The idea is that there will be two artefacts.

Damien Hawes (damien.hawes@booking.com)
2024-02-07 08:07:57

*Thread Reply:* I'm working on the integration tests at the moment.

🙏 Maciej Obuchowski
Mattia Bertorello (mattia.bertorello@booking.com)
2024-02-07 08:21:23

*Thread Reply:* I will continue working on the vendor extraction; the next one is Iceberg. Unrelated to that, I need to add the job type to Airflow and dbt.

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-02-07 08:24:15

*Thread Reply:* what's the problem with iceberg? I thought it is published for both Scala 2.12 and 2.13

Mattia Bertorello (mattia.bertorello@booking.com)
2024-02-07 09:45:40

*Thread Reply:* It will make it easier to support multiple versions of the same vendor (in this case, Iceberg) if there are breaking changes. That includes supporting the old version without too much reflection, which is what we would need if the code stayed mixed with the core Spark code.

Damien Hawes (damien.hawes@booking.com)
2024-02-08 03:59:11

@Paweł Leszczyński - https://github.com/OpenLineage/OpenLineage/pull/2413

👀 Paweł Leszczyński
Maciej Obuchowski (maciej.obuchowski@getindata.com)
2024-02-08 06:48:15

*Thread Reply:* that's... not a small pr 🙂

Damien Hawes (damien.hawes@booking.com)
2024-02-08 06:48:37

*Thread Reply:* Thankfully, it's just "add new stuff"

Damien Hawes (damien.hawes@booking.com)
2024-02-08 06:49:10

*Thread Reply:* It could be split into 3

Damien Hawes (damien.hawes@booking.com)
2024-02-08 06:49:38

*Thread Reply:*
  1. Modification of the build.gradle files and the common config plugin
  2. Creation of Docker plugin
  3. Creation of Spark Builds plugin
Damien Hawes (damien.hawes@booking.com)
2024-02-08 07:12:48

*Thread Reply:* OK - splitting it up.

1st one:

Docker build plugin: https://github.com/OpenLineage/OpenLineage/pull/2414

(365 lines)

Damien Hawes (damien.hawes@booking.com)
2024-02-08 07:22:03

*Thread Reply:* 2nd one: https://github.com/OpenLineage/OpenLineage/pull/2415

@Maciej Obuchowski @Paweł Leszczyński

74 additions, 62 removals

Maciej Obuchowski (maciej.obuchowski@getindata.com)
2024-02-08 07:24:20

*Thread Reply:* 👍

Damien Hawes (damien.hawes@booking.com)
2024-02-08 11:06:58

*Thread Reply:* 3rd one:

https://github.com/OpenLineage/OpenLineage/pull/2416

(The big one)

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-02-08 08:39:09

@Mattia Bertorello @Damien Hawes we are thinking about doing the OL release in the next few days. Just wondering if we should wait for your PRs, or release without waiting for them. Would love to hear your opinion on that. Keep in mind, a vote for an extra release can be initiated anytime you like, in case you don't want to wait the whole month.

Mattia Bertorello (mattia.bertorello@booking.com)
2024-02-08 08:48:44

*Thread Reply:* Thanks for letting us know. I would like to implement the JobType in Airflow; can you wait until Monday at the end of the day?

Damien Hawes (damien.hawes@booking.com)
2024-02-08 09:01:33

*Thread Reply:* From my side

Damien Hawes (damien.hawes@booking.com)
2024-02-08 09:01:36

*Thread Reply:* Don't wait for me.

Mattia Bertorello (mattia.bertorello@booking.com)
2024-02-08 12:58:55

*Thread Reply:* @Maciej Obuchowski I opened the PR in the Airflow repository: https://github.com/apache/airflow/pull/37255 and fixed the other one: https://github.com/OpenLineage/OpenLineage/pull/2412/

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-02-09 07:25:45

*Thread Reply:* Thx for info

Mattia Bertorello (mattia.bertorello@booking.com)
2024-02-13 07:11:13

*Thread Reply:* Do you think we can merge #2412 before the release? Is there something missing? @Maciej Obuchowski

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-02-09 07:27:34

@Damien Hawes I merged 2413 and approved https://github.com/OpenLineage/OpenLineage/pull/2415 could u rebase it and resolve conflict?

Damien Hawes (damien.hawes@booking.com)
2024-02-12 08:07:29

@Paweł Leszczyński - this is ready

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-02-13 08:11:43

*Thread Reply:* I can't find SparkIcebergIntegrationTest being run in integration-test-integration-spark-scala-2_12-3.5.0 -> https://app.circleci.com/pipelines/github/OpenLineage/OpenLineage/9275/workflows/78f45083-84f5-4379-b475-96ccececbbe5/jobs/173596

Damien Hawes (damien.hawes@booking.com)
2024-02-13 08:12:12

*Thread Reply:* 3.5.0 never had iceberg though

👍 Paweł Leszczyński
Damien Hawes (damien.hawes@booking.com)
2024-02-13 08:12:43

*Thread Reply:*

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-02-13 08:13:03

*Thread Reply:* integration tests run in 3mins30secs which is like way faster than it used to be

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-02-13 08:13:15

*Thread Reply:* i cannot find it for 3.4.2 as well

Damien Hawes (damien.hawes@booking.com)
2024-02-13 08:13:22

*Thread Reply:* hmm

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-02-13 08:17:17

*Thread Reply:* but it's the same on recent main I think. This is main build from 6 days ago https://app.circleci.com/pipelines/github/OpenLineage/OpenLineage/9160/workflows/33a4d308-d0e6-4d75-a06b-7d8ef89bb1fe and SparkIcebergIntegrationTest is present there

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-02-13 08:17:49

*Thread Reply:* and execution time are above 10 mins

Damien Hawes (damien.hawes@booking.com)
2024-02-13 08:32:10

*Thread Reply:* I'll check, might be the logic for determining it is faulty

Damien Hawes (damien.hawes@booking.com)
2024-02-14 02:36:38

*Thread Reply:* @Paweł Leszczyński - any ideas why the Delta and Iceberg tests will execute and pass on my local device, but in the CI they fail?

Damien Hawes (damien.hawes@booking.com)
2024-02-14 02:37:18

*Thread Reply:*

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-02-14 02:38:30

*Thread Reply:* in one of the previous PRs some Docker timeout got shortened I think.

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-02-14 02:38:53

*Thread Reply:* My guess is that the local environment is way faster

Damien Hawes (damien.hawes@booking.com)
2024-02-14 02:39:09

*Thread Reply:* Yeah - it's hard without seeing the STDOUT logs

Damien Hawes (damien.hawes@booking.com)
2024-02-14 02:39:22

*Thread Reply:* Is there a way to see them, besides scrolling through the crazy logs?

Damien Hawes (damien.hawes@booking.com)
2024-02-14 02:39:39

*Thread Reply:* like when you get the test report locally, I can see STDOUT and STDERR

Damien Hawes (damien.hawes@booking.com)
2024-02-14 02:39:58

*Thread Reply:* Otherwise, I have to go log diving 😂

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-02-14 02:41:32

*Thread Reply:* i think you can also ssh into circleCI and run single test

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-02-14 02:45:05

*Thread Reply:* i also search the logs for () FAILED, which gets you closer to the real error, but it's kind of log diving

Damien Hawes (damien.hawes@booking.com)
2024-02-14 03:05:54

*Thread Reply:* Then there is this one:

testExternalRDDWithS3Bucket(SparkSession)

java.lang.UnsupportedClassVersionError: com/sun/istack/Pool has been compiled by a more recent version of the Java Runtime (class file version 55.0), this version of the Java Runtime only recognizes class file versions up to 52.0
  at java.lang.ClassLoader.defineClass1(Native Method)
  at java.lang.ClassLoader.defineClass(ClassLoader.java:756)
  at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
  at java.net.URLClassLoader.defineClass(URLClassLoader.java:473)
  at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
  at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
  at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
  at java.security.AccessController.doPrivileged(Native Method)
  at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
  at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
  at com.sun.xml.bind.v2.runtime.JAXBContextImpl$JAXBContextBuilder.build(JAXBContextImpl.java:1126)
  at com.sun.xml.bind.v2.ContextFactory.createContext(ContextFactory.java:135)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at javax.xml.bind.ContextFinder.newInstance(ContextFinder.java:247)
  at javax.xml.bind.ContextFinder.newInstance(ContextFinder.java:234)
  at javax.xml.bind.ContextFinder.find(ContextFinder.java:441)
  at javax.xml.bind.JAXBContext.newInstance(JAXBContext.java:641)
  at javax.xml.bind.JAXBContext.newInstance(JAXBContext.java:584)
  at com.amazonaws.util.Base64.<clinit>(Base64.java:52)
  at com.amazonaws.util.BinaryUtils.toBase64(BinaryUtils.java:59)
  at com.amazonaws.services.s3.AmazonS3Client.populateRequestHeaderWithMd5(AmazonS3Client.java:4547)
  at com.amazonaws.services.s3.AmazonS3Client.deleteObjects(AmazonS3Client.java:2325)
  at org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$deleteObjects$16(S3AFileSystem.java:2785)

It blows my mind that the Amazon stuff uses Java 8, but they declare a dependency that is compiled with Java 11.
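
For reference, the "class file version" numbers in that error come from the class file header: 52 corresponds to Java 8 and 55 to Java 11. A minimal sketch for checking which release a given class was compiled for, assuming you already have the first eight bytes of the `.class` file (the class and method names here are made up for illustration):

```java
import java.nio.ByteBuffer;

final class ClassFileVersionSketch {
    // Returns the major version stored in bytes 6-7 of a class file header.
    static int majorVersion(byte[] header) {
        ByteBuffer buf = ByteBuffer.wrap(header); // ByteBuffer is big-endian by default
        if (header.length < 8 || buf.getInt() != 0xCAFEBABE) {
            throw new IllegalArgumentException("not a class file header");
        }
        buf.getShort(); // skip the minor version
        return buf.getShort() & 0xFFFF;
    }

    // Major 52 is Java 8 and major 55 is Java 11; for Java 5 and later
    // the release number is the major version minus 44.
    static int javaRelease(int major) {
        return major - 44;
    }
}
```

Reading the first eight bytes of `com/sun/istack/Pool.class` from the offending jar and passing them to `majorVersion` should reproduce the 55 reported in the error.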

Damien Hawes (damien.hawes@booking.com)
2024-02-14 03:42:10

*Thread Reply:* Yeah - I don't know. I increased the timeout to 60 seconds and the CI still complains

Damien Hawes (damien.hawes@booking.com)
2024-02-14 03:51:50

*Thread Reply:* Any ideas @Paweł Leszczyński?

Damien Hawes (damien.hawes@booking.com)
2024-02-14 03:52:01

*Thread Reply:* I'm at quite the loss

Damien Hawes (damien.hawes@booking.com)
2024-02-14 03:59:32

*Thread Reply:* The common theme amongst the failing tests, is that they all spin up local spark sessions.

👀 Paweł Leszczyński
Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-02-14 07:06:47

*Thread Reply:* there are no events emitted and no error message, which is totally weird and could be mockserver related. I saw you changed the mockserver ports to be above 10000; perhaps it's not allowed to use such high ports in CircleCI. If the default HTTP transport timeout is higher than the timeout for the mockserver assertion, it can look like that -> no error and a failed assertion

Damien Hawes (damien.hawes@booking.com)
2024-02-14 07:07:28

*Thread Reply:* Yeah, there were two tests that had mock server ports set to 1081

Damien Hawes (damien.hawes@booking.com)
2024-02-14 07:07:35

*Thread Reply:* and they were running into each other

Damien Hawes (damien.hawes@booking.com)
2024-02-14 07:07:45

*Thread Reply:* so I was like, "Let's make them random"
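
One common way to "make them random" is to ask the OS for a free port by binding to port 0. This is only a sketch of that technique, not necessarily what the branch does; `FreePortSketch` is a made-up name.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.ServerSocket;

final class FreePortSketch {
    // Binding to port 0 asks the OS to pick any currently free ephemeral port.
    static int findFreePort() {
        try (ServerSocket socket = new ServerSocket(0)) {
            return socket.getLocalPort();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Note the small race: the probe socket is closed before the real server binds, so another process could grab the port in between. Whatever port is chosen also has to be passed to the client under test, otherwise the server and the emitter end up on different ports.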

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-02-14 07:07:59

*Thread Reply:* could u set it for delta to static value as before and see if they fail again?

Damien Hawes (damien.hawes@booking.com)
2024-02-14 07:09:42

*Thread Reply:* sure

Damien Hawes (damien.hawes@booking.com)
2024-02-14 07:39:46

*Thread Reply:* OK

Damien Hawes (damien.hawes@booking.com)
2024-02-14 07:39:54

*Thread Reply:* Switching back to hard coded ports

Damien Hawes (damien.hawes@booking.com)
2024-02-14 07:40:31

*Thread Reply:* Made the 3.5.0 pipeline pass everything except the io.openlineage.spark.agent.SparkGenericIntegrationTest and the test that tries to use S3

Damien Hawes (damien.hawes@booking.com)
2024-02-14 07:40:50

*Thread Reply:* The former failed, once again, with the assertion error

Damien Hawes (damien.hawes@booking.com)
2024-02-14 07:40:56

*Thread Reply:* the latter, about the java version error

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-02-14 08:16:44

*Thread Reply:* @Maciej Obuchowski SparkGenericIntegrationTest is your recent test. You call getEventsEmitted and wait only until the list is non-empty. So if you expect two events, you don't wait for the second one. Am I right?

Damien Hawes (damien.hawes@booking.com)
2024-02-14 08:26:08

*Thread Reply:* TIL: If you relocate org.slf4j (which app was previously doing) - you will not see the logs of the shaded jar.

😨 Paweł Leszczyński
Maciej Obuchowski (maciej.obuchowski@getindata.com)
2024-02-14 08:31:02

*Thread Reply:* yes, that's why I always comment that and @Paweł Leszczyński always asks why I've done it during review if I forget to revert it 🙂

Maciej Obuchowski (maciej.obuchowski@getindata.com)
2024-02-14 08:31:46

*Thread Reply:* @Paweł Leszczyński I do verifyEvents before getEventsEmitted, that should take care of getting all the events I care about

🙌 Paweł Leszczyński
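
The concern in this exchange (returning as soon as the list is non-empty versus waiting for every expected event) can be sketched with a plain polling helper. `AwaitEventsSketch` is hypothetical and uses only the JDK; the supplier stands in for something like getEventsEmitted.

```java
import java.time.Duration;
import java.util.List;
import java.util.function.Supplier;

final class AwaitEventsSketch {
    // Polls until at least `expected` events are visible or the timeout elapses,
    // then returns whatever was seen last.
    static <T> List<T> awaitAtLeast(Supplier<List<T>> events, int expected, Duration timeout) {
        long deadline = System.nanoTime() + timeout.toNanos();
        List<T> seen = events.get();
        while (seen.size() < expected && System.nanoTime() < deadline) {
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
            seen = events.get();
        }
        return seen;
    }
}
```

The caller still has to assert on the size of the returned list, since a timeout returns a short list rather than throwing.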
Damien Hawes (damien.hawes@booking.com)
2024-02-14 08:37:02

*Thread Reply:* Hmm - did I find an undocumented "feature"?

Basically, if the OpenLineage Java client detects that you're trying to post to /api/v1/namespaces/<namespace> it will inject the namespace into the run event under $.job.namespace if $.job.namespace is equal to default?

Damien Hawes (damien.hawes@booking.com)
2024-02-14 10:04:59

*Thread Reply:* I'm seeing this in the logs for the integration test:

io.openlineage.client.OpenLineageClientException: io.openlineage.spark.shaded.org.apache.http.conn.HttpHostConnectException: Connect to localhost:1081 [localhost/127.0.0.1] failed: Connection refused (Connection refused) at io.openlineage.client.transports.HttpTransport.emit(HttpTransport.java:128) ~[openlineage-spark-agent_2.13-1.9.0-SNAPSHOT-shadow.jar:1.9.0-SNAPSHOT] at io.openlineage.client.transports.HttpTransport.emit(HttpTransport.java:111) ~[openlineage-spark-agent_2.13-1.9.0-SNAPSHOT-shadow.jar:1.9.0-SNAPSHOT] at io.openlineage.client.OpenLineageClient.emit(OpenLineageClient.java:46) ~[openlineage-spark-agent_2.13-1.9.0-SNAPSHOT-shadow.jar:1.9.0-SNAPSHOT] at io.openlineage.spark.agent.EventEmitter.emit(EventEmitter.java:69) ~[openlineage-spark-agent_2.13-1.9.0-SNAPSHOT-shadow.jar:?] at io.openlineage.spark.agent.OpenLineageSparkListener.emitApplicationEvent(OpenLineageSparkListener.java:321) ~[openlineage-spark-agent_2.13-1.9.0-SNAPSHOT-shadow.jar:?] at io.openlineage.spark.agent.OpenLineageSparkListener.emitApplicationEndEvent(OpenLineageSparkListener.java:329) ~[openlineage-spark-agent_2.13-1.9.0-SNAPSHOT-shadow.jar:?] at io.openlineage.spark.agent.OpenLineageSparkListener.onApplicationEnd(OpenLineageSparkListener.java:229) ~[openlineage-spark-agent_2.13-1.9.0-SNAPSHOT-shadow.jar:?] 
at org.apache.spark.scheduler.SparkListenerBus.doPostEvent(SparkListenerBus.scala:57) ~[spark-core_2.13-3.2.4.jar:3.2.4] at org.apache.spark.scheduler.SparkListenerBus.doPostEvent$(SparkListenerBus.scala:28) ~[spark-core_2.13-3.2.4.jar:3.2.4] at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37) ~[spark-core_2.13-3.2.4.jar:3.2.4] at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37) ~[spark-core_2.13-3.2.4.jar:3.2.4] at org.apache.spark.util.ListenerBus.postToAll(ListenerBus.scala:117) ~[spark-core_2.13-3.2.4.jar:3.2.4] at org.apache.spark.util.ListenerBus.postToAll$(ListenerBus.scala:101) ~[spark-core_2.13-3.2.4.jar:3.2.4] at org.apache.spark.scheduler.AsyncEventQueue.super$postToAll(AsyncEventQueue.scala:105) ~[spark-core_2.13-3.2.4.jar:3.2.4] at org.apache.spark.scheduler.AsyncEventQueue.$anonfun$dispatch$1(AsyncEventQueue.scala:105) ~[spark-core_2.13-3.2.4.jar:3.2.4] at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.scala:17) ~[iceberg-spark-runtime-3.2_2.13-0.14.0.jar:?] at scala.util.DynamicVariable.withValue(DynamicVariable.scala:59) ~[iceberg-spark-runtime-3.2_2.13-0.14.0.jar:?] 
at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:100) ~[spark-core_2.13-3.2.4.jar:3.2.4] at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.$anonfun$run$1(AsyncEventQueue.scala:96) ~[spark-core_2.13-3.2.4.jar:3.2.4] at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1433) [spark-core_2.13-3.2.4.jar:3.2.4] at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.run(AsyncEventQueue.scala:96) [spark-core_2.13-3.2.4.jar:3.2.4] Caused by: io.openlineage.spark.shaded.org.apache.http.conn.HttpHostConnectException: Connect to localhost:1081 [localhost/127.0.0.1] failed: Connection refused (Connection refused) at io.openlineage.spark.shaded.org.apache.http.impl.conn.DefaultHttpClientConnectionOperator.connect(DefaultHttpClientConnectionOperator.java:156) ~[openlineage-spark-agent_2.13-1.9.0-SNAPSHOT-shadow.jar:1.9.0-SNAPSHOT] at io.openlineage.spark.shaded.org.apache.http.impl.conn.PoolingHttpClientConnectionManager.connect(PoolingHttpClientConnectionManager.java:376) ~[openlineage-spark-agent_2.13-1.9.0-SNAPSHOT-shadow.jar:1.9.0-SNAPSHOT] at io.openlineage.spark.shaded.org.apache.http.impl.execchain.MainClientExec.establishRoute(MainClientExec.java:393) ~[openlineage-spark-agent_2.13-1.9.0-SNAPSHOT-shadow.jar:1.9.0-SNAPSHOT] at io.openlineage.spark.shaded.org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:236) ~[openlineage-spark-agent_2.13-1.9.0-SNAPSHOT-shadow.jar:1.9.0-SNAPSHOT] at io.openlineage.spark.shaded.org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186) ~[openlineage-spark-agent_2.13-1.9.0-SNAPSHOT-shadow.jar:1.9.0-SNAPSHOT] at io.openlineage.spark.shaded.org.apache.http.impl.execchain.RetryExec.execute(RetryExec.java:89) ~[openlineage-spark-agent_2.13-1.9.0-SNAPSHOT-shadow.jar:1.9.0-SNAPSHOT] at 
io.openlineage.spark.shaded.org.apache.http.impl.execchain.RedirectExec.execute(RedirectExec.java:110) ~[openlineage-spark-agent_2.13-1.9.0-SNAPSHOT-shadow.jar:1.9.0-SNAPSHOT] at io.openlineage.spark.shaded.org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185) ~[openlineage-spark-agent_2.13-1.9.0-SNAPSHOT-shadow.jar:1.9.0-SNAPSHOT] at io.openlineage.spark.shaded.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83) ~[openlineage-spark-agent_2.13-1.9.0-SNAPSHOT-shadow.jar:1.9.0-SNAPSHOT] at io.openlineage.spark.shaded.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:108) ~[openlineage-spark-agent_2.13-1.9.0-SNAPSHOT-shadow.jar:1.9.0-SNAPSHOT] at io.openlineage.client.transports.HttpTransport.emit(HttpTransport.java:123) ~[openlineage-spark-agent_2.13-1.9.0-SNAPSHOT-shadow.jar:1.9.0-SNAPSHOT] ... 20 more

Damien Hawes (damien.hawes@booking.com)
2024-02-14 10:06:27

*Thread Reply:* I'm honestly wondering if this is the source of my troubles with the integration tests failing. i.e., the mockserver just isn't binding to the port, or the port isn't accepting connections, or something.

Damien Hawes (damien.hawes@booking.com)
2024-02-14 10:07:02

*Thread Reply:* That, for example, comes from the Google Cloud Integration test

Maciej Obuchowski (maciej.obuchowski@getindata.com)
2024-02-14 10:34:34

*Thread Reply:*
> Basically, if the OpenLineage Java client detects that you're trying to post to /api/v1/namespaces/<namespace>
I don't think it sends to .../namespaces anywhere by default, the canonical endpoint is api/v1/lineage ?
> I'm honestly wondering if this is the source of my troubles with the integration tests failing. i.e., the mockserver just isn't binding to the port, or the port isn't accepting connections, or something.
Or the tests run before mockserver started accepting connections?

Maciej Obuchowski (maciej.obuchowski@getindata.com)
2024-02-14 10:36:26

*Thread Reply:* In Docker tests, we have a long timeout on the container:
static MockServerContainer makeMockServerContainer(Network network) {
  return new MockServerContainer(MOCKSERVER_IMAGE)
      .withNetwork(network)
      .withNetworkAliases("openlineageclient")
      .withStartupTimeout(Duration.of(2, ChronoUnit.MINUTES));
}

Maciej Obuchowski (maciej.obuchowski@getindata.com)
2024-02-14 10:36:58

*Thread Reply:* but I don't think there's anything like that on local tests?
mockServer = ClientAndServer.startClientAndServer(configuration, MOCKSERVER_PORT);
mockServer
    .when(request("/api/v1/lineage"))
    .respond(org.mockserver.model.HttpResponse.response().withStatusCode(201));
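
To test the "started but not yet accepting connections" theory for local tests, one option is to probe the port over plain TCP before the Spark session starts. A minimal JDK-only sketch; `PortProbeSketch` and its parameters are assumptions, not something in the repository:

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.time.Duration;

final class PortProbeSketch {
    // Returns true once a TCP connection to host:port succeeds,
    // false if the timeout elapses (or the thread is interrupted) first.
    static boolean awaitPortOpen(String host, int port, Duration timeout) {
        long deadline = System.nanoTime() + timeout.toNanos();
        do {
            try (Socket socket = new Socket()) {
                socket.connect(new InetSocketAddress(host, port), 500);
                return true;
            } catch (IOException refusedOrTimedOut) {
                try {
                    Thread.sleep(200);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
        } while (System.nanoTime() < deadline);
        return false;
    }
}
```

A probe like this checks the port the client will actually use, which is a stricter signal than asking the server object whether it considers itself running.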

Damien Hawes (damien.hawes@booking.com)
2024-02-14 11:31:58

*Thread Reply:* @Maciej Obuchowski - possible, possible. Nice thought.

Maciej Obuchowski (maciej.obuchowski@getindata.com)
2024-02-14 12:07:00

*Thread Reply:* Ran your branch locally and I'm getting the same errors

Damien Hawes (damien.hawes@booking.com)
2024-02-14 12:07:31

*Thread Reply:* What OS and arch are you using?

Damien Hawes (damien.hawes@booking.com)
2024-02-14 12:09:48

*Thread Reply:* I'm using an Apple M1 with MacOS Sonoma

Maciej Obuchowski (maciej.obuchowski@getindata.com)
2024-02-14 12:13:02

*Thread Reply:* pretty much the same, M1 Pro with Sonoma

Damien Hawes (damien.hawes@booking.com)
2024-02-14 12:13:25

*Thread Reply:* 🤨

Damien Hawes (damien.hawes@booking.com)
2024-02-14 12:16:19

*Thread Reply:* What JDK are you running the project with?

Maciej Obuchowski (maciej.obuchowski@getindata.com)
2024-02-14 12:18:47

*Thread Reply:* Azul Zulu 8 Arm build

Damien Hawes (damien.hawes@booking.com)
2024-02-14 12:18:58

*Thread Reply:* ¯_(ツ)_/¯

Damien Hawes (damien.hawes@booking.com)
2024-02-14 12:19:01

*Thread Reply:* Same.

Damien Hawes (damien.hawes@booking.com)
2024-02-14 12:19:19

*Thread Reply:* I guess I just have some magic on my machine

Maciej Obuchowski (maciej.obuchowski@getindata.com)
2024-02-14 12:19:44

*Thread Reply:* ➜ code java -version
openjdk version "1.8.0_312"
OpenJDK Runtime Environment (Zulu 8.58.0.13-CA-macos-aarch64) (build 1.8.0_312-b07)
OpenJDK 64-Bit Server VM (Zulu 8.58.0.13-CA-macos-aarch64) (build 25.312-b07, mixed mode)

Maciej Obuchowski (maciej.obuchowski@getindata.com)
2024-02-14 12:19:59

*Thread Reply:* I'll try to dig in more

Damien Hawes (damien.hawes@booking.com)
2024-02-14 12:20:18

*Thread Reply:* I have 392, but I don't think it makes much of a difference

Damien Hawes (damien.hawes@booking.com)
2024-02-14 12:21:25

*Thread Reply:* Also, I have access to a centos box running on x86_64, which I am running the tests with.

Damien Hawes (damien.hawes@booking.com)
2024-02-14 12:21:35

*Thread Reply:* To see if I can replicate the failures there

Damien Hawes (damien.hawes@booking.com)
2024-02-14 12:26:59

*Thread Reply:* 2024-02-14 18:26:32 ERROR io.openlineage.spark.agent.EventEmitter - Could not emit lineage w/ exception io.openlineage.client.OpenLineageClientException: io.openlineage.spark.shaded.org.apache.http.conn.HttpHostConnectException: Connect to localhost:1081 [localhost/127.0.0.1] failed: Connection refused (Connection refused) at io.openlineage.client.transports.HttpTransport.emit(HttpTransport.java:128) ~[openlineage-spark-agent_2.13-1.9.0-SNAPSHOT-shadow.jar:1.9.0-SNAPSHOT] at io.openlineage.client.transports.HttpTransport.emit(HttpTransport.java:111) ~[openlineage-spark-agent_2.13-1.9.0-SNAPSHOT-shadow.jar:1.9.0-SNAPSHOT] at io.openlineage.client.OpenLineageClient.emit(OpenLineageClient.java:46) ~[openlineage-spark-agent_2.13-1.9.0-SNAPSHOT-shadow.jar:1.9.0-SNAPSHOT] at io.openlineage.spark.agent.EventEmitter.emit(EventEmitter.java:69) ~[openlineage-spark-agent_2.13-1.9.0-SNAPSHOT-shadow.jar:?] at io.openlineage.spark.agent.lifecycle.SparkSQLExecutionContext.end(SparkSQLExecutionContext.java:141) ~[openlineage-spark-agent_2.13-1.9.0-SNAPSHOT-shadow.jar:?] at io.openlineage.spark.agent.OpenLineageSparkListener.sparkSQLExecEnd(OpenLineageSparkListener.java:100) ~[openlineage-spark-agent_2.13-1.9.0-SNAPSHOT-shadow.jar:?] at io.openlineage.spark.agent.OpenLineageSparkListener.onOtherEvent(OpenLineageSparkListener.java:86) ~[openlineage-spark-agent_2.13-1.9.0-SNAPSHOT-shadow.jar:?] 
at org.apache.spark.scheduler.SparkListenerBus.doPostEvent(SparkListenerBus.scala:100) ~[spark-core_2.13-3.4.2.jar:3.4.2] at org.apache.spark.scheduler.SparkListenerBus.doPostEvent$(SparkListenerBus.scala:28) ~[spark-core_2.13-3.4.2.jar:3.4.2] at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37) ~[spark-core_2.13-3.4.2.jar:3.4.2] at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37) ~[spark-core_2.13-3.4.2.jar:3.4.2] at org.apache.spark.util.ListenerBus.postToAll(ListenerBus.scala:117) ~[spark-core_2.13-3.4.2.jar:3.4.2] at org.apache.spark.util.ListenerBus.postToAll$(ListenerBus.scala:101) ~[spark-core_2.13-3.4.2.jar:3.4.2] at org.apache.spark.scheduler.AsyncEventQueue.super$postToAll(AsyncEventQueue.scala:105) ~[spark-core_2.13-3.4.2.jar:3.4.2] at org.apache.spark.scheduler.AsyncEventQueue.$anonfun$dispatch$1(AsyncEventQueue.scala:105) ~[spark-core_2.13-3.4.2.jar:3.4.2] at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.scala:17) ~[scala-library-2.13.11.jar:?] at scala.util.DynamicVariable.withValue(DynamicVariable.scala:59) ~[scala-library-2.13.11.jar:?] at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:100) ~[spark-core_2.13-3.4.2.jar:3.4.2] at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.$anonfun$run$1(AsyncEventQueue.scala:96) ~[spark-core_2.13-3.4.2.jar:3.4.2] at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1471) [spark-core_2.13-3.4.2.jar:3.4.2] at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.run(AsyncEventQueue.scala:96) [spark-core_2.13-3.4.2.jar:3.4.2] Caused by: io.openlineage.spark.shaded.org.apache.http.conn.HttpHostConnectException: Connect to localhost:1081 [localhost/127.0.0.1] failed: Connection refused (Connection refused)

Got it on the Linux box

🙂 Maciej Obuchowski
Damien Hawes (damien.hawes@booking.com)
2024-02-14 12:29:04

*Thread Reply:*
```
SparkDeltaIntegrationTest STANDARD_OUT
2024-02-14 18:26:07 WARN org.apache.kafka.clients.NetworkClient - [AdminClient clientId=adminclient-1] Connection to node 1 (localhost/127.0.0.1:32801) could not be established. Broker may not be available.
2024-02-14 18:26:08 INFO io.openlineage.spark.agent.SparkDeltaIntegrationTest - Waiting for mock server to start on port 1082

SparkDeltaIntegrationTest > testMergeInto() STANDARD_OUT
2024-02-14 18:26:08 ERROR io.openlineage.spark.agent.EventEmitter - Could not emit lineage w/ exception
io.openlineage.client.OpenLineageClientException: io.openlineage.spark.shaded.org.apache.http.conn.HttpHostConnectException: Connect to localhost:1081 [localhost/127.0.0.1] failed: Connection refused (Connection refused)
  at io.openlineage.client.transports.HttpTransport.emit(HttpTransport.java:128) ~[openlineage-spark-agent_2.13-1.9.0-SNAPSHOT-shadow.jar:1.9.0-SNAPSHOT]
  at io.openlineage.client.transports.HttpTransport.emit(HttpTransport.java:111) ~[openlineage-spark-agent_2.13-1.9.0-SNAPSHOT-shadow.jar:1.9.0-SNAPSHOT]
  at io.openlineage.client.OpenLineageClient.emit(OpenLineageClient.java:46) ~[openlineage-spark-agent_2.13-1.9.0-SNAPSHOT-shadow.jar:1.9.0-SNAPSHOT]
  at io.openlineage.spark.agent.EventEmitter.emit(EventEmitter.java:69) ~[openlineage-spark-agent_2.13-1.9.0-SNAPSHOT-shadow.jar:?]
  at io.openlineage.spark.agent.OpenLineageSparkListener.emitApplicationEvent(OpenLineageSparkListener.java:321) ~[openlineage-spark-agent_2.13-1.9.0-SNAPSHOT-shadow.jar:?]
  at io.openlineage.spark.agent.OpenLineageSparkListener.emitApplicationStartEvent(OpenLineageSparkListener.java:325) ~[openlineage-spark-agent_2.13-1.9.0-SNAPSHOT-shadow.jar:?]
  at io.openlineage.spark.agent.OpenLineageSparkListener.onApplicationStart(OpenLineageSparkListener.java:247) ~[openlineage-spark-agent_2.13-1.9.0-SNAPSHOT-shadow.jar:?]
  at org.apache.spark.scheduler.SparkListenerBus.doPostEvent(SparkListenerBus.scala:55) ~[spark-core_2.13-3.4.2.jar:3.4.2]
  at org.apache.spark.scheduler.SparkListenerBus.doPostEvent$(SparkListenerBus.scala:28) ~[spark-core_2.13-3.4.2.jar:3.4.2]
  at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37) ~[spark-core_2.13-3.4.2.jar:3.4.2]
  at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37) ~[spark-core_2.13-3.4.2.jar:3.4.2]
  at org.apache.spark.util.ListenerBus.postToAll(ListenerBus.scala:117) ~[spark-core_2.13-3.4.2.jar:3.4.2]
  at org.apache.spark.util.ListenerBus.postToAll$(ListenerBus.scala:101) ~[spark-core_2.13-3.4.2.jar:3.4.2]
  at org.apache.spark.scheduler.AsyncEventQueue.super$postToAll(AsyncEventQueue.scala:105) ~[spark-core_2.13-3.4.2.jar:3.4.2]
  at org.apache.spark.scheduler.AsyncEventQueue.$anonfun$dispatch$1(AsyncEventQueue.scala:105) ~[spark-core_2.13-3.4.2.jar:3.4.2]
  at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.scala:17) ~[scala-library-2.13.11.jar:?]
  at scala.util.DynamicVariable.withValue(DynamicVariable.scala:59) ~[scala-library-2.13.11.jar:?]
  at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:100) ~[spark-core_2.13-3.4.2.jar:3.4.2]
  at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.$anonfun$run$1(AsyncEventQueue.scala:96) ~[spark-core_2.13-3.4.2.jar:3.4.2]
  at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1471) [spark-core_2.13-3.4.2.jar:3.4.2]
  at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.run(AsyncEventQueue.scala:96) [spark-core_2.13-3.4.2.jar:3.4.2]
```

Damien Hawes (damien.hawes@booking.com)
2024-02-14 12:29:20

*Thread Reply:* So, the fact that the test proceeds, means that the mock server has started.

Damien Hawes (damien.hawes@booking.com)
2024-02-14 12:29:49

*Thread Reply:* Because I have this

```
@Override
public void beforeAll(ExtensionContext context) throws Exception {
  int port = basePort.getAndIncrement();
  Configuration configuration = new Configuration();
  configuration.logLevel(Level.ERROR);
  clientAndServer = ClientAndServer.startClientAndServer(configuration, port);
  clientAndServer
      .when(request("/api/v1/lineage"))
      .respond(HttpResponse.response().withStatusCode(201));

  if (context.getTestClass().isPresent()) {
    Logger logger = LoggerFactory.getLogger(context.getTestClass().get());
    Awaitility.await("wait-for-mock-server-start-up")
        .atMost(Duration.ofMinutes(2))
        .pollInterval(Duration.ofSeconds(1))
        .until(
            () -> {
              logger.info(
                  "Waiting for mock server to start on port {}", clientAndServer.getPort());
              return clientAndServer.isRunning();
            });
  }
}
```

👍 Maciej Obuchowski
Damien Hawes (damien.hawes@booking.com)
2024-02-14 12:30:34

*Thread Reply:* So there's another reason why the connection is being refused.
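
One way to separate "nothing is listening on that port" from other causes of a refused connection is a direct TCP probe from the test JVM. The following is a minimal sketch only; the class name `PortProbeSketch`, the method `isListening`, and the port 1081 used in `main` are illustrative assumptions and are not part of the OpenLineage test code.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

final class PortProbeSketch {
  /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
  static boolean isListening(String host, int port, int timeoutMillis) {
    try (Socket socket = new Socket()) {
      socket.connect(new InetSocketAddress(host, port), timeoutMillis);
      return true;
    } catch (IOException e) {
      // Connection refused, timeout, or unreachable host all end up here.
      return false;
    }
  }

  public static void main(String[] args) {
    // Hypothetical port; substitute the port the mock server reports at start-up.
    System.out.println("listening: " + isListening("127.0.0.1", 1081, 500));
  }
}
```

Calling a probe like this right before the Spark job emits its first event would show whether the server is still bound at that moment, independent of what `isRunning()` reported during start-up.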

Damien Hawes (damien.hawes@booking.com)
2024-02-14 12:35:35

*Thread Reply:* https://github.com/mock-server/mockserver/issues/498

Damien Hawes (damien.hawes@booking.com)
2024-02-14 12:40:27

*Thread Reply:* What also has me confused, is that I see a lot of Kafka Admin Client related configurations.

Damien Hawes (damien.hawes@booking.com)
2024-02-14 12:40:40

*Thread Reply:* Being unable to connect to a broker. Which is super strange.

Maciej Obuchowski (maciej.obuchowski@getindata.com)
2024-02-14 12:42:12

*Thread Reply:* I would try to see if this happens on the containers test in particular

Damien Hawes (damien.hawes@booking.com)
2024-02-14 12:44:00

*Thread Reply:* With the container test, I see that some of the tests are showing 404's

Damien Hawes (damien.hawes@booking.com)
2024-02-14 12:44:09

*Thread Reply:* But, they pass.

Damien Hawes (damien.hawes@booking.com)
2024-02-14 12:44:13

*Thread Reply:* Which is strange, imo.

Damien Hawes (damien.hawes@booking.com)
2024-02-14 13:43:23

*Thread Reply:* Further digging:

If I run one test class in isolation, the tests go through.

Damien Hawes (damien.hawes@booking.com)
2024-02-14 13:43:39

*Thread Reply:* However, if I run multiple tests that require mock server, bad things happen.

Damien Hawes (damien.hawes@booking.com)
2024-02-14 13:43:58

*Thread Reply:* i.e., the connection refused errors appear

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-02-14 13:44:18

*Thread Reply:* is this related to a shared mock server or shared spark session?

Damien Hawes (damien.hawes@booking.com)
2024-02-14 13:44:47

*Thread Reply:* Are you asking about shared within the test class, or shared across test classes?

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-02-14 13:45:01

*Thread Reply:* shared across test classes

Damien Hawes (damien.hawes@booking.com)
2024-02-14 13:45:16

*Thread Reply:* In that case, neither the spark session nor the mock server are shared.

Damien Hawes (damien.hawes@booking.com)
2024-02-14 13:45:59

*Thread Reply:* For example, SparkDeltaIntegrationTest and SparkIcebergIntegrationTest

Damien Hawes (damien.hawes@booking.com)
2024-02-14 13:46:07

*Thread Reply:* They each get their own servers and sessions

Damien Hawes (damien.hawes@booking.com)
2024-02-14 13:46:19

*Thread Reply:* However, if you run them in isolation, they will individually pass

Damien Hawes (damien.hawes@booking.com)
2024-02-14 13:47:03

*Thread Reply:* But, if you run them in parallel or sequentially (but in the same JVM process), the one that executes after will fail.

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-02-14 13:52:46

*Thread Reply:* that's weird. They can still share derby location or /tmp/whatever where data is stored

Damien Hawes (damien.hawes@booking.com)
2024-02-14 13:57:28

*Thread Reply:* For example: ./gradlew integrationTest -x test -x buildDockerImage -Pscala.binary.version=2.13 -Pspark.version=3.4.2 --tests SparkDeltaIntegrationTest --tests SparkGenericIntegrationTest --tests SparkIcebergIntegrationTest

Damien Hawes (damien.hawes@booking.com)
2024-02-14 13:57:58

*Thread Reply:* I started seeing the connection refused error, around the 2nd last test of the SparkDeltaIntegrationTest

Damien Hawes (damien.hawes@booking.com)
2024-02-14 13:58:18

*Thread Reply:* which makes me wonder if MockServer only wants 1 instance per JVM process

Damien Hawes (damien.hawes@booking.com)
2024-02-14 13:58:31

*Thread Reply:* Or are we hitting heapsize limits

Damien Hawes (damien.hawes@booking.com)
2024-02-14 13:58:49

*Thread Reply:* i.e., the JVM is running out of resources and that's why we're seeing connection refused

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-02-14 13:59:28

*Thread Reply:* mockserver is way too heavy for what it's doing. that's why maciej truly hates it

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-02-14 14:00:45

*Thread Reply:* you can reduce parallelism to 1 and see if the tests succeed. this will take ages but at least we would know this

Damien Hawes (damien.hawes@booking.com)
2024-02-14 14:01:00

*Thread Reply:* I think I already did that

Damien Hawes (damien.hawes@booking.com)
2024-02-14 14:01:46

*Thread Reply:* Oh no

Damien Hawes (damien.hawes@booking.com)
2024-02-14 14:01:50

*Thread Reply:* I removed the setting

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-02-14 14:02:43

*Thread Reply:* which one?

Damien Hawes (damien.hawes@booking.com)
2024-02-14 14:02:57

*Thread Reply:* Fork every and max parallel

Damien Hawes (damien.hawes@booking.com)
2024-02-14 14:05:34

*Thread Reply:* Running the test with jvmArgs("-Xmx4096m", "-XX:MaxPermSize=512m") now

Damien Hawes (damien.hawes@booking.com)
2024-02-14 14:07:14

*Thread Reply:* Nope.

Damien Hawes (damien.hawes@booking.com)
2024-02-14 14:07:17

*Thread Reply:* That did nothing.

Damien Hawes (damien.hawes@booking.com)
2024-02-14 14:14:06

*Thread Reply:* OK. So I did something completely the opposite

Damien Hawes (damien.hawes@booking.com)
2024-02-14 14:14:11

*Thread Reply:* I increased the parallelism

Damien Hawes (damien.hawes@booking.com)
2024-02-14 14:14:23

*Thread Reply:* I said, max parallel forks = 5

Damien Hawes (damien.hawes@booking.com)
2024-02-14 14:14:30

*Thread Reply:* I had to change port assignment

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-02-14 14:14:38

*Thread Reply:* i run the command locally and have a lot of io.openlineage.client.OpenLineageClientException: io.openlineage.spark.shaded.org.apache.http.conn.HttpHostConnectException: Connect to localhost:1081 [localhost/127.0.0.1, localhost/0:0:0:0:0:0:0:1] failed: Connection refused (Connection refused)

Damien Hawes (damien.hawes@booking.com)
2024-02-14 14:14:48

*Thread Reply:* Yup

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-02-14 14:14:52

*Thread Reply:* something is stopping mockserver while another test tries to connect to it

Damien Hawes (damien.hawes@booking.com)
2024-02-14 14:15:09

*Thread Reply:* I think MockServer isn't cleaning up after itself properly

Damien Hawes (damien.hawes@booking.com)
2024-02-14 14:15:38

*Thread Reply:* SUCCESS: Executed 26 tests in 1m 31s

Damien Hawes (damien.hawes@booking.com)
2024-02-14 14:18:26

*Thread Reply:* With:

    tasks.register("integrationTest", Test.class) {
        group = "verification"
        dependsOn(integrationTestDependencies)
        testClassesDirs = testSourceSet.output.classesDirs
        classpath = files(tasks.shadowJar.outputs.files.singleFile, sourceSets.test.runtimeClasspath)
        useJUnitPlatform {
            includeTags("integration-test")
            excludeTags("databricks")
            logger.warn("[IntegrationTest] hasDeltaDependencies: ${hasDeltaDependencies(spark, scala)}")
            if (!hasDeltaDependencies(spark, scala)) {
                logger.warn("[IntegrationTest] Excluding delta tests")
                excludeTags("delta")
            }
            logger.warn("[IntegrationTest] hasIcebergDependencies: ${hasIcebergDependencies(spark, scala)}")
            if (!hasIcebergDependencies(spark, scala)) {
                logger.warn("[IntegrationTest] Excluding iceberg tests")
                excludeTags("iceberg")
            }
        }
        systemProperties.put("test.results.dir", buildDirectory.dir("test-results/${name}/o").get().asFile.absolutePath)
        setForkEvery(1)
        maxParallelForks = 5
        setMaxHeapSize("1024m")
    }

Damien Hawes (damien.hawes@booking.com)
2024-02-14 14:18:55

*Thread Reply:* I get this:

SUCCESS: Executed 26 tests in 1m 32s

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-02-14 14:21:00

*Thread Reply:* I am also seeing java.lang.ClassCastException: org.apache.spark.sql.catalyst.analysis.ResolvedIdentifier cannot be cast to org.apache.spark.sql.catalyst.analysis.ResolvedTable, which doesn't make the tests fail

Damien Hawes (damien.hawes@booking.com)
2024-02-14 14:21:18

*Thread Reply:* Yeah - that's something to do with Delta casting

Damien Hawes (damien.hawes@booking.com)
2024-02-14 14:21:39

*Thread Reply:* The code is trying to cast table to an identifier

Damien Hawes (damien.hawes@booking.com)
2024-02-14 14:21:51

*Thread Reply:* (I didn't touch that)

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-02-14 14:29:11

*Thread Reply:* hmyy, i thought resetting mockServer also clears the expectations

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-02-14 14:29:50

*Thread Reply:* and this

        .when(request("/api/v1/lineage"))
        .respond(HttpResponse.response().withStatusCode(201))

is also an expectation

Damien Hawes (damien.hawes@booking.com)
2024-02-14 14:38:46

*Thread Reply:* If it was an expectation

Damien Hawes (damien.hawes@booking.com)
2024-02-14 14:38:54

*Thread Reply:* wouldn't it clear after the first test?

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-02-14 14:40:33

*Thread Reply:* ``` 2024-02-14 20:38:02 INFO io.openlineage.spark.agent.SparkDeltaIntegrationTest - Stopping mockserver on port 1923

2024-02-14 20:38:02 WARN org.apache.spark.sql.SparkSession - An existing Spark session exists as the active or default session. This probably means another suite leaked it. Attempting to stop it before continuing. This existing Spark session was created at:

org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:1035) io.openlineage.spark.agent.SparkDeltaIntegrationTest.beforeEach(SparkDeltaIntegrationTest.java:111) sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) java.lang.reflect.Method.invoke(Method.java:498) org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:728) org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60) org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131) org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156) org.junit.jupiter.engine.extension.TimeoutExtension.interceptLifecycleMethod(TimeoutExtension.java:128) org.junit.jupiter.engine.extension.TimeoutExtension.interceptBeforeEachMethod(TimeoutExtension.java:78) org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103) org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93) org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106) org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64) org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45) org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37) org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:92) org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:86)

2024-02-14 20:38:02 INFO io.openlineage.spark.agent.SparkDeltaIntegrationTest - Stopping mockserver on port 1923

SparkGenericIntegrationTest STANDARD_OUT 2024-02-14 20:38:03 INFO io.openlineage.spark.agent.SparkGenericIntegrationTest - Waiting for mock server to start on port 1930

SparkGenericIntegrationTest > sparkEmitsApplicationLevelEvents() STANDARD_OUT
2024-02-14 20:38:03 ERROR io.openlineage.spark.agent.EventEmitter - Could not emit lineage w/ exception
io.openlineage.client.OpenLineageClientException: io.openlineage.spark.shaded.org.apache.http.NoHttpResponseException: localhost:1923 failed to respond
    at io.openlineage.client.transports.HttpTransport.emit(HttpTransport.java:128) ~[openlineage-spark-agent_2.13-1.9.0-SNAPSHOT-shadow.jar:1.9.0-SNAPSHOT]
    at io.openlineage.client.transports.HttpTransport.emit(HttpTransport.java:111) ~[openlineage-spark-agent_2.13-1.9.0-SNAPSHOT-shadow.jar:1.9.0-SNAPSHOT]
    at io.openlineage.client.OpenLineageClient.emit(OpenLineageClient.java:60) ~[openlineage-spark-agent_2.13-1.9.0-SNAPSHOT-shadow.jar:1.9.0-SNAPSHOT]
    at io.openlineage.spark.agent.EventEmitter.emit(EventEmitter.java:69) ~[openlineage-spark-agent_2.13-1.9.0-SNAPSHOT-shadow.jar:?]
    at io.openlineage.spark.agent.OpenLineageSparkListener.emitApplicationEvent(OpenLineageSparkListener.java:321) ~[openlineage-spark-agent_2.13-1.9.0-SNAPSHOT-shadow.jar:?]
    at io.openlineage.spark.agent.OpenLineageSparkListener.emitApplicationStartEvent(OpenLineageSparkListener.java:325) ~[openlineage-spark-agent_2.13-1.9.0-SNAPSHOT-shadow.jar:?]
    at io.openlineage.spark.agent.OpenLineageSparkListener.onApplicationStart(OpenLineageSparkListener.java:247) ~[openlineage-spark-agent_2.13-1.9.0-SNAPSHOT-shadow.jar:?]
    at org.apache.spark.scheduler.SparkListenerBus.doPostEvent(SparkListenerBus.scala:55) ~[spark-core_2.13-3.4.2.jar:3.4.2]
    at org.apache.spark.scheduler.SparkListenerBus.doPostEvent$(SparkListenerBus.scala:28) ~[spark-core_2.13-3.4.2.jar:3.4.2]
    at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37) ~[spark-core_2.13-3.4.2.jar:3.4.2]
    at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37) ~[spark-core_2.13-3.4.2.jar:3.4.2]
    at org.apache.spark.util.ListenerBus.postToAll(ListenerBus.scala:117) ~[spark-core_2.13-3.4.2.jar:3.4.2]
    at org.apache.spark.util.ListenerBus.postToAll$(ListenerBus.scala:101) ~[spark-core_2.13-3.4.2.jar:3.4.2]
    at org.apache.spark.scheduler.AsyncEventQueue.super$postToAll(AsyncEventQueue.scala:105) ~[spark-core_2.13-3.4.2.jar:3.4.2]
    at org.apache.spark.scheduler.AsyncEventQueue.$anonfun$dispatch$1(AsyncEventQueue.scala:105) ~[spark-core_2.13-3.4.2.jar:3.4.2]
    at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.scala:17) ~[scala-library-2.13.11.jar:?]
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:59) ~[scala-library-2.13.11.jar:?]
    at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:100) ~[spark-core_2.13-3.4.2.jar:3.4.2]
    at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.$anonfun$run$1(AsyncEventQueue.scala:96) ~[spark-core_2.13-3.4.2.jar:3.4.2]
    at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1471) [spark-core_2.13-3.4.2.jar:3.4.2]
    at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.run(AsyncEventQueue.scala:96) [spark-core_2.13-3.4.2.jar:3.4.2]
Caused by: io.openlineage.spark.shaded.org.apache.http.NoHttpResponseException: localhost:1923 failed to respond
```

I added a log line to report when mockServer is stopped. I see it being stopped, and then something tries to connect to it.

Damien Hawes (damien.hawes@booking.com)
2024-02-14 14:42:01

*Thread Reply:* Hmm - I find it really strange that SparkGenericIntegrationTest is trying to connect to SparkDeltaIntegrationTest's mock server

Damien Hawes (damien.hawes@booking.com)
2024-02-14 14:42:20

*Thread Reply:* It should have a new mock server injected

Damien Hawes (damien.hawes@booking.com)
2024-02-14 14:42:39

*Thread Reply:* Unless that atomic integer isn't working as expected
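
A quick sanity check on the AtomicInteger theory, written as a minimal sketch: `getAndIncrement()` returns a distinct value per call even with concurrent callers, so within a single JVM each extension instance should receive its own port. The class name `PortCounterSketch`, the starting value 1080, and the pool size are illustrative assumptions, not the project's code. A static counter does start again from its initial value in every forked JVM, which is one possible reason port assignment needs another look once forking is enabled.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class PortCounterSketch {
  // Hypothetical starting port; the real extension's initial value is not shown in the thread.
  static final AtomicInteger BASE_PORT = new AtomicInteger(1080);

  /** Claims the next port number; every call returns a distinct value within this JVM. */
  static int nextPort() {
    return BASE_PORT.getAndIncrement();
  }

  /** Calls nextPort() from several threads and returns the distinct values observed. */
  static Set<Integer> claimConcurrently(int callers) {
    Set<Integer> seen = ConcurrentHashMap.newKeySet();
    ExecutorService pool = Executors.newFixedThreadPool(4);
    for (int i = 0; i < callers; i++) {
      pool.execute(() -> seen.add(nextPort()));
    }
    pool.shutdown();
    try {
      pool.awaitTermination(10, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return seen;
  }

  public static void main(String[] args) {
    Set<Integer> ports = claimConcurrently(100);
    System.out.println("distinct ports claimed: " + ports.size());
  }
}
```

If the number of distinct values equals the number of callers, the counter itself is not the source of two test classes sharing a port.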

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-02-14 14:42:43

*Thread Reply:* first, I changed atomicInteger to random but this did not help

Damien Hawes (damien.hawes@booking.com)
2024-02-14 14:42:52

*Thread Reply:* Yeah, I did the same.

Damien Hawes (damien.hawes@booking.com)
2024-02-14 14:43:18

*Thread Reply:* Does this mean, MockServer is keeping state across tests?

Damien Hawes (damien.hawes@booking.com)
2024-02-14 14:45:26

*Thread Reply:* ```State can be cleared from MockServer selectively:

- use type to select which type of state to clear, supported values are: all, log, expectations
- use a request matcher to clear matching items
- use an expectation id to clear matching items
```

Damien Hawes (damien.hawes@booking.com)
2024-02-14 14:46:06

*Thread Reply:* new MockServerClient("localhost", 1080).clear( request(), ClearType.LOG );

Damien Hawes (damien.hawes@booking.com)
2024-02-14 15:01:07

*Thread Reply:* So

Damien Hawes (damien.hawes@booking.com)
2024-02-14 15:01:17

*Thread Reply:* Using the official MockServerExtension

Damien Hawes (damien.hawes@booking.com)
2024-02-14 15:01:24

*Thread Reply:* Results in the same behaviour.

Damien Hawes (damien.hawes@booking.com)
2024-02-14 15:05:44

*Thread Reply:* Yeah - I see it.

Damien Hawes (damien.hawes@booking.com)
2024-02-14 15:06:04

*Thread Reply:* SparkDeltaIntegrationTest completes, and then SparkGenericIntegrationTest starts

Damien Hawes (damien.hawes@booking.com)
2024-02-14 15:06:24

*Thread Reply:* SparkGenericIntegrationTest is trying to connect to the same port as SparkDeltaIntegrationTest

Damien Hawes (damien.hawes@booking.com)
2024-02-14 15:14:11

*Thread Reply:* OK. Using a single JVM process

Damien Hawes (damien.hawes@booking.com)
2024-02-14 15:14:20

*Thread Reply:* I have managed to get the tests to run

Damien Hawes (damien.hawes@booking.com)
2024-02-14 15:14:32

*Thread Reply:* without them failing due to connection refused

Damien Hawes (damien.hawes@booking.com)
2024-02-14 15:14:53

*Thread Reply:* however, they are failing due to request matching

Damien Hawes (damien.hawes@booking.com)
2024-02-14 15:15:04

*Thread Reply:* I really don't think mockserver is thread safe

Damien Hawes (damien.hawes@booking.com)
2024-02-14 15:15:51

*Thread Reply:* This is what I've done to the tests:

```
import org.mockserver.junit.jupiter.MockServerExtension;
import org.mockserver.junit.jupiter.MockServerSettings;

@ExtendWith(MockServerExtension.class)
@MockServerSettings(ports = {SparkIcebergIntegrationTest.MOCKSERVERPORT}, perTestSuite = true)
public class SparkIcebergIntegrationTest {
  public static final int MOCKSERVERPORT = 1084;

  public SparkIcebergIntegrationTest(ClientAndServer mockServer) {
    this.mockServer = mockServer;
    this.mockServer
        .when(request("/api/v1/lineage"))
        .respond(org.mockserver.model.HttpResponse.response().withStatusCode(201));
  }

  public void beforeEach() {
    mockServer.clear(request(), ClearType.LOG);

    // stuff
  }
}
```

Damien Hawes (damien.hawes@booking.com)
2024-02-14 15:19:43

*Thread Reply:* Pretty crazy though, that if you fork. Things work.

🙌 Paweł Leszczyński
Damien Hawes (damien.hawes@booking.com)
2024-02-14 15:25:13

*Thread Reply:* Btw, to get the official extension:

    testImplementation("org.mock-server:mockserver-netty:5.14.0:shaded") {
        exclude group: 'com.google.guava', module: 'guava'
        exclude group: 'com.fasterxml.jackson.core'
        exclude group: 'com.fasterxml.jackson.datatype'
        exclude group: 'com.fasterxml.jackson.dataformat'
        exclude group: 'org.mock-server.mockserver-client-java'
    }
    testImplementation("org.mock-server:mockserver-junit-jupiter-no-dependencies:5.14.0")

Damien Hawes (damien.hawes@booking.com)
2024-02-14 16:31:31

*Thread Reply:* OK. I am happy to say that we've gotten the failures down to the same tests across all variants.

Damien Hawes (damien.hawes@booking.com)
2024-02-14 16:32:20

*Thread Reply:* ```
io.openlineage.spark.agent.GoogleCloudIntegrationTest
  > testRddWriteToBucket()
  > testReadAndWriteFromBigquery()

io.openlineage.spark.agent.lifecycle.SparkReadWriteIntegTest
  > testExternalRDDWithS3Bucket(SparkSession)
```

Damien Hawes (damien.hawes@booking.com)
2024-02-14 16:32:37

*Thread Reply:* I will say, the last one (the class version error) is the one that I'm unsure how to solve.

Damien Hawes (damien.hawes@booking.com)
2024-02-14 16:33:34

*Thread Reply:* Perhaps something is bringing in the JDK 11 variant? Maybe we try and exclude the variant from the classpath?

Damien Hawes (damien.hawes@booking.com)
2024-02-14 17:40:19

*Thread Reply:* org.glassfish.jaxb:jaxb-runtime - yeah ... this one is the reason for the failure with the S3 test.

I removed spark-mllib during the migration, because I couldn't find any references to it. As it turns out, the Google BigQuery stuff requires it. So I added it back, without excluding glassfish.

👍 Paweł Leszczyński
Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-02-15 03:23:38

*Thread Reply:* I think I got it 😉

https://github.com/OpenLineage/OpenLineage/pull/2432/files#diff-5dc301670bf7398239b365b562a80e759ab1bb4f0e3f3d39617b0adedc061eedL106

Why are you removing OpenLineageSparkListener setting? This made the GoogleCloudIntegrationTest succeed on my end.

Damien Hawes (damien.hawes@booking.com)
2024-02-15 03:41:41

*Thread Reply:* Ah

Damien Hawes (damien.hawes@booking.com)
2024-02-15 03:41:47

*Thread Reply:* Good one

Damien Hawes (damien.hawes@booking.com)
2024-02-15 03:42:01

*Thread Reply:* I remember I was rearranging the args, to make sure the OL stuff was grouped together.

Damien Hawes (damien.hawes@booking.com)
2024-02-15 03:48:59

*Thread Reply:* Thanks for your help @Paweł Leszczyński

Damien Hawes (damien.hawes@booking.com)
2024-02-15 04:37:14

*Thread Reply:* Hmm - testExternalRddWithS3Bucket fails with:

Caused by: java.io.FileNotFoundException: No such file or directory: s3a://openlineage-test/rdd_a_3.3.4/_temporary/0/task_202402150933081796269000648724168_0000_m_000001/part-00001-b5762fba-cb9a-401d-9f14-73cda8c6706c-c000.snappy.parquet

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-02-15 04:46:34

*Thread Reply:* 2.12 integration tests are failing because of 404 on mockserver

Damien Hawes (damien.hawes@booking.com)
2024-02-15 04:47:03

*Thread Reply:* I just realised why the bigquery tests are failing

Damien Hawes (damien.hawes@booking.com)
2024-02-15 04:47:16

*Thread Reply:* Not because of the code, but because the fixtures are no longer valid

Damien Hawes (damien.hawes@booking.com)
2024-02-15 04:47:21

*Thread Reply:* err

Damien Hawes (damien.hawes@booking.com)
2024-02-15 04:47:22

*Thread Reply:* the events

Damien Hawes (damien.hawes@booking.com)
2024-02-15 04:47:43

*Thread Reply:* "name": "openlineage-ci.airflow_integration.{spark_version}_target" must be changed to "name": "openlineage-ci.airflow_integration.{spark_version}_{scala}_target"

Damien Hawes (damien.hawes@booking.com)
2024-02-15 04:47:49

*Thread Reply:* For all bigquery events

Damien Hawes (damien.hawes@booking.com)
2024-02-15 04:48:05

*Thread Reply:* Probably for the MetaStore2 and MetaStore3 events

Damien Hawes (damien.hawes@booking.com)
2024-02-15 04:52:44

*Thread Reply:* Regarding the 404:

https://app.circleci.com/pipelines/github/OpenLineage/OpenLineage/9322/workflows/d03fc2fb-9a74-446d-83e8-cc69fb798dde/jobs/175615/tests

This one didn't experience those failures, but it is 3.5.0, so it may not have the same tests running

Damien Hawes (damien.hawes@booking.com)
2024-02-15 04:53:17
Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-02-15 06:42:47

*Thread Reply:* pls run spotless, so we could see what's still failing

Damien Hawes (damien.hawes@booking.com)
2024-02-15 06:59:43

*Thread Reply:* Yeah

Damien Hawes (damien.hawes@booking.com)
2024-02-15 06:59:45

*Thread Reply:* Just did it

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-02-15 07:00:29

*Thread Reply:* I am off next week and would love to merge it this week 😉

Damien Hawes (damien.hawes@booking.com)
2024-02-15 07:00:43

*Thread Reply:* That makes two of us

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-02-15 08:00:13

*Thread Reply:* I need to recall what the Metastore test has to do with Google storage.

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-02-15 08:00:21

*Thread Reply:* That's the failing test for now

Damien Hawes (damien.hawes@booking.com)
2024-02-15 08:00:29

*Thread Reply:* It uses it for storage of the files

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-02-15 08:00:54

*Thread Reply:* does it expect some content of the storage initially?

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-02-15 08:01:07

*Thread Reply:* i don't get why it's succeeding for 3.5.0

Damien Hawes (damien.hawes@booking.com)
2024-02-15 08:02:17

*Thread Reply:* It succeeds for 3.4.2 and 3.5.0.

Damien Hawes (damien.hawes@booking.com)
2024-02-15 08:02:37

*Thread Reply:* And the 2.12 variant of 3.3.4

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-02-15 08:09:10

*Thread Reply:* logs are also flooded with io.openlineage.client.OpenLineageClientException: code: 404, response:

Damien Hawes (damien.hawes@booking.com)
2024-02-15 08:12:09

*Thread Reply:* That's for the container tests

Damien Hawes (damien.hawes@booking.com)
2024-02-15 08:12:11

*Thread Reply:* afaik

Damien Hawes (damien.hawes@booking.com)
2024-02-15 08:12:24

*Thread Reply:* And I see that I haven't changed the mockserver container at all

Damien Hawes (damien.hawes@booking.com)
2024-02-15 08:12:31

*Thread Reply:* (compared with main)

Maciej Obuchowski (maciej.obuchowski@getindata.com)
2024-02-15 08:12:40

*Thread Reply:* if 2.12 tests and 2.13 refer to the same warehouse path maybe they overwrite each other's data?

Damien Hawes (damien.hawes@booking.com)
2024-02-15 08:12:52

*Thread Reply:* I've changed that

Maciej Obuchowski (maciej.obuchowski@getindata.com)
2024-02-15 08:12:53

*Thread Reply:* ➜ ~ gcloud storage ls gs://openlineage-ci-testing/warehouse
gs://openlineage-ci-testing/warehouse/3.1.3/
gs://openlineage-ci-testing/warehouse/3.2.2/
gs://openlineage-ci-testing/warehouse/3.2.4/
gs://openlineage-ci-testing/warehouse/3.3.0/
gs://openlineage-ci-testing/warehouse/3.3.1/
gs://openlineage-ci-testing/warehouse/3.3.2/
gs://openlineage-ci-testing/warehouse/3.3.3/
gs://openlineage-ci-testing/warehouse/3.3.4/
gs://openlineage-ci-testing/warehouse/3.4.0/
gs://openlineage-ci-testing/warehouse/3.4.1/
gs://openlineage-ci-testing/warehouse/3.4.2/
gs://openlineage-ci-testing/warehouse/3.5.0/
gs://openlineage-ci-testing/warehouse/no_iceberg_test.db/
gs://openlineage-ci-testing/warehouse/spark-3_2_4/
gs://openlineage-ci-testing/warehouse/spark-3_3_4/
gs://openlineage-ci-testing/warehouse/spark-3_4_2/
gs://openlineage-ci-testing/warehouse/spark-3_5_0/
I don't think your tests created any new paths there

Maciej Obuchowski (maciej.obuchowski@getindata.com)
2024-02-15 08:13:22

*Thread Reply:* also the errors indicate that org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 2) (ip-10-0-109-117.ec2.internal executor driver): org.apache.iceberg.exceptions.RuntimeIOException: Failed to get status for file: gs://openlineage-ci-testing/warehouse/3.3.4/hive3/test/data/00000-0-b84bfcf2-3c44-47c6-9da9-c6e420db237c-00001.parquet

Maciej Obuchowski (maciej.obuchowski@getindata.com)
2024-02-15 08:13:29

*Thread Reply:* there's no scala version in the path

Damien Hawes (damien.hawes@booking.com)
2024-02-15 08:13:54

*Thread Reply:* java.io.IOException: Path: //openlineage-ci-testing/warehouse/spark-3_3_4/scala-2_13/hive3/test_3_3_4_2_13/data is a directory, which is not supported by the record reader when `mapreduce.input.fileinputformat.input.dir.recursive` is false.

Damien Hawes (damien.hawes@booking.com)
2024-02-15 08:14:18

*Thread Reply:* org.apache.iceberg.exceptions.NotFoundException: Failed to open input stream for file: gs://openlineage-ci-testing/warehouse/spark-3_3_4/scala-2_13/hive3/test_3_3_4_2_13/metadata/00001-2862ac34-7a06-4b6a-8c13-b07446745975.metadata.json

Maciej Obuchowski (maciej.obuchowski@getindata.com)
2024-02-15 08:15:25

*Thread Reply:* ah, it has the underscores now

Maciej Obuchowski (maciej.obuchowski@getindata.com)
2024-02-15 08:15:27

*Thread Reply:* ➜ ~ gcloud storage ls gs://openlineage-ci-testing/warehouse/spark-3_3_4/
gs://openlineage-ci-testing/warehouse/spark-3_3_4/scala-2_12/
gs://openlineage-ci-testing/warehouse/spark-3_3_4/scala-2_13/
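
The listing above suggests a per-Spark, per-Scala directory layout with dots replaced by underscores. As a rough sketch of how such a path could be derived so that 2.12 and 2.13 runs never share a warehouse location: the class `WarehousePathSketch` and the method `warehousePath` are hypothetical names chosen for illustration, and the project's actual helper may differ.

```java
final class WarehousePathSketch {
  /**
   * Builds a warehouse path of the form base/spark-X_Y_Z/scala-A_B/,
   * mirroring the directory names seen in the bucket listing.
   */
  static String warehousePath(String base, String sparkVersion, String scalaBinaryVersion) {
    String sparkDir = "spark-" + sparkVersion.replace('.', '_');
    String scalaDir = "scala-" + scalaBinaryVersion.replace('.', '_');
    return base + "/" + sparkDir + "/" + scalaDir + "/";
  }

  public static void main(String[] args) {
    System.out.println(
        warehousePath("gs://openlineage-ci-testing/warehouse", "3.3.4", "2.13"));
  }
}
```

Keying the path on both versions means concurrent CI jobs for different Scala variants write to disjoint prefixes.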

Damien Hawes (damien.hawes@booking.com)
2024-02-15 08:16:30

*Thread Reply:* Yeah - I created my own GCP project

Maciej Obuchowski (maciej.obuchowski@getindata.com)
2024-02-15 08:16:32

*Thread Reply:* I see, I was looking at the earlier failures

Damien Hawes (damien.hawes@booking.com)
2024-02-15 08:16:34

*Thread Reply:* Just to test this locally

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-02-15 08:21:39
Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-02-15 08:22:14

*Thread Reply:* I looked at today's CI build on main and there are no 404 errors

Maciej Obuchowski (maciej.obuchowski@getindata.com)
2024-02-15 08:24:17

*Thread Reply:* Caused by: java.io.FileNotFoundException: Item not found: 'gs://****************************-testing/warehouse/spark-3_3_4/scala-2_13/hive3/test_3_3_4_2_13/metadata/00001-0906d19b-1c0b-4e62-9cd5-b35a5410b0d8.metadata.json'. Note, it is possible that the live version is still available but the requested generation is deleted. this sounds to me like there is some concurrency

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-02-15 08:24:39

*Thread Reply:* it gets retried 4 times

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-02-15 08:24:45

*Thread Reply:* so chances are low

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-02-15 08:24:55

*Thread Reply:* to me it looks like something is not able to clear the path

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-02-15 08:25:08

*Thread Reply:* but why would it clear properly for other versions

Maciej Obuchowski (maciej.obuchowski@getindata.com)
2024-02-15 08:25:20

*Thread Reply:* yeah so maybe it's a red herring, it might fail at 2024-02-15 12:23:38 ERROR EventEmitter - Could not emit lineage w/ exception io.openlineage.client.OpenLineageClientException: code: 404, response: at io.openlineage.client.transports.HttpTransport.throwOnHttpError(HttpTransport.java:150) ~[openlineage-spark-agent_2.13-1.9.0-SNAPSHOT-shadow.jar:1.9.0-SNAPSHOT] and then there is something left over?

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-02-15 08:25:49

*Thread Reply:* to me the 404s are the result of two tests using the same mockserver and resetting it

Maciej Obuchowski (maciej.obuchowski@getindata.com)
2024-02-15 08:26:24

*Thread Reply:* yes, it might fail on that, then it retries and then fails earlier because of storage leftovers?

Maciej Obuchowski (maciej.obuchowski@getindata.com)
2024-02-15 08:26:51

*Thread Reply:* all the more reason to get rid of mockserver 🙂

Maciej Obuchowski (maciej.obuchowski@getindata.com)
2024-02-15 08:26:57

*Thread Reply:* one day 🙂

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-02-15 08:28:35

*Thread Reply:* this would make sense assuming retry wouldn't do @BeforeAll nor @AfterAll when failing

Damien Hawes (damien.hawes@booking.com)
2024-02-15 08:28:41

*Thread Reply:* Dude ... I wrote a "FileOutputExtension" to replace MockServer

Damien Hawes (damien.hawes@booking.com)
2024-02-15 08:28:45

*Thread Reply:* and then I deleted it

😅 Paweł Leszczyński
Damien Hawes (damien.hawes@booking.com)
2024-02-15 08:29:21

*Thread Reply:* I regret it now.

Maciej Obuchowski (maciej.obuchowski@getindata.com)
2024-02-15 08:29:22
Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-02-15 08:30:20

*Thread Reply:* @Damien Hawes could we change port of ColumnLineageIntegrationTest from 1080 to something else

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-02-15 08:30:21

*Thread Reply:* ?

Damien Hawes (damien.hawes@booking.com)
2024-02-15 08:30:26

*Thread Reply:* Sure

Damien Hawes (damien.hawes@booking.com)
2024-02-15 08:30:39

*Thread Reply:* 1090 sound good?

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-02-15 08:31:00

*Thread Reply:* yes, as mockserver container is not using this

Damien Hawes (damien.hawes@booking.com)
2024-02-15 08:31:45

*Thread Reply:* @Maciej Obuchowski - yeah, I was using the file transport and then used ObjectMapper + JSONAssert from org.skyscreamer to perform a weak match on the expected and actual json

Damien Hawes (damien.hawes@booking.com)
2024-02-15 08:31:50

*Thread Reply:* It worked really well

Damien Hawes (damien.hawes@booking.com)
2024-02-15 08:31:52

*Thread Reply:* but I was like

Damien Hawes (damien.hawes@booking.com)
2024-02-15 08:31:56

*Thread Reply:* "Too much change"

Damien Hawes (damien.hawes@booking.com)
2024-02-15 08:31:59

*Thread Reply:* so I deleted it

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-02-15 08:36:01

*Thread Reply:* what a pity @Maciej Obuchowski;)

Maciej Obuchowski (maciej.obuchowski@getindata.com)
2024-02-15 08:36:39

*Thread Reply:* yeah would be best as a separate PR 🙂

Maciej Obuchowski (maciej.obuchowski@getindata.com)
2024-02-15 08:36:52

*Thread Reply:* maybe randomize the mockserver port?

Maciej Obuchowski (maciej.obuchowski@getindata.com)
2024-02-15 08:37:04

*Thread Reply:* 1/65536 is pretty low chance of collision
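
A minimal sketch of the port-randomisation idea, in Python for brevity (the tests in this thread are Java). Rather than drawing a random number and hoping it is unused, it asks the OS for a free ephemeral port by binding to port 0. The helper name `find_free_port` is hypothetical, and another process could still grab the port between this call and the server start.

```python
import socket


def find_free_port(host: str = "127.0.0.1") -> int:
    """Ask the OS for a currently unused TCP port (hypothetical helper)."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        # Port 0 tells the OS to pick any free ephemeral port.
        sock.bind((host, 0))
        # getsockname() returns (host, port) for an IPv4 socket.
        return sock.getsockname()[1]


if __name__ == "__main__":
    print(find_free_port())
```

This only avoids port clashes; it would not help with the "same JVM" problem mentioned in the replies that follow.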

Damien Hawes (damien.hawes@booking.com)
2024-02-15 08:37:13

*Thread Reply:* I did that before

Damien Hawes (damien.hawes@booking.com)
2024-02-15 08:37:17

*Thread Reply:* I deleted that code

Damien Hawes (damien.hawes@booking.com)
2024-02-15 08:37:22

*Thread Reply:* Because turns out

Damien Hawes (damien.hawes@booking.com)
2024-02-15 08:37:29

*Thread Reply:* mockserver hates running in the same thread

Damien Hawes (damien.hawes@booking.com)
2024-02-15 08:37:32

*Thread Reply:* across multiple tests

Damien Hawes (damien.hawes@booking.com)
2024-02-15 08:37:39

*Thread Reply:* same JVM**

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-02-15 08:37:56

*Thread Reply:* one day we will rewrite it to rust, scala, github actions, non-mockserver whatever but for now let's just juggle mockserver ports to make s.it work 😉

Damien Hawes (damien.hawes@booking.com)
2024-02-15 08:38:57

*Thread Reply:* Scala 🤮

Damien Hawes (damien.hawes@booking.com)
2024-02-15 08:39:00

*Thread Reply:* After this

Damien Hawes (damien.hawes@booking.com)
2024-02-15 08:39:08

*Thread Reply:* I never want to touch Scala again

Maciej Obuchowski (maciej.obuchowski@getindata.com)
2024-02-15 08:39:27

*Thread Reply:* I actually wish we've started all of this in Scala, we'd not have to do any of it

Maciej Obuchowski (maciej.obuchowski@getindata.com)
2024-02-15 08:40:04

*Thread Reply:* But even like 2 years ago we've thought it's too much work, and now we have 5x more code and related problems

Maciej Obuchowski (maciej.obuchowski@getindata.com)
2024-02-15 08:40:13

*Thread Reply:* Sunken cost fallacy

Damien Hawes (damien.hawes@booking.com)
2024-02-15 08:40:18

*Thread Reply:* Yup

Damien Hawes (damien.hawes@booking.com)
2024-02-15 09:01:37

*Thread Reply:* WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/opt/bitnami/spark/jars/spark-unsafe_2.12-3.2.4.jar) to constructor java.nio.DirectByteBuffer(long,int)
WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release

Hmm, has bitnami upgraded the JDK on their images?

Damien Hawes (damien.hawes@booking.com)
2024-02-15 09:08:35

*Thread Reply:* @Paweł Leszczyński - when I run the column lineage test, in isolation, on my local

Damien Hawes (damien.hawes@booking.com)
2024-02-15 09:08:50

*Thread Reply:* 2024-02-15 15:07:56 WARN NativeCodeLoader - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2024-02-15 15:07:58 ERROR EventEmitter - Could not emit lineage w/ exception
io.openlineage.client.OpenLineageClientException: code: 404, response:
    at io.openlineage.client.transports.HttpTransport.throwOnHttpError(HttpTransport.java:150) ~[openlineage-spark-agent_2.12-1.9.0-SNAPSHOT-shadow.jar:1.9.0-SNAPSHOT]
    at io.openlineage.client.transports.HttpTransport.emit(HttpTransport.java:124) ~[openlineage-spark-agent_2.12-1.9.0-SNAPSHOT-shadow.jar:1.9.0-SNAPSHOT]
    at io.openlineage.client.transports.HttpTransport.emit(HttpTransport.java:111) ~[openlineage-spark-agent_2.12-1.9.0-SNAPSHOT-shadow.jar:1.9.0-SNAPSHOT]
    at io.openlineage.client.OpenLineageClient.emit(OpenLineageClient.java:46) ~[openlineage-spark-agent_2.12-1.9.0-SNAPSHOT-shadow.jar:1.9.0-SNAPSHOT]
    at io.openlineage.spark.agent.EventEmitter.emit(EventEmitter.java:69) ~[openlineage-spark-agent_2.12-1.9.0-SNAPSHOT-shadow.jar:?]

Damien Hawes (damien.hawes@booking.com)
2024-02-15 09:09:12

*Thread Reply:*

Damien Hawes (damien.hawes@booking.com)
2024-02-15 09:12:36

*Thread Reply:* I wonder

Damien Hawes (damien.hawes@booking.com)
2024-02-15 09:12:38

*Thread Reply:* I wonder

Maciej Obuchowski (maciej.obuchowski@getindata.com)
2024-02-15 09:13:30

*Thread Reply:* The illegal reflective stuff should not actually break anything right now afaik

Damien Hawes (damien.hawes@booking.com)
2024-02-15 09:13:57

*Thread Reply:* @Paweł Leszczyński @Maciej Obuchowski

        .when(request("/api/v1/lineage"))
        .respond(org.mockserver.model.HttpResponse.response().withStatusCode(201))

However, when I examine debug logs and the code I see:

2024-02-15 15:11:12 INFO IcebergHandler - {spark.openlineage.transport.url=http://localhost:1090/api/v1/namespaces/default, ....

        .config(
            "spark.openlineage.transport.url",
            "http://localhost:" + MOCKSERVER_PORT + "/api/v1/namespaces/default")

👀 Maciej Obuchowski
Maciej Obuchowski (maciej.obuchowski@getindata.com)
2024-02-15 09:16:01

*Thread Reply:* Ah, so you think it emits to the wrong port?

Damien Hawes (damien.hawes@booking.com)
2024-02-15 09:16:11

*Thread Reply:* No. To the wrong resource

Damien Hawes (damien.hawes@booking.com)
2024-02-15 09:16:18

*Thread Reply:* The error is 404

Maciej Obuchowski (maciej.obuchowski@getindata.com)
2024-02-15 09:16:31

*Thread Reply:* Oh yes, why it is namespaces endpoint?

Damien Hawes (damien.hawes@booking.com)
2024-02-15 09:16:36

*Thread Reply:* I don't know

Damien Hawes (damien.hawes@booking.com)
2024-02-15 09:16:44

*Thread Reply:* I don't think it was me

Maciej Obuchowski (maciej.obuchowski@getindata.com)
2024-02-15 09:16:55

*Thread Reply:* Yeah, just wondering

Damien Hawes (damien.hawes@booking.com)
2024-02-15 09:20:17

*Thread Reply:* I know

Damien Hawes (damien.hawes@booking.com)
2024-02-15 09:20:20

*Thread Reply:* I figured it out

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-02-15 09:20:25

*Thread Reply:* ?

Damien Hawes (damien.hawes@booking.com)
2024-02-15 09:20:32

*Thread Reply:*

```
public ColumnLineageIntegrationTest(ClientAndServer mockServer) {
  this.mockServer = mockServer;
  mockServer
      .when(request("/api/v1/lineage"))
      .respond(org.mockserver.model.HttpResponse.response().withStatusCode(201));
}

@SneakyThrows
@BeforeAll
public static void setup() {
  METASTORECONTAINER.start();
  mappedPort = METASTORECONTAINER.getMappedPort(MetastoreTestUtils.POSTGRESPORT);
  spark = getSparkSession();
  Arrays.asList("v2source1", "v2source_2")
      .forEach(e -> spark.sql("drop table if exists " + e));
  getIcebergTable(spark, 1);
  getIcebergTable(spark, 2);
  databaseUrl = String.format("jdbc::%s/%s", mappedPort, database);
}
```

Damien Hawes (damien.hawes@booking.com)
2024-02-15 09:20:46

*Thread Reply:* The 404's are coming from the @BeforeAll lifecycle method

Damien Hawes (damien.hawes@booking.com)
2024-02-15 09:20:51

*Thread Reply:* MockServer hasn't been created or configured by then. MockServer is instance scoped, but the method is static scope.

Damien Hawes (damien.hawes@booking.com)
2024-02-15 09:23:54

*Thread Reply:* So it's possible that via the extension, MockServer is instantiated and is listening on that port, but no expectations have been configured.

😬 Maciej Obuchowski
Damien Hawes (damien.hawes@booking.com)
2024-02-15 09:24:23

*Thread Reply:* So MockServer is like, "404 everything"
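
The ordering problem can be modelled without MockServer at all. Below is a toy sketch in Python (not MockServer's API; `StubServer`, `when` and `handle` are made-up names) of a server that answers 404 to anything without a registered expectation. Requests sent during a static setup step, before the constructor registers the expectation, get 404; the same request afterwards gets 201.

```python
class StubServer:
    """Toy stand-in for a mock HTTP server (hypothetical, not MockServer's API)."""

    def __init__(self):
        # Maps a request path to the status code it should answer with.
        self._expectations = {}

    def when(self, path, status):
        """Register an expectation: requests to `path` get `status`."""
        self._expectations[path] = status

    def handle(self, path):
        """Answer a request; anything without an expectation is a 404."""
        return self._expectations.get(path, 404)


server = StubServer()

# Static setup (like @BeforeAll) runs first and already emits events.
status_during_setup = server.handle("/api/v1/lineage")

# Per-instance setup (like the test constructor) registers the expectation later.
server.when("/api/v1/lineage", 201)
status_during_test = server.handle("/api/v1/lineage")

print(status_during_setup, status_during_test)
```

In this model the fix is simply to register expectations before anything emits, which matches the point about not relying on static setup.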

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-02-15 09:24:35

*Thread Reply:* yes, that's possible

Damien Hawes (damien.hawes@booking.com)
2024-02-15 11:32:11

*Thread Reply:* Oh - regarding the Scala conversation. It would require a change to the architecture of the solution.

We'd probably have to follow the route of the spline folks. Namely: 1 artifact per spark x scala version. In our case, we'd have to consider producing a Scala 2.11 build just for 2.4.8.

(I know this entire conversation is entirely hypothetical)

Damien Hawes (damien.hawes@booking.com)
2024-02-15 11:33:19

*Thread Reply:* 1. SparkContainerIntegrationTest is throwing 404s.

Maciej Obuchowski (maciej.obuchowski@getindata.com)
2024-02-15 14:39:36

*Thread Reply:* I think any large effort like this would be more likely to target moving some of the code to Spark itself, or to enhance its APIs to make developing this kind of integration easier

Damien Hawes (damien.hawes@booking.com)
2024-02-16 04:37:44

*Thread Reply:* More thoughts @Paweł Leszczyński @Maciej Obuchowski:

  1. Spark 3.5.0 doesn't fail on the Metastore tests, because it doesn't have iceberg, despite the fact that in metastore 2 we say no iceberg and metastore 3 we say yes to iceberg
Damien Hawes (damien.hawes@booking.com)
2024-02-16 04:38:42

*Thread Reply:* But I'm still at a loss for this: java.io.IOException: Path: //openlineage-ci-testing/warehouse/spark-3_3_4/scala-2_12/hive3/test_3_3_4_2_12/data is a directory, which is not supported by the record reader when `mapreduce.input.fileinputformat.input.dir.recursive` is false.

👀 Maciej Obuchowski, Paweł Leszczyński
Damien Hawes (damien.hawes@booking.com)
2024-02-16 04:56:58

*Thread Reply:* Oh wait, I see metastore 3 has an @Tag("iceberg") on its unit test

Damien Hawes (damien.hawes@booking.com)
2024-02-16 04:56:59

*Thread Reply:* nevermind

Damien Hawes (damien.hawes@booking.com)
2024-02-16 04:58:32

*Thread Reply:* Spark 3.5.0 won't run it

Maciej Obuchowski (maciej.obuchowski@getindata.com)
2024-02-16 05:02:20

*Thread Reply:* java.io.IOException: Path: //openlineage-ci-testing/warehouse/spark-3_3_4/scala-2_12/hive3/test_3_3_4_2_12/data is a directory, which is not supported by the record reader when `mapreduce.input.fileinputformat.input.dir.recursive` is false. that confuses me too

Damien Hawes (damien.hawes@booking.com)
2024-02-16 05:02:35

*Thread Reply:* Yup

Damien Hawes (damien.hawes@booking.com)
2024-02-16 05:11:32

*Thread Reply:* Wait

Damien Hawes (damien.hawes@booking.com)
2024-02-16 05:11:39

*Thread Reply:* What's the point of the Metastore tests?

Damien Hawes (damien.hawes@booking.com)
2024-02-16 05:11:57

*Thread Reply:* What are we testing there?

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-02-16 05:12:30

*Thread Reply:* interaction with datasets registered and accessed through Hive Metastore

Damien Hawes (damien.hawes@booking.com)
2024-02-16 05:12:35

*Thread Reply:* I mean, we're not even loading the OpenLineageSparkListener

Damien Hawes (damien.hawes@booking.com)
2024-02-16 05:13:10

*Thread Reply:* I mean, this is what we're testing:

    void IcebergTablesTest() {
      executeSql("create database if not exists %s", database);
      executeSql("drop table if exists %s.%s", database, table);
      executeSql(
          "create external table %s.%s (id int, value string) USING iceberg location '%s'",
          database, table, MetastoreTestUtils.getTableLocation(database, table));
      executeSql("insert into table %s.%s VALUES (1, 'value1'), (2, 'value2')", database, table);
      Dataset<Row> rowDataset = executeSql(String.format("select * from %s.%s", database, table));
      List<Row> rows = rowDataset.collectAsList();
      assertThat(rows).hasSize(2);
      assertThat(rows.get(0).get(0)).isEqualTo(1);
    }

Damien Hawes (damien.hawes@booking.com)
2024-02-16 05:13:58

*Thread Reply:* This is what the Spark Conf looks like:

```
public static SparkConf getCommonSparkConf(
    String appName, String metastoreName, int mappedPort, Boolean isIceberg) {
  SparkConf conf =
      new SparkConf()
          .setAppName(appName + VERSION)
          .setMaster("local[*]")
          .set("spark.driver.host", LOCALIP)
          .set("org.jpox.autoCreateSchema", "true")
          .set(
              "javax.jdo.option.ConnectionURL",
              String.format("jdbc::%d/%s", mappedPort, metastoreName))
          .set("javax.jdo.option.ConnectionDriverName", "org.postgresql.Driver")
          .set("javax.jdo.option.ConnectionUserName", "hiveuser")
          .set("javax.jdo.option.ConnectionPassword", "password")
          .set("spark.sql.warehouse.dir", BASEPATH)
          .set("spark.hadoop.google.cloud.project.id", GOOGLESAPROPERTIES.get("projectid"))
          .set("spark.hadoop.google.cloud.auth.service.account.enable", "true")
          .set(
              "fs.gs.auth.service.account.private.key.id",
              GOOGLESAPROPERTIES.get("privatekeyid"))
          .set("fs.gs.auth.service.account.private.key", GOOGLESAPROPERTIES.get("privatekey"))
          .set("fs.gs.auth.service.account.email", GOOGLESAPROPERTIES.get("client_email"))
          .set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
          .set("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS");

  if (isIceberg) {
    conf.set("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkCatalog")
        .set("spark.sql.catalog.spark_catalog.type", "hive")
        .set(
            "spark.sql.extensions",
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions");
  }
  return conf;
}
```

Damien Hawes (damien.hawes@booking.com)
2024-02-16 05:14:40

*Thread Reply:* ^ that's on main

Damien Hawes (damien.hawes@booking.com)
2024-02-16 05:14:57

*Thread Reply:* So we're testing whether the hive metastore works on GCS?

Damien Hawes (damien.hawes@booking.com)
2024-02-16 05:15:35

*Thread Reply:* Which confuses me, because shouldn't that test be done in Hive's test suite and not ours?

Maciej Obuchowski (maciej.obuchowski@getindata.com)
2024-02-16 05:15:52

*Thread Reply:* ... 👀

Damien Hawes (damien.hawes@booking.com)
2024-02-16 05:18:40

*Thread Reply:* Unless there's a good reason for these tests, that don't actually test our code, I'm going to disable the tests. Do you agree @Paweł Leszczyński @Maciej Obuchowski?

Maciej Obuchowski (maciej.obuchowski@getindata.com)
2024-02-16 05:23:40

*Thread Reply:* I feel this is unfinished feature

Maciej Obuchowski (maciej.obuchowski@getindata.com)
2024-02-16 05:27:29

*Thread Reply:* I feel like we can disable it

Maciej Obuchowski (maciej.obuchowski@getindata.com)
2024-02-16 05:27:41

*Thread Reply:* but still don't understand why it fails, maybe it will always fail in 2.13?

Damien Hawes (damien.hawes@booking.com)
2024-02-16 05:45:22

*Thread Reply:* If I test it locally, under 3.2.4 and 2.13, it passes.

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-02-16 06:03:56

*Thread Reply:* this looks like unfinished feature with a first PR to introduce spark+real hive metastore environment and other PRs never happened later. i don't know why it's not clearing the path. this can be related to a changed hadoop client dependencies. I think I don't mind turning test off and creating a separate issue to fix it later.

Damien Hawes (damien.hawes@booking.com)
2024-02-16 06:44:47

*Thread Reply:* And there we go

Damien Hawes (damien.hawes@booking.com)
2024-02-16 06:44:54

*Thread Reply:* All integration tests for Spark pass

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-02-16 07:05:01

*Thread Reply:* :gh_merged:

Damien Hawes (damien.hawes@booking.com)
2024-02-16 07:05:16

*Thread Reply:* Wait

Damien Hawes (damien.hawes@booking.com)
2024-02-16 07:05:20

*Thread Reply:* with 65 commits squashed?

Damien Hawes (damien.hawes@booking.com)
2024-02-16 07:05:40

*Thread Reply:* I was just about to squash them

Damien Hawes (damien.hawes@booking.com)
2024-02-16 07:06:16

*Thread Reply:* Including commits like, "Hello Spotless my old friend"

Damien Hawes (damien.hawes@booking.com)
2024-02-16 07:06:19

*Thread Reply:* xD

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-02-16 07:06:33

*Thread Reply:* sry 😉

Maciej Obuchowski (maciej.obuchowski@getindata.com)
2024-02-16 07:06:41

*Thread Reply:* We have squash on merge set up

Damien Hawes (damien.hawes@booking.com)
2024-02-16 07:07:01

*Thread Reply:* Aye, but I didn't want this:

Damien Hawes (damien.hawes@booking.com)
2024-02-16 07:07:07

*Thread Reply:* I wanted to summarise my changes

Damien Hawes (damien.hawes@booking.com)
2024-02-16 07:07:08

*Thread Reply:* 🙂

Maciej Obuchowski (maciej.obuchowski@getindata.com)
2024-02-16 07:07:37

*Thread Reply:* Let's fix this by adding changelog entry?

Damien Hawes (damien.hawes@booking.com)
2024-02-16 07:13:11

*Thread Reply:* Good point

Damien Hawes (damien.hawes@booking.com)
2024-02-16 07:18:41

*Thread Reply:* btw @Paweł Leszczyński - I had an idea of how to reduce the duplication in the CI config

Damien Hawes (damien.hawes@booking.com)
2024-02-16 07:18:49

*Thread Reply:* we can use YAML references/pointers

Damien Hawes (damien.hawes@booking.com)
2024-02-16 07:19:10

*Thread Reply:* And add a parameter for scala binary version

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-02-16 07:19:14

*Thread Reply:* I liked the idea of mockserver extension

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-02-16 07:19:46

*Thread Reply:* it got reverted, perhaps this was too many improvements at once but this was cool

Damien Hawes (damien.hawes@booking.com)
2024-02-16 07:19:58

*Thread Reply:* I can reintroduce it, because they publish their own extension

Damien Hawes (damien.hawes@booking.com)
2024-02-16 07:20:05

*Thread Reply:* which I used, after deleting my own

Damien Hawes (damien.hawes@booking.com)
2024-02-16 07:20:22

*Thread Reply:* But the tests do need to be adjusted though

Damien Hawes (damien.hawes@booking.com)
2024-02-16 07:20:46

*Thread Reply:* so that we don't rely on anything in static space

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-02-16 07:28:04

*Thread Reply:* My desired solution would be to write events to file with file transport and then verify events from that with no need for extra servers and ports. Maciej would prefer some lightweight server container to have a generic solution unrelated if it is java or python.

Damien Hawes (damien.hawes@booking.com)
2024-02-16 07:28:36

*Thread Reply:* Yeah - I want to go the file solution as well

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-02-16 07:28:38

*Thread Reply:* let's see who will be the first 😉

Damien Hawes (damien.hawes@booking.com)
2024-02-16 07:28:41

*Thread Reply:* It's less resource consumption

Damien Hawes (damien.hawes@booking.com)
2024-02-16 07:28:55

*Thread Reply:* It took me an hour to write it.

Damien Hawes (damien.hawes@booking.com)
2024-02-16 07:29:03

*Thread Reply:* Including the verification

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-02-16 07:29:29

*Thread Reply:* does it require to modify all the existing tests?

Damien Hawes (damien.hawes@booking.com)
2024-02-16 07:29:44

*Thread Reply:* It basically looked the exact same as the current mock server verifyEvents method

Damien Hawes (damien.hawes@booking.com)
2024-02-16 07:29:56

*Thread Reply:* The only difference was instead of passing in a mock server instance

Damien Hawes (damien.hawes@booking.com)
2024-02-16 07:30:11

*Thread Reply:* I passed in the file location (java.nio.file.Path)

Damien Hawes (damien.hawes@booking.com)
2024-02-16 07:30:36

*Thread Reply:* > does it require to modify all the existing tests?

Regardless of which approach you take, you will have to modify the existing tests.

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-02-16 07:31:25

*Thread Reply:* yes, but do we need to modify assertion files with expected OL events

Damien Hawes (damien.hawes@booking.com)
2024-02-16 07:31:26

*Thread Reply:* If I were to redo it though, I would write it as an extension

Damien Hawes (damien.hawes@booking.com)
2024-02-16 07:31:39

*Thread Reply:* You mean with replacements?

Damien Hawes (damien.hawes@booking.com)
2024-02-16 07:31:47

*Thread Reply:* Because if yes, I already did that 😓

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-02-16 07:32:37

*Thread Reply:* can we reuse those files?

Damien Hawes (damien.hawes@booking.com)
2024-02-16 07:32:44

*Thread Reply:* Yes

Damien Hawes (damien.hawes@booking.com)
2024-02-16 07:32:58

*Thread Reply:* As I said, the only thing that changed in the method

Damien Hawes (damien.hawes@booking.com)
2024-02-16 07:33:04

*Thread Reply:* was the type of the first parameter

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-02-16 07:33:15

*Thread Reply:* it sounds cool to me

Damien Hawes (damien.hawes@booking.com)
2024-02-16 07:33:18

*Thread Reply:* ClientAndServer -> java.nio.file.Path

Damien Hawes (damien.hawes@booking.com)
2024-02-16 07:33:42

*Thread Reply:* Yeah but I deleted the code, because I was like "Too many changes"

Damien Hawes (damien.hawes@booking.com)
2024-02-16 07:33:50

*Thread Reply:* but it won't be hard to add it back

Damien Hawes (damien.hawes@booking.com)
2024-02-16 07:33:57

*Thread Reply:* I think

Paweł Leszczyński (pawel.leszczynski@getindata.com)
2024-02-16 07:33:58

*Thread Reply:* I wouldn't miss mockserver at all 😉

Damien Hawes (damien.hawes@booking.com)
2024-02-16 07:34:08

*Thread Reply:* unless I didn't commit it

Damien Hawes (damien.hawes@booking.com)
2024-02-16 07:40:15

*Thread Reply:* Yup

Damien Hawes (damien.hawes@booking.com)
2024-02-16 07:40:18

*Thread Reply:* I committed it

Damien Hawes (damien.hawes@booking.com)
2024-02-16 07:40:21

*Thread Reply:* managed to recover it

Damien Hawes (damien.hawes@booking.com)
2024-02-16 07:41:21

*Thread Reply:* This is what I had @Paweł Leszczyński

```
private static final ObjectReader READER = new ObjectMapper().findAndRegisterModules().reader();

static void verifyEvents(Path filePath, String... eventFiles) {
  verifyEvents(filePath, ignored -> true, Collections.emptyMap(), eventFiles);
}

static void verifyEvents(Path filePath, Predicate<JsonNode> eventFilter, String... eventFiles) {
  verifyEvents(filePath, eventFilter, Collections.emptyMap(), eventFiles);
}

static void verifyEvents(Path filePath, Map<String, String> replacements, String... eventFiles) {
  verifyEvents(filePath, ignored -> true, replacements, eventFiles);
}

@SneakyThrows
static List<OpenLineage.RunEvent> getEmittedEvents(Path filePath) {
  List<String> lines = Files.readAllLines(filePath);
  return lines.stream()
      .map(OpenLineageClientUtils::runEventFromJson)
      .collect(Collectors.toList());
}

@SneakyThrows
static void verifyEvents(
    Path filePath,
    Predicate<JsonNode> eventFilter,
    Map<String, String> replacements,
    String... expectedEventFileNames) {
  Path eventsDir = Paths.get("integrations/container/");

  final List<JsonNode> actualJsonObjects =
      Files.readAllLines(filePath).stream()
          .map(MockServerUtils::sneakyRead)
          .filter(eventFilter)
          .collect(Collectors.toList());

  final List<JsonNode> expectedJsonObjects =
      Arrays.stream(expectedEventFileNames)
          .map(eventsDir::resolve)
          .map(MockServerUtils::sneakyRead)
          .map(json -> doReplace(json, replacements))
          .map(MockServerUtils::sneakyRead)
          .collect(Collectors.toList());

  IntStream.range(0, expectedJsonObjects.size())
      .forEach(
          index -> {
            JsonNode actualJson = actualJsonObjects.get(index);
            JsonNode expectedJson = expectedJsonObjects.get(index);
            try {
              log.info("Comparing {} to {}", expectedJson, actualJson);
              JSONAssert.assertEquals(expectedJson.toString(), actualJson.toString(), false);
            } catch (JSONException e) {
              Assertions.fail("Failed to match JSON", e);
            }
          });
}

@SneakyThrows
private static JsonNode sneakyRead(String json) {
  return MockServerUtils.READER.readTree(json);
}

@SneakyThrows
private static String sneakyRead(Path path) {
  return new String(Files.readAllBytes(path));
}

private static String doReplace(String s, Map<String, String> replacements) {
  String result = s;
  for (Map.Entry<String, String> entry : replacements.entrySet()) {
    result = result.replace(entry.getKey(), entry.getValue());
  }
  return result;
}
```

❤️ Paweł Leszczyński
Damien Hawes (damien.hawes@booking.com)
2024-02-16 07:43:30

*Thread Reply:* Why was the predicate needed? Because MockServer can match certain events, for example, we only asked it to match on start and complete events, however the listener would publish running events.

Damien Hawes (damien.hawes@booking.com)
2024-02-16 07:44:05

*Thread Reply:* So the predicate can be used to omit running events before the comparison takes place
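
To make the filtering step concrete, here is a rough Python sketch (the code in this thread is Java). It assumes the file transport wrote one JSON event per line, as the `Files.readAllLines` usage implies, and that each event carries an `eventType` field; `load_events` and `not_running` are illustrative names only.

```python
import json
import tempfile
from pathlib import Path
from typing import Callable, List


def load_events(path: Path, keep: Callable[[dict], bool] = lambda e: True) -> List[dict]:
    """Read newline-delimited JSON events and keep those accepted by `keep`."""
    events = []
    for line in path.read_text().splitlines():
        if line.strip():  # skip blank lines
            event = json.loads(line)
            if keep(event):
                events.append(event)
    return events


def not_running(event: dict) -> bool:
    """Predicate that drops RUNNING events before comparison."""
    return event.get("eventType") != "RUNNING"


# Write a small sample file, then read it back through the filter.
with tempfile.TemporaryDirectory() as tmp:
    events_file = Path(tmp) / "events.ndjson"
    events_file.write_text(
        '{"eventType": "START"}\n{"eventType": "RUNNING"}\n{"eventType": "COMPLETE"}\n'
    )
    kept = [e["eventType"] for e in load_events(events_file, not_running)]

print(kept)
```

Filtering before the index-based comparison matters because an unexpected RUNNING event in the middle would otherwise shift every later event by one position.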

Maciej Obuchowski (maciej.obuchowski@getindata.com)
2024-02-16 07:52:28

*Thread Reply:* you need to make sure you're not comparing stuff like timestamps or UUIDs that are generated for each run

Damien Hawes (damien.hawes@booking.com)
2024-02-16 07:52:39

*Thread Reply:* Aye

Damien Hawes (damien.hawes@booking.com)
2024-02-16 07:52:52

*Thread Reply:* That's this line:

JSONAssert.assertEquals(expectedJson.toString(), actualJson.toString(), false);

Maciej Obuchowski (maciej.obuchowski@getindata.com)
2024-02-16 07:52:55

*Thread Reply:* On Airflow side we have solution with jinja templates, that's why I wanted to reuse that code

Damien Hawes (damien.hawes@booking.com)
2024-02-16 07:53:01

*Thread Reply:* The "false" means, don't do strict matching
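
Roughly what non-strict matching buys you, sketched in Python rather than with JSONAssert itself: extra fields in the actual event are ignored and array order does not matter, so per-run values such as timestamps or run IDs are only skipped if the expected file leaves them out. `lenient_match` is a hypothetical helper that approximates this behaviour; it is not JSONAssert's algorithm.

```python
from typing import Any


def lenient_match(expected: Any, actual: Any) -> bool:
    """Return True if `actual` contains everything `expected` asks for."""
    if isinstance(expected, dict):
        # Objects: every expected key must be present and match; extras are fine.
        return isinstance(actual, dict) and all(
            key in actual and lenient_match(value, actual[key])
            for key, value in expected.items()
        )
    if isinstance(expected, list):
        # Arrays: same length, but elements may appear in any order.
        if not isinstance(actual, list) or len(expected) != len(actual):
            return False
        remaining = list(actual)
        for item in expected:
            for index, candidate in enumerate(remaining):
                if lenient_match(item, candidate):
                    del remaining[index]  # each actual element is used once
                    break
            else:
                return False  # no unused actual element matched this item
        return True
    # Scalars: plain equality.
    return expected == actual


expected_event = {"eventType": "START", "inputs": [{"name": "a"}, {"name": "b"}]}
actual_event = {
    "eventType": "START",
    "eventTime": "2024-02-16T07:52:28Z",  # per-run value, absent from expected
    "inputs": [{"name": "b", "facets": {}}, {"name": "a"}],
}
print(lenient_match(expected_event, actual_event))
```

The array matching here is greedy, so it can reject some inputs that a backtracking matcher would accept; that is acceptable for a sketch but worth knowing.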

Mattia Bertorello (mattia.bertorello@booking.com)
2024-02-13 08:10:19

Hey team! There is a high probability that we would need an expansion to the Kafka transport to support IAM authentication in MSK.

First, can I put it in the OpenLineage repository, or should we create an internal transport? Second, would you prefer additional functionality in the existing Kafka transport, or a new transport that extends the Kafka one? With the latter, we can separate the dependencies.

https://github.com/aws/aws-msk-iam-sasl-signer-python

Maciej Obuchowski (maciej.obuchowski@getindata.com)
2024-02-13 08:33:22

*Thread Reply:* We'd prefer not to put dependencies as large as `import boto3` and `import botocore.session` into the generic client that everyone using OpenLineage will download

Mattia Bertorello (mattia.bertorello@booking.com)
2024-02-13 08:35:18

*Thread Reply:* So what about a openlineage-python[msk] ?

Jakub Dardziński (jakub.dardzinski@getindata.com)
2024-02-13 09:01:45

*Thread Reply:* I’m ok with adding it as extra dependency with extending the existing Kafka transport

Maciej Obuchowski (maciej.obuchowski@getindata.com)
2024-02-13 10:26:35

*Thread Reply:* I'm okay with openlineage-python[msk] too

Mattia Bertorello (mattia.bertorello@booking.com)
2024-02-13 10:28:04

*Thread Reply:* Ok perfect.

Julien Le Dem (julien@apache.org)
2024-02-13 19:44:13

*Thread Reply:* +1

Damien Hawes (damien.hawes@booking.com)
2024-02-19 04:08:30

@Paweł Leszczyński @Maciej Obuchowski - can I get this one reviewed: https://github.com/OpenLineage/OpenLineage/pull/2446

Labels
documentation, integration/spark, ci
Assignees
<a href="https://github.com/d-m-h">@d-m-h</a>
Maciej Obuchowski (maciej.obuchowski@getindata.com)
2024-02-19 05:39:54

*Thread Reply:* I will do it. @Paweł Leszczyński is on vacation this week, so expect slower response time unfortunately

Damien Hawes (damien.hawes@booking.com)
2024-02-19 05:40:10

*Thread Reply:* Ah right, I remember now.

Damien Hawes (damien.hawes@booking.com)
2024-02-19 08:11:52

*Thread Reply:* https://github.com/OpenLineage/docs/pull/286

Assignees
@d-m-h
Maciej Obuchowski (maciej.obuchowski@getindata.com)
2024-02-19 09:46:55

@Damien Hawes @Mattia Bertorello is this ⬆️ and Iceberg PR enough for 2.13 support?

Damien Hawes (damien.hawes@booking.com)
2024-02-19 09:57:40

*Thread Reply:* I believe so

👍 Maciej Obuchowski
🎉 tati
Mattia Bertorello (mattia.bertorello@booking.com)
2024-02-20 09:57:55

Hey! I would like to discuss this solution for the DBT wrapper: https://github.com/OpenLineage/OpenLineage/pull/2449 A stakeholder is concerned about a possible failure when emitting the event from DBT, even if the job is successful.

Do you think we can make catching any exception the default, or do we need to put it behind a "feature flag" with an environment variable?

Maciej Obuchowski (maciej.obuchowski@getindata.com)
2024-02-20 10:18:44

*Thread Reply:* I think we can make it default

Maciej Obuchowski (maciej.obuchowski@getindata.com)
2024-02-20 10:18:56

*Thread Reply:* It's not "breaking change", if anything it makes it more resilient

Maciej Obuchowski (maciej.obuchowski@getindata.com)
2024-02-20 10:19:28

*Thread Reply:* I would change the logger.info(f"Emitted {len(events) + 2} openlineage events") to accurately count emitted events and display failed ones too

Mattia Bertorello (mattia.bertorello@booking.com)
2024-02-20 10:20:07

*Thread Reply:* Nice catch

Mattia Bertorello (mattia.bertorello@booking.com)
2024-02-20 10:20:45

*Thread Reply:* Obviously, in this case, it could happen that the job is successful, and there is no lineage.

Mattia Bertorello (mattia.bertorello@booking.com)
2024-02-20 10:24:46

*Thread Reply:* Done, I've added an emitted_events counter
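
A sketch of that counting pattern, assuming a client object with an `emit` method; `emit_all` and `FlakyClient` are hypothetical and this is not the code from the PR. Each emit is wrapped so a transport failure is logged rather than raised, and the final log line reports how many events were actually emitted and how many failed.

```python
import logging
from typing import Iterable, Tuple

logger = logging.getLogger(__name__)


def emit_all(client, events: Iterable) -> Tuple[int, int]:
    """Emit every event, swallowing failures; return (emitted, failed)."""
    emitted = failed = 0
    for event in events:
        try:
            client.emit(event)
            emitted += 1
        except Exception:
            # Lineage is best effort: never fail the job because an emit failed.
            failed += 1
            logger.exception("Failed to emit OpenLineage event")
    logger.info("Emitted %d OpenLineage events, %d failed", emitted, failed)
    return emitted, failed


class FlakyClient:
    """Test double that fails on every second event (hypothetical)."""

    def __init__(self):
        self.calls = 0

    def emit(self, event):
        self.calls += 1
        if self.calls % 2 == 0:
            raise RuntimeError("transport unavailable")


result = emit_all(FlakyClient(), ["start", "model_1", "model_2", "complete"])
print(result)
```

Counting inside the loop keeps the log line honest, unlike a fixed `len(events) + 2`, which reports events as emitted even when some of them failed.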

Mattia Bertorello (mattia.bertorello@booking.com)
2024-02-23 04:39:20

*Thread Reply:* Hi Maciej, could we merge this PR #2449 before releasing 1.9.0? Do I still need to work on something?

Maciej Obuchowski (maciej.obuchowski@getindata.com)
2024-02-23 04:46:39

*Thread Reply:* probably yes, sorry for not merging it earlier

Maciej Obuchowski (maciej.obuchowski@getindata.com)
2024-02-23 09:13:07

*Thread Reply:* @Mattia Bertorello it's merged 🙂

:gratitude_thank_you: Mattia Bertorello
Mattia Bertorello (mattia.bertorello@booking.com)
2024-02-23 09:38:42

*Thread Reply:* Thanks 🙏

Mattia Bertorello (mattia.bertorello@booking.com)
2024-02-29 05:23:22

Hey team!

Another discussion: I created an MSK transport; let me know what you think. With this transport, OL users can use MSK with IAM authentication without defining a custom transport.

https://github.com/OpenLineage/OpenLineage/pull/2478

Labels
client/python
Maciej Obuchowski (maciej.obuchowski@getindata.com)
2024-02-29 05:28:30

*Thread Reply:* would be great if you could confirm that you tested this manually

Mattia Bertorello (mattia.bertorello@booking.com)
2024-02-29 05:30:55

*Thread Reply:* I tested it. I can show some screenshots 🙂 I had to create a small Python script, ship everything in a docker container and run it on a machine with network connectivity to MSK 😅

👍 Maciej Obuchowski
Maciej Obuchowski (maciej.obuchowski@getindata.com)
2024-02-29 05:32:40

*Thread Reply:* I believe you, it's just it would be too expensive time wise to have real integration tests for each of those transports, so we have to rely on people manually testing it 🙂

Mattia Bertorello (mattia.bertorello@booking.com)
2024-02-29 05:39:51

*Thread Reply:* Yeah you need an AWS account, some terraform code to create and destroy the MSK plus the integration test to run inside the VPC network 😅

But it makes sense to put some screenshots in the PR just to show that it was tested and how.

Mattia Bertorello (mattia.bertorello@booking.com)
2024-02-29 05:40:18

*Thread Reply:* The only thing to test is the IAM auth, because other than that it is normal Kafka

👍 Maciej Obuchowski
Mattia Bertorello (mattia.bertorello@booking.com)
2024-02-29 07:20:41

*Thread Reply:* test code
```
import datetime
import uuid

from openlineage.client import OpenLineageClient
from openlineage.client.run import Job, Run, RunEvent, RunState
from openlineage.client.transport import MSKIAMTransport
from openlineage.client.transport.msk_iam import MSKIAMConfig

if __name__ == "__main__":
    import logging

    # DEBUG level surfaces the librdkafka connection and produce traces.
    logging.basicConfig(level=logging.DEBUG)
    config = MSKIAMConfig(
        config={
            "bootstrap.servers": "b-2.xxx.c2.kafka.eu-west-2.amazonaws.com:9098,b-1.xxx.c2.kafka.eu-west-2.amazonaws.com:9098"
        },
        topic="my_test_topic",
        region="eu-west-2",
        flush=True,
    )
    transport = MSKIAMTransport(config)
    client = OpenLineageClient(transport=transport)
    # Minimal START event, just enough to exercise the transport.
    event = RunEvent(
        eventType=RunState.START,
        eventTime=datetime.datetime.now().isoformat(),
        run=Run(runId=str(uuid.uuid4())),
        job=Job(namespace="kafka", name="test"),
        producer="prod",
        schemaURL="schema/RunEvent",
    )

    client.emit(event)
    # Wait briefly for any messages still buffered in the producer.
    client.transport.producer.flush(timeout=1)
    print("Messages sent")
```

Logs
DEBUG:openlineage.client.transport.kafka:BRKMAIN [rdkafka#producer-1] [thrd:sasl_ssl://b-1.xxx.c2.kafka.eu-west-2]: sasl_ssl://b-1.xxx.c2.kafka.eu-west-2.amazonaws.com:9098/bootstrap: Enter main broker thread
2024-02-29T12:14:47.560285672Z DEBUG:openlineage.client.transport.kafka:CONNECT [rdkafka#producer-1] [thrd:sasl_ssl://b-1.xxx.c2.kafka.eu-west-2]: sasl_ssl://b-1.xxx.c2.kafka.eu-west-2.amazonaws.com:9098/bootstrap: Received CONNECT op
2024-02-29T12:14:47.560288447Z DEBUG:openlineage.client.transport.kafka:STATE [rdkafka#producer-1] [thrd:sasl_ssl://b-1.xxx.c2.kafka.eu-west-2]: sasl_ssl://b-1.xxx.c2.kafka.eu-west-2.amazonaws.com:9098/bootstrap: Broker changed state INIT -> TRY_CONNECT
2024-02-29T12:14:47.560291862Z DEBUG:openlineage.client.transport.kafka:BROADCAST [rdkafka#producer-1] [thrd:sasl_ssl://b-1.xxx.c2.kafka.eu-west-2]: Broadcasting state change
2024-02-29T12:14:47.560294645Z DEBUG:openlineage.client.transport.kafka:TOPIC [rdkafka#producer-1] [thrd:app]: New local topic: my_test_topic
2024-02-29T12:14:47.560297342Z DEBUG:openlineage.client.transport.kafka:TOPPARNEW [rdkafka#producer-1] [thrd:app]: NEW my_test_topic [-1] 0x5598e047bbf0 refcnt 0x5598e047bc80 (at rd_kafka_topic_new0:472)
2024-02-29T12:14:47.560300475Z DEBUG:openlineage.client.transport.kafka:BRKMAIN [rdkafka#producer-1] [thrd:app]: Waking up waiting broker threads after setting OAUTHBEARER token
2024-02-29T12:14:47.560303259Z DEBUG:openlineage.client.transport.kafka:WAKEUP [rdkafka#producer-1] [thrd:app]: sasl_ssl://b-1.xxx.c2.kafka.eu-west-2.amazonaws.com:9098/bootstrap: Wake-up: OAUTHBEARER token update
2024-02-29T12:14:47.560306334Z DEBUG:openlineage.client.transport.kafka:WAKEUP [rdkafka#producer-1] [thrd:app]: Wake-up sent to 1 broker thread in state >= TRY_CONNECT: OAUTHBEARER token update
2024-02-29T12:14:47.560309239Z DEBUG:openlineage.client.transport.kafka:CONNECT [rdkafka#producer-1] [thrd:sasl_ssl://b-1.xxx.c2.kafka.eu-west-2]: sasl_ssl://b-1.xxx.c2.kafka.eu-west-2.amazonaws.com:9098/bootstrap: broker in state TRY_CONNECT connecting
2024-02-29T12:14:47.560312101Z DEBUG:openlineage.client.transport.kafka:STATE [rdkafka#producer-1] [thrd:sasl_ssl://b-1.xxx.c2.kafka.eu-west-2]: sasl_ssl://b-1.xxx.c2.kafka.eu-west-2.amazonaws.com:9098/bootstrap: Broker changed state TRY_CONNECT -> CONNECT
...
DEBUG:openlineage.client.transport.kafka:PRODUCE [rdkafka#producer-1] [thrd:sasl_ssl://b-1.xxx.c2.kafka.eu-west-2]: sasl_ssl://b-1.xxx.c2.kafka.eu-west-2.amazonaws.com:9098/1: my_test_topic [0]: Produce MessageSet with 1 message(s) (349 bytes, ApiVersion 7, MsgVersion 2, MsgId 0, BaseSeq -1, PID{Invalid}, uncompressed)
2024-02-29T12:14:48.326364842Z DEBUG:openlineage.client.transport.kafka:SEND [rdkafka#producer-1] [thrd:sasl_ssl://b-1.xxx.c2.kafka.eu-west-2]: sasl_ssl://b-1.xxx.c2.kafka.eu-west-2.amazonaws.com:9098/1: Sent ProduceRequest (v7, 454 bytes @ 0, CorrId 5)
2024-02-29T12:14:48.382471756Z DEBUG:openlineage.client.transport.kafka:RECV [rdkafka#producer-1] [thrd:sasl_ssl://b-1.xxx.c2.kafka.eu-west-2]: sasl_ssl://b-1.xxx.c2.kafka.eu-west-2.amazonaws.com:9098/1: Received ProduceResponse (v7, 102 bytes, CorrId 5, rtt 55.99ms)
2024-02-29T12:14:48.382517219Z DEBUG:openlineage.client.transport.kafka:MSGSET [rdkafka#producer-1] [thrd:sasl_ssl://b-1.xxx.c2.kafka.eu-west-2]: sasl_ssl://b-1.xxx.c2.kafka.eu-west-2.amazonaws.com:9098/1: my_test_topic [0]: MessageSet with 1 message(s) (MsgId 0, BaseSeq -1) delivered
2024-02-29T12:14:48.382623532Z DEBUG:openlineage.client.transport.kafka:Send message <cimpl.Message object at 0x7fb116fcde40>
2024-02-29T12:14:48.382648622Z DEBUG:openlineage.client.transport.kafka:Amount of messages left in Kafka buffers after flush 0
2024-02-29T12:14:48.382730647Z DEBUG:openlineage.client.transport.kafka:WAKEUP [rdkafka#producer-1] [thrd:app]: sasl_ssl://b-1.xxx.c2.kafka.eu-west-2.amazonaws.com:9098/1: Wake-up: flushing
2024-02-29T12:14:48.382747018Z DEBUG:openlineage.client.transport.kafka:WAKEUP [rdkafka#producer-1] [thrd:app]: Wake-up sent to 1 broker thread in state >= UP: flushing
2024-02-29T12:14:48.382752798Z Messages sent

👍 Maciej Obuchowski
Mattia Bertorello (mattia.bertorello@booking.com)
2024-02-29 08:39:50

*Thread Reply:* I copied it from the Kafka transport https://github.com/OpenLineage/OpenLineage/pull/2478#discussion_r1507361123 and it makes sense: otherwise, when Python reads the whole file, it could try to import a library that isn't installed, even in cases where you don't need it.
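
For readers following along, here is a minimal sketch of the deferred-import pattern this message describes. It is not the code from the PR. The optional dependency is imported inside the method that needs it, so importing the module never fails when the library is absent. `LazyTransport`, the module name `some_optional_kafka_lib`, and its `send` function are hypothetical names used only for illustration.

```python
import importlib


class LazyTransport:
    """Hypothetical transport that depends on an optional third-party library."""

    # Placeholder module name (an assumption); it is not expected to be installed.
    OPTIONAL_MODULE = "some_optional_kafka_lib"

    def __init__(self) -> None:
        # Nothing optional is imported here, so defining and constructing
        # this class works even without the optional dependency.
        self._lib = None

    def _load(self):
        # Import on first use only; raise a clear error if the library is missing.
        if self._lib is None:
            try:
                self._lib = importlib.import_module(self.OPTIONAL_MODULE)
            except ImportError as exc:
                raise RuntimeError(
                    f"{self.OPTIONAL_MODULE} is required for this transport"
                ) from exc
        return self._lib

    def emit(self, payload: str) -> None:
        lib = self._load()
        lib.send(payload)  # hypothetical call on the hypothetical library
```

With a top-level `import some_optional_kafka_lib`, every user of the package would hit an `ImportError` at import time. With the deferred form, only users who actually call `emit` need the library installed.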

Mattia Bertorello (mattia.bertorello@booking.com)
2024-02-29 08:42:43

*Thread Reply:* Also, I think it's better to drop support for IMDSv1, and in any case I should implement IMDSv2 😅 to be complete https://github.com/OpenLineage/OpenLineage/pull/2478#discussion_r1507359486
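
As background for the IMDSv2 point, here is a rough sketch of the session-token flow using only the standard library. Unlike IMDSv1, IMDSv2 requires a `PUT` to obtain a token, which is then sent as a header on each metadata request. The helper names and the choice of the `placement/region` path are assumptions for illustration, and the PR's own implementation may well differ.

```python
import urllib.request

# Link-local address of the EC2 instance metadata service.
IMDS_BASE = "http://169.254.169.254"


def build_token_request(ttl_seconds: int = 21600) -> urllib.request.Request:
    # Step 1 of IMDSv2: PUT to the token endpoint with a TTL header.
    return urllib.request.Request(
        f"{IMDS_BASE}/latest/api/token",
        method="PUT",
        headers={"X-aws-ec2-metadata-token-ttl-seconds": str(ttl_seconds)},
    )


def build_metadata_request(path: str, token: str) -> urllib.request.Request:
    # Step 2: GET the metadata path, presenting the session token.
    return urllib.request.Request(
        f"{IMDS_BASE}/latest/meta-data/{path}",
        headers={"X-aws-ec2-metadata-token": token},
    )


def fetch_region(timeout: float = 1.0) -> str:
    # Only works on an EC2 instance (or anywhere the IMDS endpoint is reachable).
    with urllib.request.urlopen(build_token_request(), timeout=timeout) as resp:
        token = resp.read().decode()
    request = build_metadata_request("placement/region", token)
    with urllib.request.urlopen(request, timeout=timeout) as resp:
        return resp.read().decode()
```

Keeping the request builders separate from `fetch_region` means the headers and methods can be checked without network access.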

Mattia Bertorello (mattia.bertorello@booking.com)
2024-03-05 04:10:45

*Thread Reply:* Hi @Kacper Muda, Is there still something to do in this PR?

Rajat (rajat.movaliya@atlan.com)
2024-03-18 09:52:32

@Rajat has joined the channel