@Paweł Leszczyński has joined the channel
@Mattia Bertorello has joined the channel
@Paweł Leszczyński - meet @Mattia Bertorello - one of my team members. He will be helping us to support Scala 2.13 with the Spark integration. For clarity, I am going to share the messages we exchanged earlier today in order to get him up to speed.
> Damien Hawes: Yeah - the cross-building of the Spark integration isn't fun. I'm doing a toy project at the moment, to understand how to do it with Gradle. So far I can get a regular project to publish to my local Maven repository with Scala 2.12 and Scala 2.13
> Paweł Leszczyński: yeah, i think we should have an extra scalaVersion param, as we have sparkVersion, and publish two artifacts with the Scala suffix at the end
> Damien Hawes: Yup. The standard Scala suffixes. Though, I'm trying to wrap my head around how we currently get the entire integration to "come together", so to speak. I do have a concern though: if we publish two artifacts, what happens when someone runs a Spark 3.1 pipeline, for example, which only supports Scala 2.12, but we have Scala 2.13 code in there as well because of the way we "combine" all the sub-projects into one artifact?
> Paweł Leszczyński: in circle ci there is a matrix of params and we should be able somehow to prevent publishing spark2 + scala2.13. that's the version matrix file https://github.com/OpenLineage/OpenLineage/blob/main/.circleci/workflows/openlineage-spark.yml
> Damien Hawes: So far I have this in my settings.gradle.kts of my toy project:
```rootProject.name = "openlineage-spark"

val scalaBinaryVersion = settings.extra["scalaVersion"]
    .toString()
    .split(".")
    .take(2)
    .joinToString(".")

include("openlineage-spark-core")

if (scalaBinaryVersion == "2.12") {
    include("openlineage-spark-spark30")
    include("openlineage-spark-spark31")
    include("openlineage-spark-spark32")
    include("openlineage-spark-spark33")
    include("openlineage-spark-spark34")
    include("openlineage-spark-spark35")
}

if (scalaBinaryVersion == "2.13") {
    include("openlineage-spark-spark32")
    include("openlineage-spark-spark33")
    include("openlineage-spark-spark34")
    include("openlineage-spark-spark35")
}```
> Damien Hawes: Then in my gradle.properties I have this:
```# This is used as the default Scala version
scalaVersion=2.12.18
sparkVersion=3.2.1```
@Maciej Obuchowski has joined the channel
Hello. I've been trying to balance developer experience versus publishing experience versus user experience.
There are several approaches we could take.
Approach #1
Produce and publish an artefact per version of Spark x Scala. Given the table below, this would mean Spark 3.2.x, 3.3.x, 3.4.x, and 3.5.x would each see two artefacts produced, something like io.openlineage:openlineage-spark_3.2_2.12:$VERSION & io.openlineage:openlineage-spark_3.2_2.13:$VERSION
| Spark Version | Scala 2.10 | Scala 2.11 | Scala 2.12 | Scala 2.13 |
+---------------+------------+------------+------------+------------+
| 2.2.x | ✓ | ✓ | | |
+---------------+------------+------------+------------+------------+
| 2.3.x | | ✓ | | |
+---------------+------------+------------+------------+------------+
| 2.4.x | | ✓ | ✓ | |
+---------------+------------+------------+------------+------------+
| 3.0.x | | | ✓ | |
+---------------+------------+------------+------------+------------+
| 3.1.x | | | ✓ | |
+---------------+------------+------------+------------+------------+
| 3.2.x | | | ✓ | ✓ |
+---------------+------------+------------+------------+------------+
| 3.3.x | | | ✓ | ✓ |
+---------------+------------+------------+------------+------------+
| 3.4.x | | | ✓ | ✓ |
+---------------+------------+------------+------------+------------+
| 3.5.x | | | ✓ | ✓ |
+---------------+------------+------------+------------+------------+
Benefits:
We don't include code from other versions of Spark in the published JAR.
Drawbacks:
The developer experience becomes a bit more difficult, as it becomes necessary to edit the root gradle.properties file to select the correct versions.
More artefacts to publish.
Approach #2
Produce an artefact per Scala major version. This means something like: Scala 2.12 would include support for Spark 2.4, Spark 3.0, Spark 3.1, Spark 3.2, Spark 3.3, Spark 3.4, and Spark 3.5.
Benefits:
The developer experience is kept roughly the same; activating Scala 2.13 would exclude Spark 3.1 and older from being considered in the Gradle build.
Drawbacks:
We may end up including Scala 2.12 code with Scala 2.13 artefacts and vice versa.
Now there is another elephant in the room, switching to Scala 2.13 is not an easy feat. According to @Mattia Bertorello it breaks some code, where exactly, I'm not so sure. Perhaps he can shed light on this when he has the time.
What do you folks think? @Paweł Leszczyński @Maciej Obuchowski
*Thread Reply:* I would definitely gravitate towards option 2, given that we already have infrastructure built to publish one artifact for all Spark versions we support for a single Scala version.
Could you elaborate on this, as I'm not sure why would it happen
> We may end up including Scala 2.12 code with Scala 2.13 artefacts and vice versa.
Also:
> More artefacts to publish.
Seems like less, only openlineage-spark_2.12 and openlineage-spark_2.13 - and possibly later versions of Scala, rather than the full Spark x Scala matrix 🙂
> According to @Mattia Bertorello it breaks some code, where exactly, I'm not so sure. Perhaps he can shed light on this when he has the time.
Agreed, this is a challenge. My first idea would be to have just two different classes conditionally compiled, similar to what we're doing now with particular Spark versions - although the problem might be that we'll have way too many internal subprojects... not an easy challenge and probably requires research - if it's just a few classes then it's probably okay; if we need to have two versions of pretty much every class, it's a no-go.
*Thread Reply:* Yeah, I know the Spline folks did some magic with runtime code generation. But I think the amount of code they generated was small, and used for json serialization purposes.
> Could you elaborate on this, as I'm not sure why would it happen
>> We may end up including Scala 2.12 code with Scala 2.13 artefacts and vice versa.
>
When I first started playing around with cross-building my toy project, I noticed that Gradle was building all the projects, i.e., there was a JAR being generated for each one when I ran ./gradlew clean build, even if I ran ./gradlew clean build -PscalaVersion=2.12.
However, I have made some changes to the settings.gradle.kts file (the one you see earlier in the channel), and it seems to be working as expected, i.e., running the same command as specified above deactivates the unused projects. Though IntelliJ hates this, and red squiggly lines are displayed on those projects.
*Thread Reply:* Oh right, now I remember the other concern I had: core is compiled against a specific version of Spark, for the sake of development. The concern I have is if that interface (or the objects it relies on) changes. So far, we haven't seen it yet - thankfully.
*Thread Reply:* Hi @Maciej Obuchowski I did some work to make it compile with Scala 2.13 and see how many changes are necessary. So far, this version still has some issues and is not compiling.
Then, I considered using the scala-collection-compat library to allow the cross-compilation, but I still need to test it.
Most of the changes in Scala 2.13 that impact OpenLineage are:
• JavaConverters were removed
• Use the immutable data types from the collection library
• Some types of issues with the Seq builder
My next steps are to try to compile these changes on 2.12 and fix the issues that are still there on 2.13.
Here is my code; it is still WIP: https://github.com/OpenLineage/OpenLineage/compare/main...mattiabertorello:OpenLineage:feature/support_scala_2_13?expand=1
*Thread Reply:* > core is compiled against a specific version of Spark, for the sake of development. The concern I have is if that interface (or the objects it relies on) changes. So far, we haven't seen it yet - thankfully.
With every new Spark version we have to do some things anyway to make everything work, as we basically see by having all those directories with version-specific code 🙂
We'd see that in integration test code - and I think Spark is now wary of removing core stuff everyone is using.
@Mattia Bertorello nice work 🙂 Would it be able to put this behind some wrapper library, to be code-compatible with 2.12 code?
We might still need to release versions compiled for specific Scala versions, but it sounds like those changes would not require source code changes if we can isolate them.
*Thread Reply:* > Would it be able to put this behind some wrapper library, to be code-compatible with 2.12 code?
Maybe use the already existing ScalaConversionUtils class everywhere to make it easy to change. I will also check whether a wrapper class is necessary for the other changes.
*Thread Reply:* Yeah, it would be perfect if we could have ScalaConversionUtils that calls either ScalaConversionUtils_2_12 or ScalaConversionUtils_2_13 methods and stands as the only place in the whole codebase to be aware of which Scala version is used.
*Thread Reply:* perhaps we could add an empty Seq to ScalaConversionUtils as well
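A minimal sketch of what such a facade could look like on the Java side (illustrative only - this is not the actual OpenLineage ScalaConversionUtils; it sticks to scala.collection.Seq, the supertype that exists in both 2.12 and 2.13, and avoids JavaConverters entirely):
```
import java.util.ArrayList;
import java.util.List;

public final class ScalaConversionUtilsSketch {

  private ScalaConversionUtilsSketch() {}

  // Copy any scala.collection.Seq (2.12 or 2.13) into a java.util.List so that
  // callers never handle Scala collection types directly.
  public static <T> List<T> fromSeq(scala.collection.Seq<T> seq) {
    List<T> list = new ArrayList<>();
    scala.collection.Iterator<T> it = seq.iterator();
    while (it.hasNext()) {
      list.add(it.next());
    }
    return list;
  }

  // An empty Scala Seq, so callers never reference Nil$ / Seq$ themselves.
  @SuppressWarnings("unchecked")
  public static <T> scala.collection.Seq<T> emptySeq() {
    return (scala.collection.Seq<T>) (Object) scala.collection.immutable.Nil$.MODULE$;
  }
}
```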
@Maciej Obuchowski - how much hate would I get, if I switched from Groovy DSL to Kotlin DSL?
*Thread Reply:* you would only get love 😄
*Thread Reply:* I definitely like Kotlin one more as well, but Groovy was from the beginning and never felt the urge to make the switch myself 🙂
*Thread Reply:* The only reason why I'm feeling the urge to change it, is because I did all this exploratory work in Kotlin, and I am loathe to change it to Groovy.
*Thread Reply:* Go for it - one thing I'd ask is to make change in a separate PR
*Thread Reply:* The plan I have is 4-phased approach.
*Thread Reply:* might be worthy to do Kotlin first then?
*Thread Reply:* I mean, convert all the existing Groovy stuff and then just add on top of that
*Thread Reply:* But if you think it's easier to do it this way, then sure go for it
*Thread Reply:* Yeah - though I'm thinking in terms of not conflicting with Mattia too much, as he will have a ton of merge conflicts to resolve.
*Thread Reply:* Though, maybe it is best to get the pain out of the way sooner.
btw guys, I don't know if you read my letter to santa claus, but more collaboration on spark code was definitely on the list 😉 thank you for coming and I am super happy to see you working on this 🎅
An update @Maciej Obuchowski @Paweł Leszczyński - the effort to support Scala 2.13 isn't small. Holy hell.
*Thread Reply:* The challenging bit is the coupling of the spark32, spark33, spark34, and spark35 modules to the shared and spark3 modules.
*Thread Reply:* spark3
cannot be compiled with Scala 2.13, because, well, no spark dependencies were ever released for it with Scala 2.13
*Thread Reply:* would it help if we upgraded sparkVersion
in spark3 to 3.2 for Scala 2.13? 3.2 should be able to support 2.13
*Thread Reply:* Hi Pawel,
I upgraded the spark3 project to the 3.2 version and also the shared project. Now they can support Scala 2.12 and 2.13.
If you want to have a look at the changes, there are a lot https://github.com/OpenLineage/OpenLineage/compare/main...mattiabertorello:feature/support_scala_2_13?expand=1
A summary of what I have done is:
• Use ScalaConversionUtils everywhere to hide the conversions that change between Scala 2.12 and 2.13, and add some methods to avoid the use of inner classes like Seq$ and Map$ outside that class.
• Upgrade the spark3 project to Spark 3.2 by temporarily copying spark32. On this part I would like your suggestion on how to handle it.
• Upgrade the shared project's Spark version from 2.4.8 to 3.2.2, same as spark3 and spark32.
• Upgrade net.snowflake:spark-snowflake from 2.9.3 to 2.11.0 to support Scala 2.13.
• Change from com.databricks:dbutils-api to com.databricks:databricks-dbutils-scala to support Scala 2.13.
• Upgrade com.google.cloud.spark:spark-bigquery-with-dependencies from 0.26.0 to 0.29.0 because the new version supports Scala 2.13.
• In the Gradle files, support an easy change between Scala versions with a property file.
• In the Gradle app project and in every project, support a dynamic Scala version through a variable.
Now, I need some help understanding how to move on to make the contribution and know if this error exists before.
Scala module 2.12.3 requires Jackson Databind version >= 2.12.0 and < 2.13.0
Is this normal or not? It happens when I run the tests, and also when I run gradle build on main.
Now I need to understand what the next steps are:
• Which tests do I still need to run to say that Scala 2.13 is working? If I can run the CI, it would be helpful.
• How can I start opening PRs to avoid merging everything in a single PR? What about starting with a PR to use ScalaConversionUtils everywhere?
• Then, the difficult part: continue the work on Damien's side to make Gradle generate multiple jars, one for each Scala version. In general I still need clarification on how to do it. I'm happy to hear some suggestions 🙂
Thanks!
*Thread Reply:* Hi @Mattia Bertorello,
Great news. I've clicked "draft PR" #2329 just to see if ./gradlew clean build
works fine for different Spark versions. We can close this PR later. Some of the integration tests require extra credentials (like access to big query), so they need to be triggered by us. Just to mention, so that you don't worry once they fail. CI starts automatically on new commit into the PR.
I really like your approach and I think we can already go with the separate PRs and steps: (1) use ScalaConversionUtils, (2) make the Scala version parameterized and stored in gradle.properties, although still equal to 2.12.
Scala module 2.12.3 requires Jackson Databind version >= 2.12.0 and < 2.13.0 -> not sure of this. we do use jackson and databind in openlineage-java and shadow them within the package. However, we're using 2.15.3 which is quite recent. That's the recent build on main in CI -> https://app.circleci.com/pipelines/github/OpenLineage/OpenLineage/8702/workflows/f1aed08c-8333-4460-a08b-1ad746cb046b can you find the same log there?
Can we keep building shared conditionally and use Spark 2.4.8 for Scala 2.12 while 3.2 for Scala 2.13? That's an extra complexity which, on the other hand, assures the content of the submodule can really be shared among different Spark versions. Although a single jar is published at the end, this makes sure our code compiles with different versions. I think the same, as with shared, can be done with spark3.
The scope of this work is quite big, so I would be happy to work iteratively on the PRs and merge them even though still working on final solution.
*Thread Reply:* > Upgrade the shared project's Spark version from 2.4.8 to 3.2.2, same as spark3 and spark32
This effectively means moving the minimum supported Spark version to 3.2.2, right?
*Thread Reply:* I still need to check if the 2.4.8 version still works, since I didn't touch it. But I ran some tests to avoid changing the version in the shared project, and I got this error:
```io.openlineage.spark.agent.lifecycle.LibraryTest

Test testRdd(Path, SparkSession) FAILED (2.3s)
   java.lang.NoSuchMethodError: org.apache.spark.rdd.RDD.dependencies()Lscala/collection/Seq;
       at io.openlineage.spark.agent.lifecycle.Rdds.flattenRDDs(Rdds.java:34)```
This only happens when I run the tests with Scala 2.13, after updating all the spark3 and 3x projects to 3.2+; otherwise the same code runs with no issues with Scala 2.12.
I still need to understand why this is happening. I know that it's not great ditching the support for the 2.4 version.
*Thread Reply:* Hi team!
Could you have a look to these PRs? https://github.com/OpenLineage/OpenLineage/pull/2357/ https://github.com/OpenLineage/OpenLineage/pull/2359 https://github.com/OpenLineage/OpenLineage/pull/2360
They are part of the effort to support Scala 2.13; for the moment, none of them changes any logic, just some code refactoring and a library update. We're still working on the Gradle side, which has some challenges.
Thanks!
@Vladimir Fedotov has joined the channel
@Teemu Mattelmäki has joined the channel
Hello! I tried to find if OL Spark column level lineage is dependent on Databricks implementation. According to this link it feels that the answer is YES. https://openlineage.io/docs/integrations/spark/quickstart_databricks Could you confirm that dependency?
*Thread Reply:* What do you mean by dependent on Databricks implementation
?
Databricks provides their own, closed source versions of some LogicalPlan
for which visitors generally need to be implemented by reflection or by checking that we have the necessary classes, like here - but that does not mean integration for OSS Spark depends on some Databricks dependency
*Thread Reply:* Hello @Maciej Obuchowski! I have a customer that has on-prem. Hadoop environment with Spark & Hive. We want to establish column level lineage on that environment. We would need to install OpenLineage (OL) Spark Agent. Does OpenLineage support this type of environment or does it only support Spark on Databricks?
*Thread Reply:* > Does OpenLineage support this type of environment?
Yes, you can try it in a local notebook for example 🙂
*Thread Reply:* https://openlineage.io/docs/integrations/spark/quickstart_local
*Thread Reply:* Thanks! We'll try this option.
@Maciej Obuchowski @Paweł Leszczyński - Happy New Year. Just a note, we stumbled into a bit of a roadblock with testing of the changes for Scala 2.13. Notably: Bitnami only publishes Spark images for Scala 2.12. This means the CI/CD pipeline will fail under Scala 2.13 conditions.
For example:
```SparkContainerIntegrationTest > testCreateTable() STANDARD_ERROR
24/01/10 09:54:01 ERROR Utils: uncaught error in thread spark-listener-group-shared, stopping SparkContext
java.lang.NoSuchMethodError: 'scala.collection.immutable.Seq org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.map(scala.Function1)'
at io.openlineage.spark.agent.lifecycle.OpenLineageRunEventBuilder.lambda$buildInputDatasets$6(OpenLineageRunEventBuilder.java:342)
at java.base/java.util.Optional.map(Optional.java:260)
at io.openlineage.spark.agent.lifecycle.OpenLineageRunEventBuilder.buildInputDatasets(OpenLineageRunEventBuilder.java:340)
at io.openlineage.spark.agent.lifecycle.OpenLineageRunEventBuilder.populateRun(OpenLineageRunEventBuilder.java:296)
at io.openlineage.spark.agent.lifecycle.OpenLineageRunEventBuilder.buildRun(OpenLineageRunEventBuilder.java:280)
at io.openlineage.spark.agent.lifecycle.OpenLineageRunEventBuilder.buildRun(OpenLineageRunEventBuilder.java:229)
at io.openlineage.spark.agent.lifecycle.SparkSQLExecutionContext.start(SparkSQLExecutionContext.java:87)
at io.openlineage.spark.agent.OpenLineageSparkListener.lambda$sparkSQLExecStart$0(OpenLineageSparkListener.java:91)
at java.base/java.util.Optional.ifPresent(Optional.java:178)
at io.openlineage.spark.agent.OpenLineageSparkListener.sparkSQLExecStart(OpenLineageSparkListener.java:91)
at io.openlineage.spark.agent.OpenLineageSparkListener.onOtherEvent(OpenLineageSparkListener.java:82)
24/01/10 09:54:01 INFO SparkContext: SparkContext is stopping with exitCode 0.
```
*Thread Reply:* This is because of a breaking change between Scala 2.12 and Scala 2.13 in the Scala Collections API.
*Thread Reply:* In Scala 2.12, Seq is an alias for scala.collection.Seq and in 2.13, Seq is an alias for scala.collection.immutable.Seq
*Thread Reply:* The question becomes, does OL (as an org), have a way to overcome this challenge? i.e., custom docker images?
*Thread Reply:* even official Spark images don't have 2.13 version 😞 https://hub.docker.com/_/spark
*Thread Reply:* as for Seq issue, maybe the solution would be to have something similar to the visitor factories we have: provide two factories, make integration choose at runtime which one to use, and initialize Seq there?
*Thread Reply:* It seems we need to build a Docker image for Spark Scala 2.13
*Thread Reply:* there are even some examples but they mostly focus on pure Spark, not python one https://github.com/xebia-functional/spark-scala3-example/blob/main/docker/Dockerfile
*Thread Reply:* I think people who run PySpark generally don't care about scala version 🤔
*Thread Reply:* The problem (for us) is that Scala 2.12's Jackson module has a security vulnerability in it, that was flagged. So our Data Platform team(s) had to mitigate. The mitigation strategy involved migrating to Scala 2.13.
*Thread Reply:* looking at https://github.com/bitnami/containers/blob/main/bitnami/spark/3.5/debian-11/Dockerfile there does not seem to be very simple way to just switch it to 2.13
*Thread Reply:* Nope. You have to download the Spark binaries compiled with 2.13, and build an entirely new docker image
*Thread Reply:* or at least, download the binaries and overwrite everything at runtime
*Thread Reply:* tbh I can't even find where they specify Scala 2.12
*Thread Reply:* They don't. It's the default version for Spark 3.x+
*Thread Reply:* You have to explicitly select Scala 2.13 version from Spark's website
*Thread Reply:* maybe doing it from scratch is only option? but I don't know if it's that easy; I feel if it was easy to build and maintain there would be existing image
*Thread Reply:* This may be silly, but... what if we just turned off docker tests for scala 2.13? There are a lot of non-docker integration tests and I don't feel we can fix the whole world.
*Thread Reply:* It will be very easy to break it later then, even if we test it manually at the start 🙂
*Thread Reply:* What about using the custom docker image suggested here only for scala 2.13 https://openlineage.slack.com/archives/C06A9EAUWUQ/p1704886984670229?thread_ts=1704881808.866719&cid=C06A9EAUWUQ and building it at run time? Not super efficient, but better than nothing 🙂
new ImageFromDockerfile()
    .withFileFromPath("Dockerfile", Paths.get("./spark/docker/scala213/Dockerfile"))
    .withBuildArg("SPARK_VERSION", System.getProperty("spark.version"))
*Thread Reply:* Yeah might be good for now. I think we should see how long it takes
*Thread Reply:* For the moment we run it in local to test the changes. I will open a PR when we have all the stuff running.
@Maciej Obuchowski - starting a new thread here:
Regarding the factories comment:
> as for Seq issue, maybe the solution would be to have something similar to the visitor factories we have: provide two factories, make integration choose at runtime which one to use, and initialize Seq there?
I'm unsure about your intended use of factories and their placement in the call stack. To illustrate the issue at hand, I developed a simple project. Here's an overview:
app project:
```public class Application {
    public static void main(String[] args) {
        Demonstration demonstration = new Demonstration();
        java.util.List<String> strings = demonstration.convert(Provider.getStrings());
    }
}

public class Demonstration {
    public java.util.List<String> convert(scala.collection.Seq<String> strings) {
        return scala.collection.JavaConverters.seqAsJavaList(strings);
    }

    public java.util.List<String> convert(scala.collection.immutable.Seq<String> strings) {
        return scala.collection.JavaConverters.seqAsJavaList(strings);
    }
}```
**thing_2.12 & thing_2.13 projects**
```object Provider {
  def getStrings(): Seq[String] = Seq("a", "b", "c")
}```
Explanation:
• Compilation Against Scala 2.12 (thing_2.12): When compiling the 'app' project against Scala 2.12, the Java compiler recognizes that it should use the method java.util.List<String> convert(scala.collection.Seq<String> strings).
• Compilation Against Scala 2.13 (thing_2.13): Conversely, when compiling against Scala 2.13, the compiler selects java.util.List<String> convert(scala.collection.immutable.Seq<String> strings) for dispatch.
Runtime Behavior:
• Matching Versions: Running the 'app' code compiled against thing_2.12 with the thing_2.12 jar in the classpath functions correctly. The same is true when pairing app compiled against thing_2.13 with the thing_2.13 jar.
• Mismatched Versions: Issues arise when there's a mismatch in versions. For instance, using the app compiled against thing_2.12 but providing the thing_2.13 jar in the classpath leads to problems, and vice versa. This occurs because the JVM expects the method matching the compilation version, which it fails to find.
Expanding on the Mismatched Versions topic:
Compile-Time Binding in Java:
• In Java, method overloading resolution is a compile-time decision. The compiler determines which method to call based on the method signatures and the types of the arguments as they are known at compile time.
• In our case, the convert method is overloaded: one version takes a scala.collection.Seq<String>, and the other takes a scala.collection.immutable.Seq<String>.
• When you compile against Scala 2.12, scala.collection.Seq is the prevalent sequence type. The Java compiler binds calls to convert to the method accepting scala.collection.Seq.
• Conversely, compiling against Scala 2.13, where scala.collection.immutable.Seq is the base type, leads the Java compiler to bind to the method accepting scala.collection.immutable.Seq.
Runtime Behavior and Classpath:
• At runtime, the Java Virtual Machine (JVM) expects to find the same class types and method signatures that were present during compilation.
• If the runtime environment (classpaths and libraries) matches the compile-time environment, the application runs smoothly.
• However, if there's a mismatch – for instance, the application is compiled against Scala 2.12 but run with Scala 2.13 in the classpath – the JVM may not find the expected method signatures.
• This mismatch can lead to a NoSuchMethodError or similar runtime exceptions. The JVM looks for convert(scala.collection.Seq<String>) but only finds convert(scala.collection.immutable.Seq<String>), or vice versa.
*Thread Reply:* Thanks for the explanation!
*Thread Reply:* Yeah, I thought only initialization is an issue 🙂
*Thread Reply:* I can't think of better solution than reflection-based one right now
*Thread Reply:* @Damien Hawes Would something like this work?
We have interface:
public interface SeqConverter{
<T> scala.collection.Seq<T> convert(scala.collection.Seq<T> items);
<T> scala.collection.Seq<T> convert(scala.collection.immutable.Seq<T> items);
}
and implementations for 2.12 and 2.13 within separate modules compiled with respective Scala versions - just as we compile modules with respective Spark versions now, example:
```public class Seq212Converter implements SeqConverter {
  public <T> scala.collection.Seq<T> convert(scala.collection.Seq<T> items) {
    return items;
  }

  public <T> scala.collection.Seq<T> convert(scala.collection.immutable.Seq<T> items) {
    return items;
  }
}```
then naively call Seq methods:
```public static Object callWithSeq(Object clazz, String method, scala.collection.Seq<?> seq) throws Exception {
  String Scala212Converter =
      "io.openlineage.spark.agent.scala212.Scala212Converter";
  String Scala213Converter =
      "io.openlineage.spark.agent.scala213.Scala213Converter";
  if (scala.util.Properties$.MODULE$.versionNumberString().startsWith("2.12")) {
    SeqConverter converter = (SeqConverter) Class.forName(Scala212Converter).newInstance();
    return MethodUtils.invokeMethod(clazz, method, converter.convert(seq));
  } else {
    SeqConverter converter = (SeqConverter) Class.forName(Scala213Converter).newInstance();
    return MethodUtils.invokeMethod(clazz, method, converter.convert(seq));
  }
}```
The last part is definitely crude, as it allows you to just call a method with a singular Seq argument, but it's just for the sake of this discussion 🙂
*Thread Reply:* @Paweł Leszczyński when you get back take a look at this thread 🙂
*Thread Reply:* Not sure if what I wrote would work with something like this: https://github.com/OpenLineage/OpenLineage/blob/31f8ce588526e9c7c4bc7d849699cb7ce2[…]cle/plan/column/visitors/IcebergMergeIntoDependencyVisitor.java
*Thread Reply:* Seq<Seq<Seq<Expression>>> matched =
(Seq<Seq<Seq<Expression>>>) mergeRows.getMethod("matchedOutputs").invoke(node);
Seq<Seq<Expression>> notMatched =
(Seq<Seq<Expression>>) mergeRows.getMethod("notMatchedOutputs").invoke(node);
🤔
*Thread Reply:* It could work. However, something to keep in mind: we have to eliminate all references to scala.collection.Seq
in import statements and fully-qualified types. Basically it becomes a case of, "Immediately convert sequence to java.util.List".
*Thread Reply:* Otherwise we will face a combination of runtime and compile time errors.
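To make that "convert immediately at the boundary" rule concrete, a tiny illustrative sketch (the class and method names here are made up; in the real codebase this would presumably go through ScalaConversionUtils):
```
import java.util.ArrayList;
import java.util.List;
import org.apache.spark.sql.catalyst.expressions.Attribute;
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;

class BoundaryExample {
  // Convert to java.util.List at the point where the Scala value enters our code,
  // so scala.collection.Seq never appears in our own imports or method signatures.
  static List<Attribute> outputAttributes(LogicalPlan plan) {
    List<Attribute> attributes = new ArrayList<>();
    scala.collection.Iterator<Attribute> it = plan.output().iterator();
    while (it.hasNext()) {
      attributes.add(it.next());
    }
    return attributes;
  }
}
```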
*Thread Reply:* That code you linked, is one such place where we will need to change it - somehow.
*Thread Reply:* That, or we need to port that code to Scala.
*Thread Reply:* So that we don't need to constantly do a dance between Scala and Java collections.
*Thread Reply:* We might go with writing common module in Scala though. It would be really good if we could do that incrementally. Even better if we could have single module for both 2.12 and 2.13
*Thread Reply:* Would using this https://github.com/scala/scala-collection-compat help?
*Thread Reply:* As far as I could tell, no. That adds methods to the APIs
@Maciej Obuchowski @Paweł Leszczyński - spark3
what is its purpose? Is it meant to be common code for all Apache Spark 3.x.y variants?
*Thread Reply:* I'm asking because I noticed this situation: spark3 is compiled using Spark 3.1.x, but something like JdbcHandler doesn't exist in Spark 3.0.x
*Thread Reply:* if the classes using JdbcHandler
aren't loaded, this shouldn't break the integration. However, we don't test our package with 3.0.x
@Maciej Obuchowski @Paweł Leszczyński - right, the first prep PR for supporting Scala 2.13 is up. I'm breaking my changes up into smaller PRs to make them more consumable.
You'll find it here:
https://github.com/OpenLineage/OpenLineage/pull/2376
*Thread Reply:* Ah, right. I can't run the integration tests.
*Thread Reply:* My heart sank a bit, when i saw the integration tests fail
*Thread Reply:* I was like, "How?! I didn't touch the spark code!"
*Thread Reply:* We have a script to push external contributions to some origin branch, GH CI does apparently look at commit IDs instead of associating CI runs with branches so after this finishes: https://app.circleci.com/pipelines/github/OpenLineage/OpenLineage/8924/workflows/efdcc509-cc79-4158-a925-50a59733d446 it will be shown as CI results for your branch
*Thread Reply:* Anyway, we have different solution to this problem soon: https://github.com/OpenLineage/OpenLineage/pull/2374
*Thread Reply:* I see the tests went through
*Thread Reply:* It looks good to me and I approved PR. I would leave it unmerged for a moment to let Maciej review it if he likes to. We can merge it later this day.
BTW that's a really great improvement to the codebase @Damien Hawes. Thanks for preparing this.
*Thread Reply:* I've :gh_merged: it
@Arnab Bhattacharyya has joined the channel
@Maciej Obuchowski @Paweł Leszczyński - right the next PR is up for review: https://github.com/OpenLineage/OpenLineage/pull/2377
It builds off the previous one, so we see a lot more changes than what is strictly necessary. Once the previous one is merged, I'll rebase off of main.
*Thread Reply:* @Maciej Obuchowski - I see that the CI is failing because of environment variables not being set. Is there any way to retry the jobs to see if this is transient?
*Thread Reply:* ```ERROR: Missing environment variable {i}
Exited with code exit status 1```
*Thread Reply:* @Jakub Dardziński Will your PR 2374 fix this as well? https://app.circleci.com/pipelines/github/OpenLineage/OpenLineage/8936/workflows/d164feb2-219e-4f2a-8d8c-066d54e37efc/jobs/164588
*Thread Reply:* https://app.circleci.com/pipelines/github/OpenLineage/OpenLineage/8937/workflows/1b945af1-8353-4de7-8509-e0d1d714637a running here for now
*Thread Reply:* Seems like it went through
@Paweł Leszczyński - thanks for re-running the CI/CD
@Paweł Leszczyński @Maciej Obuchowski - this one is ready: https://github.com/OpenLineage/OpenLineage/pull/2384
*Thread Reply:* @Paweł Leszczyński - responded to your comments.
Oh, I see the integration tests are failing with that environment variable thing
Done- the above PR is ready for review
Any idea why the CI/CD pipelines keep failing with that "No environment variable" issue?
*Thread Reply:* hmmm... these come from the integration-tests context, which is available to all members of the OpenLineage org. So perhaps you need to be a member for your build to have access to that context?
@Jakub Dardziński might know more here
*Thread Reply:* I know we didn't merge your PR yet @Jakub Dardziński
*Thread Reply:* well, so for that one I have this PR: https://github.com/OpenLineage/OpenLineage/pull/2374
it is indeed an issue that should be resolved with that
*Thread Reply:* ok, great. let's test this out after this is merged then 🙂
*Thread Reply:* I was about to merge it today, forgot to do that after rebasing :face_palm:
@Jakub Dardziński has joined the channel
@Damien Hawes https://github.com/OpenLineage/OpenLineage/pull/2384#discussion_r1462874134 -> please let me know what you think about that?
*Thread Reply:* The problem exists in the MatchesMapRecursively class. It happens when k == "schema". The code expects the type of target.get(k) to be a subtype of java.util.Map, however it is actually a subtype of java.util.ArrayList.
*Thread Reply:* public static Predicate<Map<String, Object>> predicate(
Map<String, Object> target, Set<String> omittedKeys) {
return (map) -> {
if (!map.keySet().containsAll(target.keySet())) {
log.error("Object keys {} does not match target keys {}", map.keySet(), target.keySet());
return false;
}
for (String k : target.keySet()) {
if (omittedKeys.contains(k)) {
continue;
}
Object val = map.get(k);
boolean eq;
if (val instanceof Map) {
eq =
MatchesMapRecursively.predicate((Map<String, Object>) target.get(k), omittedKeys)
.test((Map<String, Object>) val);
} else if (val instanceof List) {
eq =
MatchesMapRecursively.predicate((List<Object>) target.get(k), omittedKeys)
.test((List<Object>) val);
} else if (PRODUCER_WITH_UNDERSCORE.equals(k) || PRODUCER.equals(k)) {
eq = Versions.OPEN_LINEAGE_PRODUCER_URI.toString().equals(val);
} else if (val == null) {
eq = true;
} else {
eq = val.equals(target.get(k));
}
if (!eq) {
log.error(
"For key {} - passed object {} does not match target object {}",
k,
map.get(k),
target.get(k));
return false;
}
}
return true;
};
}
*Thread Reply:* it looks like, after changing bigquery connector version, some of the properties of BigQueryRelation changed their type. However, the test relies on serialized json https://github.com/OpenLineage/OpenLineage/blob/1.8.0/integration/spark/shared/src/test/resources/test_data/serde/bigqueryrelation-node.json
*Thread Reply:* So schema was always a list?
*Thread Reply:* and this json may contain somewhere a list which is expected to be a map within new library
*Thread Reply:* it failed on the key "schema"
*Thread Reply:* Though, I just fixed the test.
*Thread Reply:* Object val = map.get(k);
Object targetVal = target.get(k);
boolean eq;
if (val instanceof Map) {
if (targetVal instanceof Map) {
Predicate<Map<String, Object>> predicate = predicate((Map<String, Object>) targetVal, omittedKeys);
return predicate.test((Map<String, Object>) val);
} else if (targetVal instanceof List) {
Predicate<List<Object>> predicate = predicate((List<Object>) targetVal, omittedKeys);
return predicate.test((List<Object>) val);
} else {
eq = false;
}
}
*Thread Reply:* OK. Here's a mess-with-your-head moment.
*Thread Reply:* If you collapse it, into eq = predicate(...).test(...);
*Thread Reply:* I see I have changed the behaviour of the test
*Thread Reply:* in case you find it difficult or time consuming, you can remove this test i think. i don't consider this test in that form important
*Thread Reply:* It's just a bit tough, because I don't know what the bigquery stuff is supposed to output.
*Thread Reply:* So I don't know what the expected behaviour is.
*Thread Reply:* i would try removing schema from json file and if it does not help we can remove the test
*Thread Reply:* I tried a different trick
*Thread Reply:* The actual results have a lot more properties than the expected
*Thread Reply:* @Paweł Leszczyński - I think a better approach would be:
*Thread Reply:* > It's just a bit tough, because I don't know what the bigquery stuff is supposed to output. Last time I've bumped connector version I just serialized the bigquery node and partially replaced what was there previously... I think the test is bad conceptually because we depend on some internal structure of outside connector that we don't really care about
*Thread Reply:* I think we should remove it, and if we want a test to keep serialization stable we could try to serialize some fake node that we ourselves provide
*Thread Reply:* Yeah, that's what I found I was doing with my flattener
*Thread Reply:* but I think this is outside of the scope of this PR 🙂
*Thread Reply:* I was just adding an ever increasing amount of keys to omit
*Thread Reply:* and I just thought to myself, "If I am doing this, I'm doing something wrong"
*Thread Reply:* I'll remove the test then.
*Thread Reply:* Just to clarify, you're arguing that the whole test of the LogicalPlanSerializer is flawed?
*Thread Reply:* i.e., we're serialising spark internal structures that could change without warning
*Thread Reply:* I think tests like testSerializeSlicesExcessivePayload
are good - we provide our own implementation of LogicalPlan and we test the structure
*Thread Reply:* Even something like testSerializeInsertIntoHadoopPlan is less problematic, as the InsertIntoHadoopFsRelationCommand is way more stable
*Thread Reply:* but BQ connector isn't...
*Thread Reply:* Changes pushed to all branches
*Thread Reply:* 🎉 'shared' was merged, thanks @Paweł Leszczyński
*Thread Reply:* :medal: awesome
*Thread Reply:* could u rebase other one on the list?
*Thread Reply:* 2385 has some conflict
*Thread Reply:* It's odd that there are conflicts
*Thread Reply:* Because I had telescoping branches
*Thread Reply:* I created spark3 from shared
*Thread Reply:* spark31 from spark3, etc
*Thread Reply:* what if you rebased spark35 '2390' ?
*Thread Reply:* perhaps it would be doable to merge it in one-go
*Thread Reply:* I'm seeing a failure on spark32
*Thread Reply:* Something to do with Jackson
*Thread Reply:* java.lang.ExceptionInInitializerError
at io.openlineage.spark.agent.lifecycle.LibraryTest.testRdd(LibraryTest.java:112)
Caused by: com.fasterxml.jackson.databind.JsonMappingException: Scala module 2.13.2 requires Jackson Databind version >= 2.13.0 and < 2.14.0 - Found jackson-databind version 2.15.3
at io.openlineage.spark.agent.lifecycle.LibraryTest.testRdd(LibraryTest.java:112)
*Thread Reply:* ok, i approved 2385 and triggered integration tests on that
*Thread Reply:* jackson things are always a problem
*Thread Reply:* we do have jackson packaged and shaded within openlineage-java
*Thread Reply:* Yeah - I'm trying to bisect where exactly things went wrong
*Thread Reply:* i.e., what changes did I make in spark32, that cause these jackson errors
*Thread Reply:* And I see 'spark31' is failing during integrationTests when run under spark 3.3.3
*Thread Reply:* sometimes rerun from failed simply helps
*Thread Reply:* integration tests depend on external resources like s3 buckets / big query tables and if you run multiple instances they can fail
*Thread Reply:* i can rerun it but anyway we will have to rebase each PR one by one and run tests again
*Thread Reply:* Ah, I see the approval step now.
*Thread Reply:* > we do have jackson packaged and shaded within openlineage-java Hmm, when I dump the dependencies, I see raw Jackson dependencies. Not shaded ones.
*Thread Reply:* (The 2.13.2 migration is because I force Jackson to 2.13.2)
*Thread Reply:* sry, we relocate jackson within openlineage-spark https://github.com/OpenLineage/OpenLineage/blob/main/integration/spark/build.gradle#L153
*Thread Reply:* To expand on it more, there are two jacksons - one is version provided/required by particular Spark version, and other one used by our integration to serialize OpenLineage events. The second one needs to be shaded
*Thread Reply:* Hmm - it might be worth creating a module that has jackson shaded already
*Thread Reply:* because of the openlineage-java thing
*Thread Reply:* https://github.com/OpenLineage/OpenLineage/pull/2386 -> @Damien Hawes could u rebase it?
*Thread Reply:* Done
*Thread Reply:* i think there was some jackson related hack that is necessary for serializing the logical plan, bcz we need to serialize logical plans with the jackson of the spark
*Thread Reply:* Yeah, the LogicalPlanSerializer
*Thread Reply:* does some reflection things
*Thread Reply:* if we want to serialize plan node in a given scala version, we need to run it with jackson of the same scala version
*Thread Reply:* but jackson attached to spark is missing jackson-databind (if i remember it right)
@Jakub Dardziński - ~how does the CI/CD pipeline now work? I see that the "run steps" aren't present when I push. Is this intended?~ Nevermind, it seems it just took a while before Circle CI spun them up and GitHub showed them as running.
*Thread Reply:* the change is that now integration tests from forked repos require approval
I don't understand. I ran the failing integration test of spark31
locally, it failed. I ran it again, it passed.
*Thread Reply:* it's possible that spark has already cleared the QueryExecution from the executionIdToQueryExecution in the SQLExecution class if the job runs very fast
*Thread Reply:* (if the reason for the failure was that the START event was not sent)
*Thread Reply:* Any idea how I debug the integration tests via intellij?
*Thread Reply:* Each time I try and run the ColumnLineageIntegrationTest
it only runs the unit tests.
*Thread Reply:* I've told intellij to use the integrationTest task, but it's like, "nope"
*Thread Reply:* and just runs the unit tests any way
*Thread Reply:* Can you check if it's integrationTest in run configurations?
*Thread Reply:* This is what its running:
:app:integrationTest --tests "io.openlineage.spark.agent.ColumnLineageIntegrationTest" -Pspark.version=3.3.1
Send help.
OK - I found one:
all > io.openlineage.spark.agent > ColumnLineageIntegrationTest
columnLevelLineageSingleDestinationTest(
*Thread Reply:* If I run ./gradlew app:test -Pspark.version=3.3.1
*Thread Reply:* I get this failure:
Execution failed for task ':app:compileJava'.
> Could not resolve all files for configuration ':app:compileClasspath'.
> Could not resolve com.fasterxml.jackson:jackson-bom:2.13.4.1.
Required by:
project :app > org.apache.spark:spark-core_2.12:3.3.1 > com.fasterxml.jackson.core:jackson-databind:2.13.4.1
> Could not resolve com.fasterxml.jackson:jackson-bom:2.13.4.1.
> Could not parse POM <https://astronomer.jfrog.io/artifactory/maven-public-libs-snapshot/com/fasterxml/jackson/jackson-bom/2.13.4.1/jackson-bom-2.13.4.1.pom>
> Already seen doctype.
> Could not resolve com.fasterxml.jackson:jackson-bom:2.13.4.1.
Required by:
project :app > org.apache.spark:spark-core_2.12:3.3.1 > org.apache.avro:avro:1.11.0 > com.fasterxml.jackson.core:jackson-core:2.13.4
project :app > org.apache.spark:spark-core_2.12:3.3.1 > org.apache.spark:spark-kvstore_2.12:3.3.1 > com.fasterxml.jackson.core:jackson-annotations:2.13.4
> Could not resolve com.fasterxml.jackson:jackson-bom:2.13.4.1.
> Could not parse POM <https://astronomer.jfrog.io/artifactory/maven-public-libs-snapshot/com/fasterxml/jackson/jackson-bom/2.13.4.1/jackson-bom-2.13.4.1.pom>
> Already seen doctype.
*Thread Reply:* spark-core
requires 2.13.4.1
*Thread Reply:* Suggestion: bump the spark versions in the CI/CD configuration to the latest patch versions
*Thread Reply:* https://issues.apache.org/jira/browse/SPARK-40886 https://spark.apache.org/releases/spark-release-3-3-2.html
something was changed in 3.3.2
*Thread Reply:* why do we have 3.3.1 ?
*Thread Reply:* I had a look at the circle ci
*Thread Reply:* but I could have sworn I saw 3.3.1 in one of the tests
*Thread Reply:* 3.3.1
is in spark33
*Thread Reply:* and the module has some tests
*Thread Reply:* I'm going to push to spark35
*Thread Reply:* with changes to circle ci
*Thread Reply:* and also updating the gradle.properties for all branches that came before it
*Thread Reply:* along with removing the jackson override in app/build.gradle
*Thread Reply:* and if that passes, we can merge spark35 in, and take everything else with. Does that sound OK, @Paweł Leszczyński?
*Thread Reply:* Seems bitnami doesn't produce a 2.4.8 image
*Thread Reply:* @Paweł Leszczyński - could you approve the CI/CD pipeline for the spark integration test tasks?
*Thread Reply:* https://github.com/OpenLineage/OpenLineage/pull/2390
*Thread Reply:* > GoogleCloudIntegrationTest
*Thread Reply:* it's @Maciej Obuchowski stuff :face_palm:
*Thread Reply:* let me work first on upgrading spark branches to latest patch versions
*Thread Reply:* will come back to you
@Damien Hawes are the PRs for spark31-34 valid anymore? can you rebase spark2?
*Thread Reply:* No - those PRs can be closed, because spark35 built on top of 34, which built on top of 33, etc.
*Thread Reply:* OK. PRs are closed, with a link to 2390
*Thread Reply:* @Paweł Leszczyński - spark2
has been rebased and pushed.
*Thread Reply:* > Task :app:compileTestJava FAILED
*Thread Reply:* i expected it to fail but it's kind of early
*Thread Reply:* > Task :spark2:compileTestJava FAILED
OptimizedCreateHiveTableAsSelectCommandVisitorTest.java:36: error: cannot find symbol
import org.apache.spark.sql.hive.execution.OptimizedCreateHiveTableAsSelectCommand;
^
symbol: class OptimizedCreateHiveTableAsSelectCommand
location: package org.apache.spark.sql.hive.execution
1 error
*Thread Reply:* perhaps it's in 2.4.8
*Thread Reply:* was this changed?
*Thread Reply:* I reverted back to 2.4.6
*Thread Reply:* To match the spark image
*Thread Reply:* It was always 2.4.6
*Thread Reply:* wasn't it compiled with 2.4.8?
*Thread Reply:* sparkVersion = '2.4.8'
it was
*Thread Reply:* But integration test sits at 2.4.6
*Thread Reply:* we compiled it with 2.4.8 but run integration tests based on 2.4.6
*Thread Reply:* Things going through with 2.4.8
*Thread Reply:* Apache Spark introducing new features in patch versions
*Thread Reply:* @Paweł Leszczyński - pushed the changes, along with changes to the CI process
*Thread Reply:* Gradle will now use plain logs
*Thread Reply:* Which means it should be easier to see things
*Thread Reply:* i.e., I found it difficult to figure out which test is failing
*Thread Reply:* Unless it was near the bottom
*Thread Reply:* OK - I need to step out for a bit
*Thread Reply:* I'll handle this issue when I am back
*Thread Reply:* app:test fails under spark 2.4.6
*Thread Reply:* OptimizedCreateHiveTableAsSelectCommand
was added in 2.4.7 or 2.4.8 AFAIK
*Thread Reply:* > we compiled it with 2.4.8 but run integration tests based on 2.4.6 The reason was lack of Docker image for 2.4.8, right?
*Thread Reply:* OK - I've managed to fix the issue with app:compileTestJava
*Thread Reply:* and the subsequent failures that came afterwards.
*Thread Reply:* @Paweł Leszczyński - if you're able to, can you approve the integration tests for: https://github.com/OpenLineage/OpenLineage/pull/2391
*Thread Reply:* there is still one test for 2.4.8 spark_kafka
that is not passing
Caused by: java.lang.NoSuchMethodError: org.apache.spark.internal.Logging.$init$(Lorg/apache/spark/internal/Logging;)V
*Thread Reply:* with @Mattia Bertorello’s help, we discovered the source of the problem
*Thread Reply:* It's a Scala binary problem.
*Thread Reply:* Bitnami's image is Scala 2.11 for 2.4.6
*Thread Reply:* We are pushing org.apache.spark:spark-sql-kafka-0-10_2.12:2.4.6 to the image
*Thread Reply:* can we use Scala 2.11 for 2.4? I think most people use 2.11 for it
*Thread Reply:* and I would really try to not push anything new there 🙂 if you use 2.4 your priority should be to upgrade
*Thread Reply:* Yeah - I reverted back to 2.11
*Thread Reply:* OK. Pushed the changes to the branch
*Thread Reply:* OK - https://github.com/OpenLineage/OpenLineage/pull/2391
Is waiting at the integration test phase
*Thread Reply:* Btw the airflow integration test is waiting for approval.
@Damien Hawes is this the last required PR? We're thinking about releasing soon to get this feature out fast
*Thread Reply:* There are a few more things to do. The 'app' module for instance needs to be migrated and that's a tough one, because it has integration tests that need to be 'ported'. For instance, Scala 2.13 variants of the integration tests.
*Thread Reply:* I don't feel comfortable saying, "Hey, let's release 2.12 and 2.13" without those checks and balances being in place.
@Paweł Leszczyński @Maciej Obuchowski - is it possible for OpenLineage to use the GitHub container registry to host custom docker images, i.e., the Scala 2.13 variants of Apache Spark?
*Thread Reply:* I think the limits are very low for free accounts? https://docs.github.com/en/get-started/learning-about-github/githubs-plans#github-free-for-organizations
*Thread Reply:* I've used quay.io for free hosting of docker images
*Thread Reply:* > • 500 MB GitHub Packages storage Wow.
1 spark image will blow that storage limit out of the water.
*Thread Reply:* Trust me, I know. I've built several Spark images before. They come in at like 1 GB - 1.5 GB per image.
*Thread Reply:* they give 50gb in github enterprise. this would not make any sense...
*Thread Reply:* https://github.com/features/packages#pricing
*Thread Reply:* Aye - but what's the limit on the size of the packages? 🙂
*Thread Reply:* we might be able to use Google's container registry too https://cloud.google.com/artifact-registry
*Thread Reply:* although I'm not sure our account has the permissions
@Paweł Leszczyński @Maciej Obuchowski - what's the possibility of temporarily dropping support for Spark 2.4.x? Reasoning:
We run integration tests against 2.4.x, 3.2.x, 3.3.x, 3.4.x, and 3.5.x. Of them 2.4 is the only one that doesn't support Scala 2.13.
It really complicates the build process of 'app'.
Furthermore, 'shared' is compiled using Spark 3.2.4, and it is the first version to support Scala 2.13.
*Thread Reply:* I don't think it's possible for quite a bit of time still. There are people on, like EMR 5.X versions which uses 2.4.8 and is still supported, that are using OL
*Thread Reply:* I wish we could tho 🙂
Hi team, I opened a PR that starts gradually moving the code of the so-called vendors - like Snowflake, BigQuery, Iceberg, Delta, Databricks - into a separate project, using the service loader to load the objects based on the runtime availability of the vendor itself.
I started with the easy one, Snowflake, and I already have a branch for Iceberg, which is the one that brings some issues during the Scala 2.13 support work.
Now I found out that a similar effort to separate this code is in progress in another PR, #2272. @Maciej Obuchowski @Paweł Leszczyński Do you know the ETA for PR #2272 being released, and whether it has exactly the same scope as mine? In general my idea was to not change the code too much, to avoid too big a PR, and to move incrementally.
*Thread Reply:* Also, I think there is some conflict with @Damien Hawes's work there.
*Thread Reply:* I haven't fully reviewed your work, and I think your solution is really nice, but I think there are some significant differences between your approach @Mattia Bertorello and @Paweł Leszczyński’s one.
First, Paweł's interface is segregated from the rest of the OpenLineage-Spark integration, and OpenLineage-Spark depends on it. Second, we're trying to move the responsibility of maintaining lineage for external DataSources to those implementations - and we're getting good responses from, for example, the BigQuery connector maintainer: https://github.com/OpenLineage/OpenLineage/issues/2349
I guess on the other hand, Paweł's approach does not really solve your issue, which is Scala compatibility and ease of compilation.
So I think we want to have both Paweł's approach as a public API, and your solution as a stop-gap to support Scala 2.13 as well? I guess to make it more precise - I would prefer to have Paweł's solution as the public API, and yours as an internal, not advertised one - I would prefer vendors to implement support in their own code, rather than provide us with an implementation that would make us responsible for maintaining it.
What do you think?
*Thread Reply:* > I would prefer vendors to implement support in their own code, rather than provide us with an implementation that would make us responsible for maintaining it.
It makes sense to move the responsibility to them, and the work in Paweł's PR is great for reaching that goal.
Mine was more for the short term, and yes, it can be considered a first step towards that goal. In this way, we will already have all the code from the vendors separated into projects, and the only work left is to adapt to the new interface without changing the core.
btw could you trigger the integration tests? Thanks! https://app.circleci.com/pipelines/github/OpenLineage/OpenLineage/9118/workflows/cfb67fa1-db0e-4043-a73e-14de40968534
*Thread Reply:* Added a few comments to PR 🙂
*Thread Reply:* Let me work on them in these days 🙂
@Paweł Leszczyński @Maciej Obuchowski - just curious, how certain are we that Spark 2 classes (CreateTableLikeCommand
) are never created in Spark 3.2.x - Spark 3.5.x applications?
*Thread Reply:* there's class io.openlineage.spark3.agent.lifecycle.plan.CreateTableLikeCommandVisitor
in spark3
*Thread Reply:* Hmmm - so if the listener gets a CreateTableLikeCommand, how does it know which CreateTableLikeCommandVisitor to delegate to?
@Paweł Leszczyński - with the PySpark integration tests, does the presence of "RUNNING" events cause the assertions to fail or not?
I'm encountering a failure in the Kafka read write test, and I see additional running events, which makes me wonder what is being asserted for, except for the certain properties.
*Thread Reply:* in most cases, we don't emit nor assert running events
*Thread Reply:* it makes more sense to check running
for streaming jobs
*Thread Reply:* what's the test you have problem with?
*Thread Reply:* SparkContainerIntegrationTest#testPysparkKafkaReadWrite
*Thread Reply:* I removed the facets, to make the JSONs smaller
*Thread Reply:* and I see that the run events are missing their input and output datasets.
*Thread Reply:* Finally. I see the exception
*Thread Reply:* @Paweł Leszczyński have a look:
https://gist.github.com/d-m-h/95a8005063478c4844abd78f4eaf4e47
*Thread Reply:* why doesn't this go into the if (KafkaRelationVisitor.isKafkaSource(command.dataSource())) condition?
*Thread Reply:* I'm not sure. I tested with the original bitnami spark 3.2.4 scala 2.12
*Thread Reply:* and I get the same error
*Thread Reply:* I see that KafkaRelationVisitor is in the uber jar
*Thread Reply:* jar tvf openlineage-spark-app_2.13-1.9.0-SNAPSHOT.jar | grep Kafka
11382 Mon Feb 05 11:44:56 CET 2024 io/openlineage/spark/agent/lifecycle/plan/KafkaRelationVisitor.class
1739 Mon Jan 22 16:48:56 CET 2024 io/openlineage/client/transports/KafkaConfig.class
1626 Mon Jan 22 16:48:56 CET 2024 io/openlineage/client/transports/KafkaTransportBuilder.class
3594 Mon Jan 22 16:48:56 CET 2024 io/openlineage/client/transports/KafkaTransport.class
*Thread Reply:* Found the reason:
public static boolean hasKafkaClasses() {
log.debug("Checking for Kafka classes");
try {
KafkaRelationVisitor.class
.getClassLoader()
.loadClass("org.apache.spark.sql.kafka010.KafkaSourceProvider");
log.debug("Kafka classes found");
return true;
} catch (Exception e) {
log.debug("Kafka classes not found");
return false;
}
}
2024-02-05 12:04:11 DEBUG io.openlineage.spark.agent.lifecycle.plan.KafkaRelationVisitor: Checking for Kafka classes
2024-02-05 12:04:11 DEBUG io.openlineage.spark.agent.lifecycle.plan.KafkaRelationVisitor: Kafka classes not found
*Thread Reply:* It seems when the Kafka jars are loaded via --packages
- we can't find the class.
*Thread Reply:* OK. I changed the check to this:
Thread.currentThread()
.getContextClassLoader()
.loadClass("org.apache.spark.sql.kafka010.KafkaSourceProvider");
*Thread Reply:* I see this:
2024-02-05 12:14:19 DEBUG io.openlineage.spark.agent.lifecycle.plan.KafkaRelationVisitor: Checking for Kafka classes
2024-02-05 12:14:19 DEBUG io.openlineage.spark.agent.lifecycle.plan.KafkaRelationVisitor: Kafka classes found
2024-02-05 12:14:19 DEBUG io.openlineage.spark.agent.lifecycle.plan.KafkaRelationVisitor: Checking for Kafka classes
2024-02-05 12:14:19 DEBUG io.openlineage.spark.agent.lifecycle.plan.KafkaRelationVisitor: Kafka classes found
*Thread Reply:* However:
java.lang.NoClassDefFoundError: org/apache/spark/sql/kafka010/KafkaSourceProvider
at io.openlineage.spark.agent.lifecycle.plan.KafkaRelationVisitor.isKafkaSource(KafkaRelationVisitor.java:72)
java.lang.NoClassDefFoundError: org/apache/spark/sql/kafka010/KafkaSourceProvider
at io.openlineage.spark.agent.lifecycle.plan.KafkaRelationVisitor.isKafkaSource(KafkaRelationVisitor.java:72)
at io.openlineage.spark.agent.lifecycle.plan.SaveIntoDataSourceCommandVisitor.apply(SaveIntoDataSourceCommandVisitor.java:97)
at io.openlineage.spark.agent.lifecycle.plan.SaveIntoDataSourceCommandVisitor.apply(SaveIntoDataSourceCommandVisitor.java:46)
*Thread Reply:* recalling that folks from Microsoft had a similar problem a long time ago here -> https://github.com/OpenLineage/OpenLineage/blob/bba64fbddbea3557fc2342a3d2fbd4194b[…]penlineage/spark/agent/lifecycle/plan/KustoRelationVisitor.java
*Thread Reply:* I was about to do just that
*Thread Reply:* Just for curiosity's sake:
public static boolean isKafkaSource(CreatableRelationProvider provider) {
log.debug("Checking if provider is KafkaSourceProvider");
if (!hasKafkaClasses()) {
return false;
}
log.debug("Checking if the provider is a KafkaSourceProvider. {}", provider.getClass().getCanonicalName());
boolean check = provider instanceof KafkaSourceProvider;
log.debug("Is the provider a KafkaSourceProvider? {}", check);
return check;
}
*Thread Reply:* 2024-02-05 12:19:33 DEBUG io.openlineage.spark.agent.lifecycle.plan.KafkaRelationVisitor: Checking if provider is KafkaSourceProvider
2024-02-05 12:19:33 DEBUG io.openlineage.spark.agent.lifecycle.plan.KafkaRelationVisitor: Checking for Kafka classes
2024-02-05 12:19:33 DEBUG io.openlineage.spark.agent.lifecycle.plan.KafkaRelationVisitor: Kafka classes found
2024-02-05 12:19:33 DEBUG io.openlineage.spark.agent.lifecycle.plan.KafkaRelationVisitor: Checking if the provider is a KafkaSourceProvider. org.apache.spark.sql.kafka010.KafkaSourceProvider
2024-02-05 12:19:33 INFO io.openlineage.spark.agent.util.PlanUtils: apply method failed with
java.lang.NoClassDefFoundError: org/apache/spark/sql/kafka010/KafkaSourceProvider
*Thread Reply:* The class says that it is that instance
*Thread Reply:* but the instance check says "no, I'm not dealing with you"
*Thread Reply:* Different classloaders?
*Thread Reply:* The main class loader loads the openlineage-spark jar
*Thread Reply:* and another class loader loads the kafka jars
*Thread Reply:* Thread.currentThread().getContextClassLoader().loadClass(<name>) does the trick
*Thread Reply:* maybe we can generalize the solution?
ClassloaderUtils.loadClass
that tries to load it from both classloaders?
*Thread Reply:* The idea is to not make next person debug the same issue with another connector 🙂
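A minimal sketch of what such a ClassloaderUtils.loadClass helper could look like (the class name is just the one suggested above, not an existing utility): try the agent's own classloader first and fall back to the thread context classloader, which is the one that can see jars added via --packages.
```
import java.util.Optional;

// Hypothetical helper based on the suggestion above: try both classloaders so
// callers don't have to know which one can see the connector's classes.
public final class ClassloaderUtils {
  private ClassloaderUtils() {}

  public static Optional<Class<?>> loadClass(String className) {
    try {
      return Optional.of(ClassloaderUtils.class.getClassLoader().loadClass(className));
    } catch (ClassNotFoundException e) {
      // fall through and try the thread context classloader instead
    }
    try {
      return Optional.of(Thread.currentThread().getContextClassLoader().loadClass(className));
    } catch (ClassNotFoundException e) {
      return Optional.empty();
    }
  }
}
```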
*Thread Reply:* Well - that was annoying.
*Thread Reply:* But, it seems like the kafka tests pass now
@Mattia Bertorello it's failing on spotless 🙂
*Thread Reply:* Yeah sorry, I applied some IntelliJ suggestions 😅
*Thread Reply:* Regarding the ServiceLoader problem, I implemented a static version with a classic class loader. My main concern is that we cannot ensure loose coupling between the vendor projects and the app/spark3 projects if we put the class names inside those projects, because then they need to be aware of the implementations.
Here's the code:
```
public interface Vendors {
  List<String> VENDORS =
      Arrays.asList("io.openlineage.spark.agent.vendor.snowflake.SnowflakeVendor");

  static Vendors getVendors() {
    ClassLoader cl = Thread.currentThread().getContextClassLoader();

    List<Vendor> vendors =
        VENDORS.stream()
            .map(
                vendorClassName -> {
                  try {
                    Class<?> vendor = cl.loadClass(vendorClassName);
                    return (Vendor) vendor.newInstance();
                  } catch (ClassNotFoundException
                      | InstantiationException
                      | IllegalAccessException e) {
                    return null;
                  }
                })
            .filter(Objects::nonNull)
            .filter(Vendor::isVendorAvailable)
            .collect(Collectors.toList());
```
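For context, a hedged sketch of the Vendor contract the snippet relies on (only isVendorAvailable() appears above; anything else would be an assumption):
```
// Sketch of the per-vendor contract implied by the filter above: a vendor is
// kept only when its connector classes are actually present at runtime.
public interface Vendor {
  boolean isVendorAvailable();
}
```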
*Thread Reply:* @Paweł Leszczyński Do you think this version is better?
I can commit it so we can unblock the PR and maybe merge it.
@Maciej Obuchowski do you think your comments are resolved, or is there something else to do?
*Thread Reply:* I think mine are resolved 🙂
*Thread Reply:* I lost track of the current status of the ServiceLoader discussion. I found a thumbs-up on GitHub under my comment and then some doubts about using class.forName. I'm confused about which is more recent and which direction we prefer to go. I am open to discussion. @Mattia Bertorello @Maciej Obuchowski
*Thread Reply:* No problem, having the ServiceLoader brings looser coupling between the app project and the others. But if you've already found users who struggle with creating an uber-jar that keeps/merges the META-INF files, it could be better to sacrifice the loose coupling.
*Thread Reply:* https://github.com/OpenLineage/OpenLineage/issues/1860 -> this is an example of an issue we have in our backlog. There are at least 3 different people involved, so I assumed it's not that uncommon.
*Thread Reply:* OK, makes sense. So I can commit the change above that doesn't use the service loader and, at the same time, doesn't need the compiled code to be aware of the implementations, since loading happens at runtime and only through the interface. I can add these issues in the comments to justify the solution.
*Thread Reply:* BTW I'm working on having JobTypeJobFacet in Spark and then in the DBT integration. Have you worked on this before?
*Thread Reply:* I worked on that for Flink
*Thread Reply:* Yeah I'm checking that work to replicate on the spark integration 🙂
*Thread Reply:* it's not super easy to determine whether a job is batch or streaming
*Thread Reply:* Do we also need it in Spark?
*Thread Reply:* it's possible to have a streaming job in Spark
*Thread Reply:* and processingType
property is required within the facet
*Thread Reply:* Don't streaming jobs in Spark have different logical plans?
*Thread Reply:* It seems there is an isStreaming() method in the LogicalPlan.
https://github.com/OpenLineage/OpenLineage/pull/2410
Let me know what you think 🙂
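A minimal sketch of that idea, assuming the facet's processingType is derived from the logical plan's streaming flag (the helper below is illustrative, not the PR's actual code):
```
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;

// LogicalPlan#isStreaming is true when the plan reads from a streaming source,
// which maps naturally onto the facet's BATCH / STREAMING processingType values.
class ProcessingTypeResolver {
  static String processingType(LogicalPlan plan) {
    return plan.isStreaming() ? "STREAMING" : "BATCH";
  }
}
```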
*Thread Reply:* is the extra param in buildRun
necessary?
*Thread Reply:* why wouldn't this match in JobBuilder?
*Thread Reply:* Not really, I can use the jobBuilder and then extract the facets from there.
I started with that implementation, but then it wasn't so clean, so I changed my mind
*Thread Reply:* But no problem implementing it that way. It's just not super great 😅 that I need to build the jobBuilder to extract the facets, because then the code uses the jobBuilder to overwrite the facets
*Thread Reply:* yeah, looks like the problem of nested builders
*Thread Reply:* thinking out loud - could we pass a jobFacetsBuilder as a param to avoid adding each facet as a param in the future?
*Thread Reply:* Done
*Thread Reply:* could you add a line into changelog?
*Thread Reply:* Sure, but tomorrow, I'm heading home 🙂
*Thread Reply:* sure, giving an approval with a comment to fill changelog
@Mattia Bertorello PR 2405 is failing on
VendorsTest > testGetVendors() FAILED
org.opentest4j.AssertionFailedError at VendorsTest.java:60
*Thread Reply:* Sorry, I forgot to fix the tests
*Thread Reply:* :gh_merged:
@Paweł Leszczyński @Maciej Obuchowski - when you run
./gradlew :app:test --tests "io.openlineage.spark.agent.lifecycle.SparkSQLExecutionContextTest" -Pspark.version=3.2.4
Do these tests have an InvocationTargetException
which is caused by a JsonMappingException
which is caused by a StackOverflowError
?
*Thread Reply:* I have this reference chain
*Thread Reply:* I got BUILD SUCCESSFUL in 55s
on latest main
*Thread Reply:* build successful on my side, did you try rebuilding and republishing to local openlineage-java
?
@Damien Hawes @Mattia Bertorello How far are you from the ultimate goal? Will you need to rewrite all the extensions as vendors, or Snowflake only? Are you going to release multiple artifacts at the end, per Scala 2.12 and 2.13? Feeling kind of lost about where we are at the moment.
*Thread Reply:* The idea is that there will be two artefacts.
*Thread Reply:* I'm working on the integration tests at the moment.
*Thread Reply:* I will continue working on the vendor extraction; the next one is Iceberg. And, not related to that, I need to add the job type to Airflow and DBT.
*Thread Reply:* what's the problem with iceberg? i thought it is published for both Scala 2.12 and 2.13
*Thread Reply:* It will make it easier to support multiple versions of the same vendor, in this case Iceberg, if there are breaking changes. It also makes it easier to support an old version without too much reflection, compared to when the code is mixed with the core Spark code.
@Paweł Leszczyński - https://github.com/OpenLineage/OpenLineage/pull/2413
*Thread Reply:* that's... not a small pr 🙂
*Thread Reply:* Thankfully, it's just "add new stuff"
*Thread Reply:* It could be split into 3
*Thread Reply:* 1. Modification of the build.gradle files and the common config plugin
*Thread Reply:* OK - splitting it up.
1st one:
Docker build plugin: https://github.com/OpenLineage/OpenLineage/pull/2414
(365 lines)
*Thread Reply:* 2nd one: https://github.com/OpenLineage/OpenLineage/pull/2415
@Maciej Obuchowski @Paweł Leszczyński
74 additions, 62 removals
@Mattia Bertorello @Damien Hawes we are thinking about the OL release in the next few days. Just wondering if we should wait for your PRs, or shall we release without waiting for them. Would love to hear your opinion on that. Keep in mind, a vote for an extra release can be initiated anytime you like, in case you don't want to wait the whole month.
*Thread Reply:* Thanks for letting us know. I would like to implement the JobType in Airflow; can you wait until Monday at the end of the day?
*Thread Reply:* @Maciej Obuchowski I did the PR in the Airflow repository, https://github.com/apache/airflow/pull/37255, and fixed the other one, https://github.com/OpenLineage/OpenLineage/pull/2412/
*Thread Reply:* Thx for info
*Thread Reply:* Do you think we can merge #2412 before the release? Is there something missing? @Maciej Obuchowski
@Damien Hawes I merged 2413 and approved https://github.com/OpenLineage/OpenLineage/pull/2415 could you rebase it and resolve the conflict?
*Thread Reply:* I can't find SparkIcebergIntegrationTest
being run in integration-test-integration-spark-scala-2_12-3.5.0 -> https://app.circleci.com/pipelines/github/OpenLineage/OpenLineage/9275/workflows/78f45083-84f5-4379-b475-96ccececbbe5/jobs/173596
*Thread Reply:* 3.5.0 never had iceberg though
*Thread Reply:* integration tests run in 3mins30secs which is like way faster than it used to be
*Thread Reply:* i cannot find it for 3.4.2 either
*Thread Reply:* but it's the same on recent main I think. This is main build from 6 days ago https://app.circleci.com/pipelines/github/OpenLineage/OpenLineage/9160/workflows/33a4d308-d0e6-4d75-a06b-7d8ef89bb1fe and SparkIcebergIntegrationTest
is present there
*Thread Reply:* and execution times are above 10 mins
*Thread Reply:* I'll check, might be the logic for determining it is faulty
*Thread Reply:* @Paweł Leszczyński - any ideas why the Delta and Iceberg tests will execute and pass on my local device, but in the CI they fail?
*Thread Reply:* https://app.circleci.com/pipelines/github/OpenLineage/OpenLineage/9292/workflows/30ba0bdc-a831-4431-b023-d35379137a61/jobs/174557/tests
I cannot replicate these failures locally
*Thread Reply:* in one of the previous PRs some Docker timeout got shortened I think.
*Thread Reply:* My guess is that the local environment is way faster
*Thread Reply:* Yeah - it's hard without seeing the STDOUT logs
*Thread Reply:* Is there a way to see them, besides scrolling through the crazy logs?
*Thread Reply:* like when you get the test report locally, I can see STDOUT and STDERR
*Thread Reply:* Otherwise, I have to go log diving 😂
*Thread Reply:* i think you can also ssh into circleCI and run single test
*Thread Reply:* i also search the logs for () FAILED
which brings you closer to the real error, but it's kind of log diving
*Thread Reply:* Then there is this one:
testExternalRDDWithS3Bucket(SparkSession)
java.lang.UnsupportedClassVersionError: com/sun/istack/Pool has been compiled by a more recent version of the Java Runtime (class file version 55.0), this version of the Java Runtime only recognizes class file versions up to 52.0
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:756)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:473)
at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
at com.sun.xml.bind.v2.runtime.JAXBContextImpl$JAXBContextBuilder.build(JAXBContextImpl.java:1126)
at com.sun.xml.bind.v2.ContextFactory.createContext(ContextFactory.java:135)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at javax.xml.bind.ContextFinder.newInstance(ContextFinder.java:247)
at javax.xml.bind.ContextFinder.newInstance(ContextFinder.java:234)
at javax.xml.bind.ContextFinder.find(ContextFinder.java:441)
at javax.xml.bind.JAXBContext.newInstance(JAXBContext.java:641)
at javax.xml.bind.JAXBContext.newInstance(JAXBContext.java:584)
at com.amazonaws.util.Base64.<clinit>(Base64.java:52)
at com.amazonaws.util.BinaryUtils.toBase64(BinaryUtils.java:59)
at com.amazonaws.services.s3.AmazonS3Client.populateRequestHeaderWithMd5(AmazonS3Client.java:4547)
at com.amazonaws.services.s3.AmazonS3Client.deleteObjects(AmazonS3Client.java:2325)
at org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$deleteObjects$16(S3AFileSystem.java:2785)
It blows my mind that the Amazon stuff uses Java 8, but they declare a dependency that is compiled with Java 11.
*Thread Reply:* Yeah - I don't know. I increased the timeout to 60 seconds and the CI still complains
*Thread Reply:* Any ideas @Paweł Leszczyński?
*Thread Reply:* The common theme amongst the failing tests, is that they all spin up local spark sessions.
*Thread Reply:* there are no events emitted and no error message, which is totally weird and can be mockserver related. I saw you changed the mockserver ports to be above 10000; perhaps it's not allowed to use such high ports in CircleCI. If the default HTTP transport timeout is higher than the timeout for the mockserver assertion, it can look like that -> no error and a failed assertion
*Thread Reply:* Yeah, there were two tests that had mock server ports set to 1081
*Thread Reply:* and they were running into each other
*Thread Reply:* so I was like, "Let's make them random"
*Thread Reply:* could you set it for delta to a static value as before and see if they fail again?
*Thread Reply:* Switching back to hard coded ports
*Thread Reply:* Made the 3.5.0 pipeline pass everything except the io.openlineage.spark.agent.SparkGenericIntegrationTest
and the test that tries to use S3
*Thread Reply:* The former failed, once again, with the assertion error
*Thread Reply:* the latter, about the java version error
*Thread Reply:* @Maciej Obuchowski SparkGenericIntegrationTest is your recent test. You call getEventsEmitted
and wait only until the list is non-empty. So if you expect two events, you don't wait for the second one. Am I right?
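If that's the case, a sketch of the fix would be to wait for an expected count rather than for a non-empty list (getEventsEmitted is the method named above; the wrapper below is an assumption, not the test's actual utility):
```
import static org.awaitility.Awaitility.await;

import java.time.Duration;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical adjustment: block until at least `expected` events were captured,
// instead of returning as soon as the list is non-empty.
class EventWaiter {
  static <T> List<T> waitForEvents(Supplier<List<T>> getEventsEmitted, int expected) {
    await()
        .atMost(Duration.ofSeconds(30))
        .until(() -> getEventsEmitted.get().size() >= expected);
    return getEventsEmitted.get();
  }
}
```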
*Thread Reply:* TIL: If you relocate org.slf4j
(which app
was previously doing) - you will not see the logs of the shaded jar.
*Thread Reply:* yes, that's why I always comment that out, and @Paweł Leszczyński always asks during review why I've done it, if I forget to revert it 🙂
*Thread Reply:* @Paweł Leszczyński I do verifyEvents
before getEventsEmitted
, that should take care of getting all the events I care about
*Thread Reply:* Hmm - did I find an undocumented "feature"?
Basically, if the OpenLineage Java client detects that you're trying to post to /api/v1/namespaces/<namespace>
it will inject the namespace into the run event under $.job.namespace
if $.job.namespace
is equal to default
?
*Thread Reply:* I'm seeing this in the logs for the integration test:
io.openlineage.client.OpenLineageClientException: io.openlineage.spark.shaded.org.apache.http.conn.HttpHostConnectException: Connect to localhost:1081 [localhost/127.0.0.1] failed: Connection refused (Connection refused)
at io.openlineage.client.transports.HttpTransport.emit(HttpTransport.java:128) ~[openlineage-spark-agent_2.13-1.9.0-SNAPSHOT-shadow.jar:1.9.0-SNAPSHOT]
at io.openlineage.client.transports.HttpTransport.emit(HttpTransport.java:111) ~[openlineage-spark-agent_2.13-1.9.0-SNAPSHOT-shadow.jar:1.9.0-SNAPSHOT]
at io.openlineage.client.OpenLineageClient.emit(OpenLineageClient.java:46) ~[openlineage-spark-agent_2.13-1.9.0-SNAPSHOT-shadow.jar:1.9.0-SNAPSHOT]
at io.openlineage.spark.agent.EventEmitter.emit(EventEmitter.java:69) ~[openlineage-spark-agent_2.13-1.9.0-SNAPSHOT-shadow.jar:?]
at io.openlineage.spark.agent.OpenLineageSparkListener.emitApplicationEvent(OpenLineageSparkListener.java:321) ~[openlineage-spark-agent_2.13-1.9.0-SNAPSHOT-shadow.jar:?]
at io.openlineage.spark.agent.OpenLineageSparkListener.emitApplicationEndEvent(OpenLineageSparkListener.java:329) ~[openlineage-spark-agent_2.13-1.9.0-SNAPSHOT-shadow.jar:?]
at io.openlineage.spark.agent.OpenLineageSparkListener.onApplicationEnd(OpenLineageSparkListener.java:229) ~[openlineage-spark-agent_2.13-1.9.0-SNAPSHOT-shadow.jar:?]
at org.apache.spark.scheduler.SparkListenerBus.doPostEvent(SparkListenerBus.scala:57) ~[spark-core_2.13-3.2.4.jar:3.2.4]
at org.apache.spark.scheduler.SparkListenerBus.doPostEvent$(SparkListenerBus.scala:28) ~[spark-core_2.13-3.2.4.jar:3.2.4]
at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37) ~[spark-core_2.13-3.2.4.jar:3.2.4]
at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37) ~[spark-core_2.13-3.2.4.jar:3.2.4]
at org.apache.spark.util.ListenerBus.postToAll(ListenerBus.scala:117) ~[spark-core_2.13-3.2.4.jar:3.2.4]
at org.apache.spark.util.ListenerBus.postToAll$(ListenerBus.scala:101) ~[spark-core_2.13-3.2.4.jar:3.2.4]
at org.apache.spark.scheduler.AsyncEventQueue.super$postToAll(AsyncEventQueue.scala:105) ~[spark-core_2.13-3.2.4.jar:3.2.4]
at org.apache.spark.scheduler.AsyncEventQueue.$anonfun$dispatch$1(AsyncEventQueue.scala:105) ~[spark-core_2.13-3.2.4.jar:3.2.4]
at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.scala:17) ~[iceberg-spark-runtime-3.2_2.13-0.14.0.jar:?]
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:59) ~[iceberg-spark-runtime-3.2_2.13-0.14.0.jar:?]
at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:100) ~[spark-core_2.13-3.2.4.jar:3.2.4]
at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.$anonfun$run$1(AsyncEventQueue.scala:96) ~[spark-core_2.13-3.2.4.jar:3.2.4]
at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1433) [spark-core_2.13-3.2.4.jar:3.2.4]
at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.run(AsyncEventQueue.scala:96) [spark-core_2.13-3.2.4.jar:3.2.4]
Caused by: io.openlineage.spark.shaded.org.apache.http.conn.HttpHostConnectException: Connect to localhost:1081 [localhost/127.0.0.1] failed: Connection refused (Connection refused)
at io.openlineage.spark.shaded.org.apache.http.impl.conn.DefaultHttpClientConnectionOperator.connect(DefaultHttpClientConnectionOperator.java:156) ~[openlineage-spark-agent_2.13-1.9.0-SNAPSHOT-shadow.jar:1.9.0-SNAPSHOT]
at io.openlineage.spark.shaded.org.apache.http.impl.conn.PoolingHttpClientConnectionManager.connect(PoolingHttpClientConnectionManager.java:376) ~[openlineage-spark-agent_2.13-1.9.0-SNAPSHOT-shadow.jar:1.9.0-SNAPSHOT]
at io.openlineage.spark.shaded.org.apache.http.impl.execchain.MainClientExec.establishRoute(MainClientExec.java:393) ~[openlineage-spark-agent_2.13-1.9.0-SNAPSHOT-shadow.jar:1.9.0-SNAPSHOT]
at io.openlineage.spark.shaded.org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:236) ~[openlineage-spark-agent_2.13-1.9.0-SNAPSHOT-shadow.jar:1.9.0-SNAPSHOT]
at io.openlineage.spark.shaded.org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186) ~[openlineage-spark-agent_2.13-1.9.0-SNAPSHOT-shadow.jar:1.9.0-SNAPSHOT]
at io.openlineage.spark.shaded.org.apache.http.impl.execchain.RetryExec.execute(RetryExec.java:89) ~[openlineage-spark-agent_2.13-1.9.0-SNAPSHOT-shadow.jar:1.9.0-SNAPSHOT]
at io.openlineage.spark.shaded.org.apache.http.impl.execchain.RedirectExec.execute(RedirectExec.java:110) ~[openlineage-spark-agent_2.13-1.9.0-SNAPSHOT-shadow.jar:1.9.0-SNAPSHOT]
at io.openlineage.spark.shaded.org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185) ~[openlineage-spark-agent_2.13-1.9.0-SNAPSHOT-shadow.jar:1.9.0-SNAPSHOT]
at io.openlineage.spark.shaded.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83) ~[openlineage-spark-agent_2.13-1.9.0-SNAPSHOT-shadow.jar:1.9.0-SNAPSHOT]
at io.openlineage.spark.shaded.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:108) ~[openlineage-spark-agent_2.13-1.9.0-SNAPSHOT-shadow.jar:1.9.0-SNAPSHOT]
at io.openlineage.client.transports.HttpTransport.emit(HttpTransport.java:123) ~[openlineage-spark-agent_2.13-1.9.0-SNAPSHOT-shadow.jar:1.9.0-SNAPSHOT]
... 20 more
*Thread Reply:* I'm honestly wondering if this is the source of my troubles with the integration tests failing. i.e., the mockserver just isn't binding to the port, or the port isn't accepting connections, or something.
*Thread Reply:* That, for example, comes from the Google Cloud Integration test
*Thread Reply:* > Basically, if the OpenLineage Java client detects that you're trying to post to /api/v1/namespaces/<namespace>
I don't think it sends to .../namespaces
anywhere by default, the canonical endpoint is api/v1/lineage
?
> I'm honestly wondering if this is the source of my troubles with the integration tests failing. i.e., the mockserver just isn't binding to the port, or the port isn't accepting connections, or something.
Or the tests run before mockserver started accepting connections?
*Thread Reply:* In Docker tests, we have a long timeout on the container
static MockServerContainer makeMockServerContainer(Network network) {
return new MockServerContainer(MOCKSERVER_IMAGE)
.withNetwork(network)
.withNetworkAliases("openlineageclient")
.withStartupTimeout(Duration.of(2, ChronoUnit.MINUTES));
}
*Thread Reply:* but I don't think there's anything like that on local tests?
mockServer = ClientAndServer.startClientAndServer(configuration, MOCKSERVER_PORT);
mockServer
.when(request("/api/v1/lineage"))
.respond(org.mockserver.model.HttpResponse.response().withStatusCode(201));
*Thread Reply:* @Maciej Obuchowski - possible, possible. Nice thought.
*Thread Reply:* Ran your branch locally and I'm getting the same errors
*Thread Reply:* What OS and arch are you using?
*Thread Reply:* I'm using an Apple M1 with MacOS Sonoma
*Thread Reply:* pretty much the same, M1 Pro with Sonoma
*Thread Reply:* What JDK are you running the project with?
*Thread Reply:* Azul Zulu 8 Arm build
*Thread Reply:* I guess I just have some magic on my machine
*Thread Reply:*
➜ code java -version
openjdk version "1.8.0_312"
OpenJDK Runtime Environment (Zulu 8.58.0.13-CA-macos-aarch64) (build 1.8.0_312-b07)
OpenJDK 64-Bit Server VM (Zulu 8.58.0.13-CA-macos-aarch64) (build 25.312-b07, mixed mode)
*Thread Reply:* I'll try to dig in more
*Thread Reply:* I have 392, but I don't think it makes much of a difference
*Thread Reply:* Also, I have access to a centos box running on x86_64, which I am running the tests with.
*Thread Reply:* To see if I can replicate the failures there
*Thread Reply:* 2024-02-14 18:26:32 ERROR io.openlineage.spark.agent.EventEmitter - Could not emit lineage w/ exception
io.openlineage.client.OpenLineageClientException: io.openlineage.spark.shaded.org.apache.http.conn.HttpHostConnectException: Connect to localhost:1081 [localhost/127.0.0.1] failed: Connection refused (Connection refused)
at io.openlineage.client.transports.HttpTransport.emit(HttpTransport.java:128) ~[openlineage-spark-agent_2.13-1.9.0-SNAPSHOT-shadow.jar:1.9.0-SNAPSHOT]
at io.openlineage.client.transports.HttpTransport.emit(HttpTransport.java:111) ~[openlineage-spark-agent_2.13-1.9.0-SNAPSHOT-shadow.jar:1.9.0-SNAPSHOT]
at io.openlineage.client.OpenLineageClient.emit(OpenLineageClient.java:46) ~[openlineage-spark-agent_2.13-1.9.0-SNAPSHOT-shadow.jar:1.9.0-SNAPSHOT]
at io.openlineage.spark.agent.EventEmitter.emit(EventEmitter.java:69) ~[openlineage-spark-agent_2.13-1.9.0-SNAPSHOT-shadow.jar:?]
at io.openlineage.spark.agent.lifecycle.SparkSQLExecutionContext.end(SparkSQLExecutionContext.java:141) ~[openlineage-spark-agent_2.13-1.9.0-SNAPSHOT-shadow.jar:?]
at io.openlineage.spark.agent.OpenLineageSparkListener.sparkSQLExecEnd(OpenLineageSparkListener.java:100) ~[openlineage-spark-agent_2.13-1.9.0-SNAPSHOT-shadow.jar:?]
at io.openlineage.spark.agent.OpenLineageSparkListener.onOtherEvent(OpenLineageSparkListener.java:86) ~[openlineage-spark-agent_2.13-1.9.0-SNAPSHOT-shadow.jar:?]
at org.apache.spark.scheduler.SparkListenerBus.doPostEvent(SparkListenerBus.scala:100) ~[spark-core_2.13-3.4.2.jar:3.4.2]
at org.apache.spark.scheduler.SparkListenerBus.doPostEvent$(SparkListenerBus.scala:28) ~[spark-core_2.13-3.4.2.jar:3.4.2]
at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37) ~[spark-core_2.13-3.4.2.jar:3.4.2]
at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37) ~[spark-core_2.13-3.4.2.jar:3.4.2]
at org.apache.spark.util.ListenerBus.postToAll(ListenerBus.scala:117) ~[spark-core_2.13-3.4.2.jar:3.4.2]
at org.apache.spark.util.ListenerBus.postToAll$(ListenerBus.scala:101) ~[spark-core_2.13-3.4.2.jar:3.4.2]
at org.apache.spark.scheduler.AsyncEventQueue.super$postToAll(AsyncEventQueue.scala:105) ~[spark-core_2.13-3.4.2.jar:3.4.2]
at org.apache.spark.scheduler.AsyncEventQueue.$anonfun$dispatch$1(AsyncEventQueue.scala:105) ~[spark-core_2.13-3.4.2.jar:3.4.2]
at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.scala:17) ~[scala-library-2.13.11.jar:?]
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:59) ~[scala-library-2.13.11.jar:?]
at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:100) ~[spark-core_2.13-3.4.2.jar:3.4.2]
at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.$anonfun$run$1(AsyncEventQueue.scala:96) ~[spark-core_2.13-3.4.2.jar:3.4.2]
at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1471) [spark-core_2.13-3.4.2.jar:3.4.2]
at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.run(AsyncEventQueue.scala:96) [spark-core_2.13-3.4.2.jar:3.4.2]
Caused by: io.openlineage.spark.shaded.org.apache.http.conn.HttpHostConnectException: Connect to localhost:1081 [localhost/127.0.0.1] failed: Connection refused (Connection refused)
Got it on the linux box
*Thread Reply:* ```SparkDeltaIntegrationTest STANDARD_OUT 2024-02-14 18:26:07 WARN org.apache.kafka.clients.NetworkClient - [AdminClient clientId=adminclient-1] Connection to node 1 (localhost/127.0.0.1:32801) could not be established. Broker may not be available. 2024-02-14 18:26:08 INFO io.openlineage.spark.agent.SparkDeltaIntegrationTest - Waiting for mock server to start on port 1082
SparkDeltaIntegrationTest > testMergeInto() STANDARDOUT 2024-02-14 18:26:08 ERROR io.openlineage.spark.agent.EventEmitter - Could not emit lineage w/ exception io.openlineage.client.OpenLineageClientException: io.openlineage.spark.shaded.org.apache.http.conn.HttpHostConnectException: Connect to localhost:1081 [localhost/127.0.0.1] failed: Connection refused (Connection refused) at io.openlineage.client.transports.HttpTransport.emit(HttpTransport.java:128) ~[openlineage-spark-agent2.13-1.9.0-SNAPSHOT-shadow.jar:1.9.0-SNAPSHOT] at io.openlineage.client.transports.HttpTransport.emit(HttpTransport.java:111) ~[openlineage-spark-agent2.13-1.9.0-SNAPSHOT-shadow.jar:1.9.0-SNAPSHOT] at io.openlineage.client.OpenLineageClient.emit(OpenLineageClient.java:46) ~[openlineage-spark-agent2.13-1.9.0-SNAPSHOT-shadow.jar:1.9.0-SNAPSHOT] at io.openlineage.spark.agent.EventEmitter.emit(EventEmitter.java:69) ~[openlineage-spark-agent2.13-1.9.0-SNAPSHOT-shadow.jar:?] at io.openlineage.spark.agent.OpenLineageSparkListener.emitApplicationEvent(OpenLineageSparkListener.java:321) ~[openlineage-spark-agent2.13-1.9.0-SNAPSHOT-shadow.jar:?] at io.openlineage.spark.agent.OpenLineageSparkListener.emitApplicationStartEvent(OpenLineageSparkListener.java:325) ~[openlineage-spark-agent2.13-1.9.0-SNAPSHOT-shadow.jar:?] at io.openlineage.spark.agent.OpenLineageSparkListener.onApplicationStart(OpenLineageSparkListener.java:247) ~[openlineage-spark-agent2.13-1.9.0-SNAPSHOT-shadow.jar:?] at org.apache.spark.scheduler.SparkListenerBus.doPostEvent(SparkListenerBus.scala:55) ~[spark-core2.13-3.4.2.jar:3.4.2] at org.apache.spark.scheduler.SparkListenerBus.doPostEvent$(SparkListenerBus.scala:28) ~[spark-core2.13-3.4.2.jar:3.4.2] at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37) ~[spark-core2.13-3.4.2.jar:3.4.2] at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37) ~[spark-core2.13-3.4.2.jar:3.4.2] at org.apache.spark.util.ListenerBus.postToAll(ListenerBus.scala:117) ~[spark-core2.13-3.4.2.jar:3.4.2] at org.apache.spark.util.ListenerBus.postToAll$(ListenerBus.scala:101) ~[spark-core2.13-3.4.2.jar:3.4.2] at org.apache.spark.scheduler.AsyncEventQueue.super$postToAll(AsyncEventQueue.scala:105) ~[spark-core2.13-3.4.2.jar:3.4.2] at org.apache.spark.scheduler.AsyncEventQueue.$anonfun$dispatch$1(AsyncEventQueue.scala:105) ~[spark-core2.13-3.4.2.jar:3.4.2] at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.scala:17) ~[scala-library-2.13.11.jar:?] at scala.util.DynamicVariable.withValue(DynamicVariable.scala:59) ~[scala-library-2.13.11.jar:?] at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:100) ~[spark-core2.13-3.4.2.jar:3.4.2] at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.$anonfun$run$1(AsyncEventQueue.scala:96) ~[spark-core2.13-3.4.2.jar:3.4.2] at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1471) [spark-core2.13-3.4.2.jar:3.4.2] at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.run(AsyncEventQueue.scala:96) [spark-core2.13-3.4.2.jar:3.4.2]```
*Thread Reply:* So, the fact that the test proceeds, means that the mock server has started.
*Thread Reply:* Because I have this
```
@Override
public void beforeAll(ExtensionContext context) throws Exception {
  int port = basePort.getAndIncrement();
  Configuration configuration = new Configuration();
  configuration.logLevel(Level.ERROR);
  clientAndServer = ClientAndServer.startClientAndServer(configuration, port);
  clientAndServer
      .when(request("/api/v1/lineage"))
      .respond(HttpResponse.response().withStatusCode(201));

  if (context.getTestClass().isPresent()) {
    Logger logger = LoggerFactory.getLogger(context.getTestClass().get());
    Awaitility.await("wait-for-mock-server-start-up")
        .atMost(Duration.ofMinutes(2))
        .pollInterval(Duration.ofSeconds(1))
        .until(
            () -> {
              logger.info(
                  "Waiting for mock server to start on port {}", clientAndServer.getPort());
              return clientAndServer.isRunning();
            });
  }
}
```
*Thread Reply:* So there's another reason why the connection is being refused.
*Thread Reply:* https://github.com/mock-server/mockserver/issues/498
*Thread Reply:* What also has me confused, is that I see a lot of Kafka Admin Client related configurations.
*Thread Reply:* Being unable to connect to a broker. Which is super strange.
*Thread Reply:* I would try to see if this happens on the containers test in particular
*Thread Reply:* With the container test, I see that some of the tests are showing 404's
*Thread Reply:* Further digging:
If I run one test class in isolation, the tests go through.
*Thread Reply:* However, if I run multiple tests that require mock server, bad things happen.
*Thread Reply:* i.e., the connection refused errors appear
*Thread Reply:* is this related to a shared mock server or shared spark session?
*Thread Reply:* Are you asking about shared within the test class, or shared across test classes?
*Thread Reply:* shared across test classes
*Thread Reply:* In that case, neither the spark session nor the mock server are shared.
*Thread Reply:* For example, SparkDeltaIntegrationTest and SparkIcebergIntergrationTest
*Thread Reply:* They each get their own servers and sessions
*Thread Reply:* However, if you run them in isolation, they will individually pass
*Thread Reply:* But, if you run them in parallel or sequentially (but in the same JVM process), the one that executes after will fail.
*Thread Reply:* that's weird. They can still share the derby location or /tmp/whatever
where data is stored
*Thread Reply:* For example: ./gradlew integrationTest -x test -x buildDockerImage -Pscala.binary.version=2.13 -Pspark.version=3.4.2 --tests SparkDeltaIntegrationTest --tests SparkGenericIntegrationTest --tests SparkIcebergIntegrationTest
*Thread Reply:* I started seeing the connection refused error, around the 2nd last test of the SparkDeltaIntegrationTest
*Thread Reply:* which makes me wonder if MockServer only wants 1 instance per JVM process
*Thread Reply:* Or are we hitting heapsize limits
*Thread Reply:* i.e., the JVM is running out of resources and that's why we're seeing connection refused
*Thread Reply:* mockserver is way too heavy for what it's doing. that's why maciej truly hates it
*Thread Reply:* you can reduce parallelism to 1 and see if the tests succeed. this will take ages but at least we would know
*Thread Reply:* I think I already did that
*Thread Reply:* Fork every and max parallel
*Thread Reply:* Running the test with jvmArgs("-Xmx4096m", "-XX:MaxPermSize=512m")
now
*Thread Reply:* OK. So I did something completely the opposite
*Thread Reply:* I increased the parallelism
*Thread Reply:* I said, max parallel forks = 5
*Thread Reply:* I had to change port assignment
*Thread Reply:* i run the command locally and have a lot of io.openlineage.client.OpenLineageClientException: io.openlineage.spark.shaded.org.apache.http.conn.HttpHostConnectException: Connect to localhost:1081 [localhost/127.0.0.1, localhost/0:0:0:0:0:0:0:1] failed: Connection refused (Connection refused)
*Thread Reply:* something is stopping the mockserver while another test tries to connect to it
*Thread Reply:* I think MockServer isn't cleaning up after itself properly
*Thread Reply:* SUCCESS: Executed 26 tests in 1m 31s
*Thread Reply:* With:
tasks.register("integrationTest", Test.class) {
group = "verification"
dependsOn(integrationTestDependencies)
testClassesDirs = testSourceSet.output.classesDirs
classpath = files(tasks.shadowJar.outputs.files.singleFile, sourceSets.test.runtimeClasspath)
useJUnitPlatform {
includeTags("integration-test")
excludeTags("databricks")
logger.warn("[IntegrationTest] hasDeltaDependencies: ${hasDeltaDependencies(spark, scala)}")
if (!hasDeltaDependencies(spark, scala)) {
logger.warn("[IntegrationTest] Excluding delta tests")
excludeTags("delta")
}
logger.warn("[IntegrationTest] hasIcebergDependencies: ${hasIcebergDependencies(spark, scala)}")
if (!hasIcebergDependencies(spark, scala)) {
logger.warn("[IntegrationTest] Excluding iceberg tests")
excludeTags("iceberg")
}
}
systemProperties.put("test.results.dir", buildDirectory.dir("test-results/${name}/o").get().asFile.absolutePath)
setForkEvery(1)
maxParallelForks = 5
setMaxHeapSize("1024m")
}
*Thread Reply:* I get this:
SUCCESS: Executed 26 tests in 1m 32s
*Thread Reply:* I am also seeing java.lang.ClassCastException: org.apache.spark.sql.catalyst.analysis.ResolvedIdentifier cannot be cast to org.apache.spark.sql.catalyst.analysis.ResolvedTable
which doesn't make the tests fail
*Thread Reply:* Yeah - that's something to do with Delta casting
*Thread Reply:* The code is trying to cast an identifier to a table
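A hedged sketch of the kind of guard that would avoid that ClassCastException (the method shape and how the resolved child is obtained are assumptions for illustration only):
```
import org.apache.spark.sql.catalyst.analysis.ResolvedTable;

// Illustrative only: check the runtime type before casting, so a
// ResolvedIdentifier child doesn't break a code path that expects ResolvedTable.
class ResolvedTableGuard {
  static boolean handleIfResolvedTable(Object child) {
    if (child instanceof ResolvedTable) {
      ResolvedTable table = (ResolvedTable) child;
      // ... build dataset information from `table`
      return true;
    }
    return false;
  }
}
```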
*Thread Reply:* hmm, i thought resetting mockServer also clears the expectations
*Thread Reply:* and this
.when(request("/api/v1/lineage"))
.respond(HttpResponse.response().withStatusCode(201))
is also an expectation
*Thread Reply:* If it was an expectation
*Thread Reply:* wouldn't it clear after the first test?
*Thread Reply:* ``` 2024-02-14 20:38:02 INFO io.openlineage.spark.agent.SparkDeltaIntegrationTest - Stopping mockserver on port 1923
2024-02-14 20:38:02 WARN org.apache.spark.sql.SparkSession - An existing Spark session exists as the active or default session. This probably means another suite leaked it. Attempting to stop it before continuing. This existing Spark session was created at:
org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:1035) io.openlineage.spark.agent.SparkDeltaIntegrationTest.beforeEach(SparkDeltaIntegrationTest.java:111) sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) java.lang.reflect.Method.invoke(Method.java:498) org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:728) org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60) org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131) org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156) org.junit.jupiter.engine.extension.TimeoutExtension.interceptLifecycleMethod(TimeoutExtension.java:128) org.junit.jupiter.engine.extension.TimeoutExtension.interceptBeforeEachMethod(TimeoutExtension.java:78) org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103) org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93) org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106) org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64) org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45) org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37) org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:92) org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:86)
2024-02-14 20:38:02 INFO io.openlineage.spark.agent.SparkDeltaIntegrationTest - Stopping mockserver on port 1923
SparkGenericIntegrationTest STANDARD_OUT 2024-02-14 20:38:03 INFO io.openlineage.spark.agent.SparkGenericIntegrationTest - Waiting for mock server to start on port 1930
SparkGenericIntegrationTest > sparkEmitsApplicationLevelEvents() STANDARDOUT 2024-02-14 20:38:03 ERROR io.openlineage.spark.agent.EventEmitter - Could not emit lineage w/ exception io.openlineage.client.OpenLineageClientException: io.openlineage.spark.shaded.org.apache.http.NoHttpResponseException: localhost:1923 failed to respond at io.openlineage.client.transports.HttpTransport.emit(HttpTransport.java:128) ~[openlineage-spark-agent2.13-1.9.0-SNAPSHOT-shadow.jar:1.9.0-SNAPSHOT] at io.openlineage.client.transports.HttpTransport.emit(HttpTransport.java:111) ~[openlineage-spark-agent2.13-1.9.0-SNAPSHOT-shadow.jar:1.9.0-SNAPSHOT] at io.openlineage.client.OpenLineageClient.emit(OpenLineageClient.java:60) ~[openlineage-spark-agent2.13-1.9.0-SNAPSHOT-shadow.jar:1.9.0-SNAPSHOT] at io.openlineage.spark.agent.EventEmitter.emit(EventEmitter.java:69) ~[openlineage-spark-agent2.13-1.9.0-SNAPSHOT-shadow.jar:?] at io.openlineage.spark.agent.OpenLineageSparkListener.emitApplicationEvent(OpenLineageSparkListener.java:321) ~[openlineage-spark-agent2.13-1.9.0-SNAPSHOT-shadow.jar:?] at io.openlineage.spark.agent.OpenLineageSparkListener.emitApplicationStartEvent(OpenLineageSparkListener.java:325) ~[openlineage-spark-agent2.13-1.9.0-SNAPSHOT-shadow.jar:?] at io.openlineage.spark.agent.OpenLineageSparkListener.onApplicationStart(OpenLineageSparkListener.java:247) ~[openlineage-spark-agent2.13-1.9.0-SNAPSHOT-shadow.jar:?] at org.apache.spark.scheduler.SparkListenerBus.doPostEvent(SparkListenerBus.scala:55) ~[spark-core2.13-3.4.2.jar:3.4.2] at org.apache.spark.scheduler.SparkListenerBus.doPostEvent$(SparkListenerBus.scala:28) ~[spark-core2.13-3.4.2.jar:3.4.2] at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37) ~[spark-core2.13-3.4.2.jar:3.4.2] at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37) ~[spark-core2.13-3.4.2.jar:3.4.2] at org.apache.spark.util.ListenerBus.postToAll(ListenerBus.scala:117) ~[spark-core2.13-3.4.2.jar:3.4.2] at org.apache.spark.util.ListenerBus.postToAll$(ListenerBus.scala:101) ~[spark-core2.13-3.4.2.jar:3.4.2] at org.apache.spark.scheduler.AsyncEventQueue.super$postToAll(AsyncEventQueue.scala:105) ~[spark-core2.13-3.4.2.jar:3.4.2] at org.apache.spark.scheduler.AsyncEventQueue.$anonfun$dispatch$1(AsyncEventQueue.scala:105) ~[spark-core2.13-3.4.2.jar:3.4.2] at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.scala:17) ~[scala-library-2.13.11.jar:?] at scala.util.DynamicVariable.withValue(DynamicVariable.scala:59) ~[scala-library-2.13.11.jar:?] at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:100) ~[spark-core2.13-3.4.2.jar:3.4.2] at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.$anonfun$run$1(AsyncEventQueue.scala:96) ~[spark-core2.13-3.4.2.jar:3.4.2] at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1471) [spark-core2.13-3.4.2.jar:3.4.2] at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.run(AsyncEventQueue.scala:96) [spark-core2.13-3.4.2.jar:3.4.2] Caused by: io.openlineage.spark.shaded.org.apache.http.NoHttpResponseException: localhost:1923 failed to respond``` I added log to notify mockServer is stopped. I see it being stopped and then something tries to connect it.
*Thread Reply:* Hmm - I find it really strange that SparkGenericIntegrationTest is trying to connect to SparkDeltaIntegrationTest's mock server
*Thread Reply:* It should have a new mock server injected
*Thread Reply:* Unless that atomic integer isn't working as expected
*Thread Reply:* first, I changed atomicInteger to random but this did not help
*Thread Reply:* Does this mean, MockServer is keeping state across tests?
*Thread Reply:* ```State can be cleared from MockServer selectively:
- use type to select which type of state to clear, supported values are: all, log, expectations
- use a request matcher to clear matching items
- use an expectation id to clear matching items```
*Thread Reply:* new MockServerClient("localhost", 1080).clear(
request(),
ClearType.LOG
);
*Thread Reply:* Using the official MockServerExtension
*Thread Reply:* Results in the same behaviour.
*Thread Reply:* SparkDeltaIntegrationTest completes, and then SparkGenericIntegrationTest starts
*Thread Reply:* SparkGenericIntegrationTest is trying to connect to the same port as SparkDeltaIntegrationTest
*Thread Reply:* OK. Using a single JVM process
*Thread Reply:* I have managed to get the tests to run
*Thread Reply:* without them failing due to connection refused
*Thread Reply:* however, they are failing due to request matching
*Thread Reply:* I really don't think mockserver is thread safe
*Thread Reply:* This is what I've done to the tests:
```
import org.mockserver.junit.jupiter.MockServerExtension;
import org.mockserver.junit.jupiter.MockServerSettings;

@ExtendWith(MockServerExtension.class)
@MockServerSettings(ports = {SparkIcebergIntegrationTest.MOCKSERVER_PORT}, perTestSuite = true)
public class SparkIcebergIntegrationTest {
  public static final int MOCKSERVER_PORT = 1084;

  public SparkIcebergIntegrationTest(ClientAndServer mockServer) {
    this.mockServer = mockServer;
    this.mockServer
        .when(request("/api/v1/lineage"))
        .respond(org.mockserver.model.HttpResponse.response().withStatusCode(201));
  }

  public void beforeEach() {
    mockServer.clear(request(), ClearType.LOG);
    // stuff
  }
}
```
*Thread Reply:* Pretty crazy though, that if you fork, things work.
*Thread Reply:* Btw, to get the official extension:
testImplementation("org.mock-server:mockserver_netty:5.14.0:shaded") {
exclude group: 'com.google.guava', module: 'guava'
exclude group: 'com.fasterxml.jackson.core'
exclude group: 'com.fasterxml.jackson.datatype'
exclude group: 'com.fasterxml.jackson.dataformat'
exclude group: 'org.mock-server.mockserver-client-java'
}
testImplementation("org.mock-server:mockserver_junit_jupiter_no_dependencies:5.14.0")
*Thread Reply:* OK. I am happy to say that we've gotten the failures down to the same tests across all variants.
*Thread Reply:* ```io.openlineage.spark.agent.GoogleCloudIntegrationTest > testRddWriteToBucket() > testReadAndWriteFromBigquery()
io.openlineage.spark.agent.lifecycle.SparkReadWriteIntegTest > testExternalRDDWithS3Bucket(SparkSession)```
*Thread Reply:* I will say, the last one (the class version error) is the one that I'm unsure how to solve.
*Thread Reply:* Perhaps something is bringing in the JDK 11 variant? Maybe we try and exclude the variant from the classpath?
*Thread Reply:* org.glassfish.jaxb:jaxb-runtime
- yeah ... this one is the reason for the failure with the S3 test.
I removed spark-mllib
during the migration, because I couldn't find any references to it. As it turns out, the Google BigQuery stuff requires it. So I added it back, without excluding glassfish.
*Thread Reply:* I think I got it 😉
Why are you removing OpenLineageSparkListener
setting? This made the GoogleCloudIntegrationTest
succeed on my end.
*Thread Reply:* I remember I was rearranging the args, to make sure the OL stuff was grouped together.
*Thread Reply:* Thanks for your help @Paweł Leszczyński
*Thread Reply:* Hmm - testExternalRddWithS3Bucket
fails with:
Caused by: java.io.FileNotFoundException: No such file or directory: <s3a://openlineage-test/rdd_a_3.3.4/_temporary/0/task_202402150933081796269000648724168_0000_m_000001/part-00001-b5762fba-cb9a-401d-9f14-73cda8c6706c-c000.snappy.parquet>
*Thread Reply:* 2.12 integration tests are failing because of 404 on mockserver
*Thread Reply:* I just realised why the bigquery tests are failing
*Thread Reply:* Not because of the code, but because the fixtures are no longer valid
*Thread Reply:* "name": "openlineage-ci.airflow_integration.{spark_version}_target"
must be changed to "name": "openlineage-ci.airflow_integration.{spark_version}_{scala}_target"
*Thread Reply:* Probably for the MetaStore2 and MetaStore3 events
*Thread Reply:* Regarding the 404:
This one didn't experience those failures, but it is 3.5.0, so it may not have the same tests running
*Thread Reply:* https://app.circleci.com/pipelines/github/OpenLineage/OpenLineage/9322/workflows/d03fc2fb-9a74-446d-83e8-cc69fb798dde/jobs/175611/tests
Same with this one. It is only the bigquery test failing.
*Thread Reply:* https://app.circleci.com/pipelines/github/OpenLineage/OpenLineage/9322/workflows/d03fc2fb-9a74-446d-83e8-cc69fb798dde/jobs/175614/tests
This one is different though
*Thread Reply:* pls run spotless, so we could see what's still failing
*Thread Reply:* I am off next week and would love to merge it this week 😉
*Thread Reply:* I need to recall what the Metastore test has to do with Google storage.
*Thread Reply:* That's the failing test for now
*Thread Reply:* It uses it for storage of the files
*Thread Reply:* does it expect some content of the storage initially?
*Thread Reply:* i don't get why it's succeeding for 3.5.0
*Thread Reply:* It succeeds for 3.4.2 and 3.5.0.
*Thread Reply:* And the 2.12 variant of 3.3.4
*Thread Reply:* logs are also flooded with io.openlineage.client.OpenLineageClientException: code: 404, response:
*Thread Reply:* That's for the container tests
*Thread Reply:* And I see that I haven't changed the mockserver container at all
*Thread Reply:* if 2.12 tests and 2.13 refer to the same warehouse path maybe they overwrite each other's data?
*Thread Reply:* ➜ ~ gcloud storage ls <gs://openlineage-ci-testing/warehouse>
<gs://openlineage-ci-testing/warehouse/3.1.3/>
<gs://openlineage-ci-testing/warehouse/3.2.2/>
<gs://openlineage-ci-testing/warehouse/3.2.4/>
<gs://openlineage-ci-testing/warehouse/3.3.0/>
<gs://openlineage-ci-testing/warehouse/3.3.1/>
<gs://openlineage-ci-testing/warehouse/3.3.2/>
<gs://openlineage-ci-testing/warehouse/3.3.3/>
<gs://openlineage-ci-testing/warehouse/3.3.4/>
<gs://openlineage-ci-testing/warehouse/3.4.0/>
<gs://openlineage-ci-testing/warehouse/3.4.1/>
<gs://openlineage-ci-testing/warehouse/3.4.2/>
<gs://openlineage-ci-testing/warehouse/3.5.0/>
<gs://openlineage-ci-testing/warehouse/no_iceberg_test.db/>
<gs://openlineage-ci-testing/warehouse/spark-3_2_4/>
<gs://openlineage-ci-testing/warehouse/spark-3_3_4/>
<gs://openlineage-ci-testing/warehouse/spark-3_4_2/>
<gs://openlineage-ci-testing/warehouse/spark-3_5_0/>
I don't think your tests created any new paths there
*Thread Reply:* also the errors indicate that
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 2) (ip-10-0-109-117.ec2.internal executor driver): org.apache.iceberg.exceptions.RuntimeIOException: Failed to get status for file: <gs://openlineage-ci-testing/warehouse/3.3.4/hive3/test/data/00000-0-b84bfcf2-3c44-47c6-9da9-c6e420db237c-00001.parquet>
*Thread Reply:* there's no scala version in the path
*Thread Reply:* java.io.IOException: Path: //openlineage-ci-testing/warehouse/spark-3_3_4/scala-2_13/hive3/test_3_3_4_2_13/data is a directory, which is not supported by the record reader when `mapreduce.input.fileinputformat.input.dir.recursive` is false.
*Thread Reply:* org.apache.iceberg.exceptions.NotFoundException: Failed to open input stream for file: <gs://openlineage-ci-testing/warehouse/spark-3_3_4/scala-2_13/hive3/test_3_3_4_2_13/metadata/00001-2862ac34-7a06-4b6a-8c13-b07446745975.metadata.json>
*Thread Reply:* ah, it has the underscores now
*Thread Reply:* ➜ ~ gcloud storage ls <gs://openlineage-ci-testing/warehouse/spark-3_3_4/>
<gs://openlineage-ci-testing/warehouse/spark-3_3_4/scala-2_12/>
<gs://openlineage-ci-testing/warehouse/spark-3_3_4/scala-2_13/>
*Thread Reply:* Yeah - I created my own GCP project
*Thread Reply:* I see, I was looking at the earlier failures
*Thread Reply:* Just to test this locally
*Thread Reply:* https://github.com/OpenLineage/OpenLineage/pull/2432/files#diff-137aa17091138b69681510e13e3b7d66aa9c9c7c81fe8fe13f09f0de76448dd5R48 -> this is using the same port as the mockserver container
*Thread Reply:* I looked at today's CI build on main and there are no 404 errors
*Thread Reply:* Caused by: java.io.FileNotFoundException: Item not found: 'gs://****************************-testing/warehouse/spark-3_3_4/scala-2_13/hive3/test_3_3_4_2_13/metadata/00001-0906d19b-1c0b-4e62-9cd5-b35a5410b0d8.metadata.json'. Note, it is possible that the live version is still available but the requested generation is deleted.
this sounds to me like there is some concurrency
*Thread Reply:* it gets retried 4 times
*Thread Reply:* so chances are low
*Thread Reply:* to me it looks like something is not able to clear the path
*Thread Reply:* but why would it clear properly for other versions
*Thread Reply:* yeah so maybe it's a red herring, it might fail at
2024-02-15 12:23:38 ERROR EventEmitter - Could not emit lineage w/ exception
io.openlineage.client.OpenLineageClientException: code: 404, response:
at io.openlineage.client.transports.HttpTransport.throwOnHttpError(HttpTransport.java:150) ~[openlineage-spark-agent_2.13-1.9.0-SNAPSHOT-shadow.jar:1.9.0-SNAPSHOT]
and then there is something leftover?
*Thread Reply:* to me the 404s are the result of two tests using the same mockserver and resetting it
*Thread Reply:* yes, it might fail on that, then it retries and then fails earlier because of storage leftovers?
*Thread Reply:* all the more reasons to get rid of mockserver 🙂
*Thread Reply:* this would make sense, assuming a retry wouldn't run @BeforeAll or @AfterAll when failing
*Thread Reply:* Dude ... I wrote a "FileOutputExtension" to replace MockServer
*Thread Reply:* and then I deleted it
*Thread Reply:* I started something few weeks ago https://github.com/OpenLineage/OpenLineage/commit/604f35ee66c164550b2b08c87a043d5a0beed62d
*Thread Reply:* @Damien Hawes could we change the port of ColumnLineageIntegrationTest from 1080 to something else?
*Thread Reply:* yes, as mockserver container is not using this
*Thread Reply:* @Maciej Obuchowski - yeah, I was using the file transport and then used ObjectMapper + JSONAssert from org.skyscreamer to perform a weak match on the expected and actual json
*Thread Reply:* what a pity @Maciej Obuchowski;)
*Thread Reply:* yeah would be best as a separate PR 🙂
*Thread Reply:* maybe randomize the mockserver port?
*Thread Reply:* 1/65536 is pretty low chance of collision
*Thread Reply:* mockserver hates running in the same thread
*Thread Reply:* one day we will rewrite it to rust, scala, github actions, non-mockserver, whatever, but for now let's just juggle mockserver ports to make it work 😉
*Thread Reply:* I never want to touch Scala again
*Thread Reply:* I actually wish we'd started all of this in Scala, we'd not have to do any of it
*Thread Reply:* But even like 2 years ago we've thought it's too much work, and now we have 5x more code and related problems
*Thread Reply:* Sunken cost fallacy
*Thread Reply:* WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/opt/bitnami/spark/jars/spark-unsafe_2.12-3.2.4.jar) to constructor java.nio.DirectByteBuffer(long,int)
WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
Hmm, has bitnami upgraded the JDK on their images?
*Thread Reply:* @Paweł Leszczyński - when I run the column lineage test, in isolation, on my local
*Thread Reply:* 2024-02-15 15:07:56 WARN NativeCodeLoader - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2024-02-15 15:07:58 ERROR EventEmitter - Could not emit lineage w/ exception
io.openlineage.client.OpenLineageClientException: code: 404, response:
at io.openlineage.client.transports.HttpTransport.throwOnHttpError(HttpTransport.java:150) ~[openlineage-spark-agent_2.12-1.9.0-SNAPSHOT-shadow.jar:1.9.0-SNAPSHOT]
at io.openlineage.client.transports.HttpTransport.emit(HttpTransport.java:124) ~[openlineage-spark-agent_2.12-1.9.0-SNAPSHOT-shadow.jar:1.9.0-SNAPSHOT]
at io.openlineage.client.transports.HttpTransport.emit(HttpTransport.java:111) ~[openlineage-spark-agent_2.12-1.9.0-SNAPSHOT-shadow.jar:1.9.0-SNAPSHOT]
at io.openlineage.client.OpenLineageClient.emit(OpenLineageClient.java:46) ~[openlineage-spark-agent_2.12-1.9.0-SNAPSHOT-shadow.jar:1.9.0-SNAPSHOT]
at io.openlineage.spark.agent.EventEmitter.emit(EventEmitter.java:69) ~[openlineage-spark-agent_2.12-1.9.0-SNAPSHOT-shadow.jar:?]
*Thread Reply:* The illegal reflective stuff should not actually break anything right now afaik
*Thread Reply:* @Paweł Leszczyński @Maciej Obuchowski
.when(request("/api/v1/lineage"))
.respond(org.mockserver.model.HttpResponse.response().withStatusCode(201))
However, when I examine debug logs and the code I see:
2024-02-15 15:11:12 INFO IcebergHandler - {spark.openlineage.transport.url=<http://localhost:1090/api/v1/namespaces/default>, ....
.config(
"spark.openlineage.transport.url",
"<http://localhost>:" + MOCKSERVER_PORT + "/api/v1/namespaces/default")
*Thread Reply:* Ah, so you think it emits to the wrong port?
*Thread Reply:* No. To the wrong resource
*Thread Reply:* Oh yes, why it is namespaces endpoint?
*Thread Reply:* Yeah, just wondering
*Thread Reply:* ```
public ColumnLineageIntegrationTest(ClientAndServer mockServer) {
  this.mockServer = mockServer;
  mockServer
      .when(request("/api/v1/lineage"))
      .respond(org.mockserver.model.HttpResponse.response().withStatusCode(201));
}

@SneakyThrows
@BeforeAll
public static void setup() {
  METASTORE_CONTAINER.start();
  mappedPort = METASTORE_CONTAINER.getMappedPort(MetastoreTestUtils.POSTGRES_PORT);
  spark = getSparkSession();
  Arrays.asList("v2source_1", "v2source_2")
      .forEach(e -> spark.sql("drop table if exists " + e));
  getIcebergTable(spark, 1);
  getIcebergTable(spark, 2);
  databaseUrl = String.format("jdbc:
```
*Thread Reply:* The 404s are coming from the @BeforeAll lifecycle method
*Thread Reply:* MockServer hasn't been created or configured by then. MockServer is instance scoped, but the method is static scoped.
*Thread Reply:* So it's possible that via the extension, MockServer is instantiated and is listening on that port, but no expectations have been configured.
*Thread Reply:* So MockServer is like, "404 everything"
*Thread Reply:* yes, that's possible
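For illustration, a minimal sketch of keeping MockServer entirely in static scope so the expectation exists before anything in @BeforeAll emits lineage. The class name and port are hypothetical, not the project's actual test:
```
import static org.mockserver.integration.ClientAndServer.startClientAndServer;
import static org.mockserver.model.HttpRequest.request;
import static org.mockserver.model.HttpResponse.response;

import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.mockserver.integration.ClientAndServer;

class ColumnLineageIntegrationTestSketch {
  private static ClientAndServer mockServer;

  @BeforeAll
  static void startMockServer() {
    // Started and stubbed before the Spark session runs any setup SQL,
    // so events emitted during @BeforeAll get a 201 instead of a 404.
    mockServer = startClientAndServer(1090); // port value is illustrative
    mockServer
        .when(request("/api/v1/lineage"))
        .respond(response().withStatusCode(201));
  }

  @AfterAll
  static void stopMockServer() {
    mockServer.stop();
  }
}
```
The same effect could be achieved through the MockServer JUnit 5 extension, as long as the expectation is registered before the static setup runs rather than in the instance constructor.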
*Thread Reply:* Oh - regarding the Scala conversation. It would require a change to the architecture of the solution.
We'd probably have to follow the route of the spline folks. Namely: 1 artifact per spark x scala version. In our case, we'd have to consider producing a Scala 2.11 build just for 2.4.8.
(I know this entire conversation is entirely hypothetical)
*Thread Reply:* 1. SparkContainerIntegrationTest is throwing 404s.
*Thread Reply:* I think any large effort like this would be more likely to target moving some of the code to Spark itself, or enhancing its APIs to make developing this kind of integration easier
*Thread Reply:* More thoughts @Paweł Leszczyński @Maciej Obuchowski:
*Thread Reply:* But I'm still at a loss for this:
java.io.IOException: Path: //openlineage-ci-testing/warehouse/spark-3_3_4/scala-2_12/hive3/test_3_3_4_2_12/data is a directory, which is not supported by the record reader when `mapreduce.input.fileinputformat.input.dir.recursive` is false.
*Thread Reply:* Oh wait, I see metastore 3 has an @Tag("iceberg") on its unit test
*Thread Reply:* Spark 3.5.0 won't run it
*Thread Reply:* java.io.IOException: Path: //openlineage-ci-testing/warehouse/spark-3_3_4/scala-2_12/hive3/test_3_3_4_2_12/data is a directory, which is not supported by the record reader when `mapreduce.input.fileinputformat.input.dir.recursive` is false.
that confuses me too
*Thread Reply:* What's the point of the Metastore tests?
*Thread Reply:* What are we testing there?
*Thread Reply:* interaction with datasets registered and accessed through Hive Metastore
*Thread Reply:* I mean, we're not even loading the OpenLineageSparkListener
*Thread Reply:* I mean, this is what we're testing:
```
void IcebergTablesTest() {
  executeSql("create database if not exists %s", database);
  executeSql("drop table if exists %s.%s", database, table);
  executeSql(
      "create external table %s.%s (id int, value string) USING iceberg location '%s'",
      database, table, MetastoreTestUtils.getTableLocation(database, table));
  executeSql("insert into table %s.%s VALUES (1, 'value1'), (2, 'value2')", database, table);
  Dataset<Row> rowDataset = executeSql(String.format("select * from %s.%s", database, table));
  List<Row> rows = rowDataset.collectAsList();
  assertThat(rows).hasSize(2);
  assertThat(rows.get(0).get(0)).isEqualTo(1);
}
```
*Thread Reply:* This is what the Spark Conf looks like:
```
public static SparkConf getCommonSparkConf(
    String appName, String metastoreName, int mappedPort, Boolean isIceberg) {
  SparkConf conf =
      new SparkConf()
          .setAppName(appName + VERSION)
          .setMaster("local[*]")
          .set("spark.driver.host", LOCALIP)
          .set("org.jpox.autoCreateSchema", "true")
          .set(
              "javax.jdo.option.ConnectionURL",
              String.format("jdbc:
  if (isIceberg) {
    conf.set("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkCatalog")
        .set("spark.sql.catalog.spark_catalog.type", "hive")
        .set(
            "spark.sql.extensions",
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions");
  }
  return conf;
}
```
*Thread Reply:* So we're testing whether the hive metastore works on GCS?
*Thread Reply:* Which confuses me, because shouldn't that test be done in Hive's test suite and not ours?
*Thread Reply:* Unless there's a good reason for these tests, which don't actually test our code, I'm going to disable them. Do you agree @Paweł Leszczyński @Maciej Obuchowski?
*Thread Reply:* I feel this is an unfinished feature
*Thread Reply:* I feel like we can disable it
*Thread Reply:* but still don't understand why it fails, maybe it will always fail in 2.13?
*Thread Reply:* If I test it locally, under 3.2.4 and 2.13, it passes.
*Thread Reply:* this looks like an unfinished feature: there was a first PR to introduce the Spark + real Hive metastore environment, and the follow-up PRs never happened. I don't know why it's not clearing the path; this can be related to changed Hadoop client dependencies. I think I don't mind turning the test off and creating a separate issue to fix it later.
*Thread Reply:* All integration tests for Spark pass
*Thread Reply:* with 65 commits squashed?
*Thread Reply:* I was just about to squash them
*Thread Reply:* Including commits like, "Hello Spotless my old friend"
*Thread Reply:* We have squash on merge set up
*Thread Reply:* I wanted to summarise my changes
*Thread Reply:* Let's fix this by adding a changelog entry?
*Thread Reply:* btw @Paweł Leszczyński - I had an idea of how to reduce the duplication in the CI config
*Thread Reply:* we can use YAML references/pointers
*Thread Reply:* And add a parameter for scala binary version
*Thread Reply:* I liked the idea of the mockserver extension
*Thread Reply:* it got reverted; perhaps it was too many improvements at once, but it was cool
*Thread Reply:* I can reintroduce it, because they publish their own extension
*Thread Reply:* which I used, after deleting my own
*Thread Reply:* But the tests do need to be adjusted though
*Thread Reply:* so that we don't rely on anything in static space
*Thread Reply:* My desired solution would be to write events to a file with the file transport and then verify the events from that, with no need for extra servers and ports. Maciej would prefer some lightweight server container, to have a generic solution regardless of whether it is Java or Python.
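For context, a hedged sketch of what the file-transport wiring could look like on the Spark side. The property names assume the Java client's file transport (type=file with a location option); double-check them against the client config before relying on this:
```
import org.apache.spark.sql.SparkSession;

public class FileTransportSketch {
  public static SparkSession sessionWritingEventsTo(String eventsFile) {
    return SparkSession.builder()
        .master("local[*]")
        .appName("file-transport-sketch")
        // Attach the listener and route events to a local file instead of an HTTP endpoint.
        .config("spark.extraListeners", "io.openlineage.spark.agent.OpenLineageSparkListener")
        .config("spark.openlineage.transport.type", "file")
        // Assumption: each emitted event ends up as one JSON line in this file.
        .config("spark.openlineage.transport.location", eventsFile)
        .getOrCreate();
  }
}
```
The verification helper would then read the file back line by line, which is what the code pasted further down in this thread does.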
*Thread Reply:* Yeah - I want to go the file solution as well
*Thread Reply:* let's see who will be the first 😉
*Thread Reply:* It's less resource consumption
*Thread Reply:* It took me an hour to write it.
*Thread Reply:* Including the verification
*Thread Reply:* does it require to modify all the existing tests?
*Thread Reply:* It basically looked the exact same as the current mock server verifyEvents method
*Thread Reply:* The only difference was instead of passing in a mock server instance
*Thread Reply:* I passed in the file location (java.nio.file.Path)
*Thread Reply:* > does it require to modify all the existing tests?
Regardless of whichever approach you take, you will have to modify the existing tests.
*Thread Reply:* yes, but do we need to modify the assertion files with expected OL events?
*Thread Reply:* If I were to redo it though, I would write it as an extension
*Thread Reply:* You mean with replacements?
*Thread Reply:* Because if yes, I already did that 😓
*Thread Reply:* can we reuse those files?
*Thread Reply:* As I said, the only thing that changed in the method
*Thread Reply:* was the type of the first parameter
*Thread Reply:* it sounds cool to me
*Thread Reply:* ClientAndServer -> java.nio.file.Path
*Thread Reply:* Yeah but I deleted the code, because I was like "Too many changes"
*Thread Reply:* but it won't be hard to add it back
*Thread Reply:* I wouldn't miss mockserver at all 😉
*Thread Reply:* unless I didn't commit it
*Thread Reply:* This is what I had @Paweł Leszczyński
```
private static final ObjectReader READER = new ObjectMapper().findAndRegisterModules().reader();

static void verifyEvents(Path filePath, String... eventFiles) {
  verifyEvents(filePath, ignored -> true, Collections.emptyMap(), eventFiles);
}

static void verifyEvents(Path filePath, Predicate<JsonNode> eventFilter, String... eventFiles) {
  verifyEvents(filePath, eventFilter, Collections.emptyMap(), eventFiles);
}

static void verifyEvents(Path filePath, Map<String, String> replacements, String... eventFiles) {
  verifyEvents(filePath, ignored -> true, replacements, eventFiles);
}

@SneakyThrows
static List<OpenLineage.RunEvent> getEmittedEvents(Path filePath) {
  List<String> lines = Files.readAllLines(filePath);
  return lines.stream()
      .map(OpenLineageClientUtils::runEventFromJson)
      .collect(Collectors.toList());
}

@SneakyThrows
static void verifyEvents(
    Path filePath,
    Predicate<JsonNode> eventFilter,
    Map<String, String> replacements,
    String... expectedEventFileNames) {
  Path eventsDir = Paths.get("integrations/container/");

  final List<JsonNode> actualJsonObjects =
      Files.readAllLines(filePath).stream()
          .map(MockServerUtils::sneakyRead)
          .filter(eventFilter)
          .collect(Collectors.toList());

  final List<JsonNode> expectedJsonObjects =
      Arrays.stream(expectedEventFileNames)
          .map(eventsDir::resolve)
          .map(MockServerUtils::sneakyRead)
          .map(json -> doReplace(json, replacements))
          .map(MockServerUtils::sneakyRead)
          .collect(Collectors.toList());

  IntStream.range(0, expectedJsonObjects.size())
      .forEach(
          index -> {
            JsonNode actualJson = actualJsonObjects.get(index);
            JsonNode expectedJson = expectedJsonObjects.get(index);
            try {
              log.info("Comparing {} to {}", expectedJson, actualJson);
              JSONAssert.assertEquals(expectedJson.toString(), actualJson.toString(), false);
            } catch (JSONException e) {
              Assertions.fail("Failed to match JSON", e);
            }
          });
}

@SneakyThrows
private static JsonNode sneakyRead(String json) {
  return MockServerUtils.READER.readTree(json);
}

@SneakyThrows
private static String sneakyRead(Path path) {
  return new String(Files.readAllBytes(path));
}

private static String doReplace(String s, Map<String, String> replacements) {
  String result = s;
  for (Map.Entry<String, String> entry : replacements.entrySet()) {
    result = result.replace(entry.getKey(), entry.getValue());
  }
  return result;
}
```
*Thread Reply:* Why was the predicate needed? Because MockServer can be told to match only certain events; for example, we only asked it to match on START and COMPLETE events, however the listener would also publish RUNNING events.
*Thread Reply:* So the predicate can be used to omit running events before the comparison takes place
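For example, a hypothetical call site for the helper above (the file path and the expected-event file names are illustrative, and it assumes the methods live in MockServerUtils as the pasted code suggests):
```
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.function.Predicate;

import com.fasterxml.jackson.databind.JsonNode;

class VerifyEventsUsageSketch {
  static void verifyStartAndComplete() {
    // Where the file transport wrote the events (illustrative path).
    Path eventsFile = Paths.get("/tmp/openlineage-events.ndjson");

    // Drop RUNNING events before comparing; only START/COMPLETE are asserted on.
    Predicate<JsonNode> notRunning =
        json -> !"RUNNING".equals(json.path("eventType").asText());

    MockServerUtils.verifyEvents(
        eventsFile, notRunning, "someJobStartEvent.json", "someJobCompleteEvent.json");
  }
}
```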
*Thread Reply:* you need to make sure you're not comparing stuff like timestamps or UUIDs that are generated for each run
*Thread Reply:* That's this line:
JSONAssert.assertEquals(expectedJson.toString(), actualJson.toString(), false);
*Thread Reply:* On the Airflow side we have a solution with Jinja templates; that's why I wanted to reuse that code
*Thread Reply:* The "false" means, don't do strict matching
Hey team! There is a high probability that we will need to extend the Kafka transport to support IAM authentication in MSK.
First, can I put it in the OpenLineage repository, or should we create an internal transport? Second, do you prefer adding the functionality to the already existing Kafka transport, or a new transport that extends the Kafka one? With the second option, we can separate the dependencies.
*Thread Reply:* We'd prefer not to add dependencies as large as
import boto3
import botocore.session
to the generic client that everyone using OpenLineage will download
*Thread Reply:* So what about an openlineage-python[msk]?
*Thread Reply:* I'm OK with adding it as an extra dependency and extending the existing Kafka transport
*Thread Reply:* I'm okay with openlineage-python[msk] too
@Paweł Leszczyński @Maciej Obuchowski - can I get this one reviewed: https://github.com/OpenLineage/OpenLineage/pull/2446
*Thread Reply:* I will do it. @Paweł Leszczyński is on vacation this week, so expect slower response time unfortunately
*Thread Reply:* Ah right, I remember now.
*Thread Reply:* https://github.com/OpenLineage/docs/pull/286
@Damien Hawes @Mattia Bertorello is this ⬆️ and Iceberg PR enough for 2.13 support?
*Thread Reply:* I believe so
Hey! I would like to discuss this solution for the dbt wrapper: https://github.com/OpenLineage/OpenLineage/pull/2449. A stakeholder is concerned about a possible failure that can occur when emitting the events from dbt, even if the job itself is successful.
Do you think we can make catching any exception the default behaviour, or do we need to put it behind a "feature flag" with an environment variable?
*Thread Reply:* I think we can make it default
*Thread Reply:* It's not "breaking change", if anything it makes it more resilient
*Thread Reply:* I would change the
logger.info(f"Emitted {len(events) + 2} openlineage events")
to accurately count emitted events and display failed ones too
*Thread Reply:* Obviously, in this case, it could happen that the job is successful, and there is no lineage.
*Thread Reply:* Done, I've added an emitted_events counter
*Thread Reply:* Hi Maciej, could we merge PR #2449 before releasing 1.9.0? Do I still need to work on something?
*Thread Reply:* probably yes, sorry for not merging it earlier
*Thread Reply:* @Mattia Bertorello it's merged 🙂
Hey team!
Another discussion: I created an MSK transport; let me know what you think. With this transport, OL users can use MSK with IAM authentication without defining a custom transport.
https://github.com/OpenLineage/OpenLineage/pull/2478
*Thread Reply:* would be great if you could confirm that you tested this manually
*Thread Reply:* I tested it. I can show some screenshots 🙂 I had to create a small Python script, ship everything in a Docker container and run it on a machine with network connectivity to MSK 😅
*Thread Reply:* I believe you, it's just it would be too expensive time wise to have real integration tests for each of those transports, so we have to rely on people manually testing it 🙂
*Thread Reply:* Yeah, you need an AWS account, some Terraform code to create and destroy the MSK cluster, plus the integration test has to run inside the VPC network 😅
But it makes sense to put some screenshots in the PR just to show that it was tested, and how.
*Thread Reply:* The only thing to test is the IAM auth, because other than that it's normal Kafka
*Thread Reply:* test code
```
import datetime
import uuid

from openlineage.client import OpenLineageClient
from openlineage.client.run import Job, Run, RunEvent, RunState
from openlineage.client.transport import MSKIAMTransport
from openlineage.client.transport.msk_iam import MSKIAMConfig

if __name__ == "__main__":
    import logging

    logging.basicConfig(level=logging.DEBUG)

    config = MSKIAMConfig(
        config={
            "bootstrap.servers": "b-2.xxx.c2.kafka.eu-west-2.amazonaws.com:9098,b-1.xxx.c2.kafka.eu-west-2.amazonaws.com:9098"
        },
        topic="my_test_topic",
        region="eu-west-2",
        flush=True,
    )
    transport = MSKIAMTransport(config)
    client = OpenLineageClient(transport=transport)
    event = RunEvent(
        eventType=RunState.START,
        eventTime=datetime.datetime.now().isoformat(),
        run=Run(runId=str(uuid.uuid4())),
        job=Job(namespace="kafka", name="test"),
        producer="prod",
        schemaURL="schema/RunEvent",
    )
    client.emit(event)
    client.transport.producer.flush(timeout=1)
    print("Messages sent")
```
Logs
```
DEBUG:openlineage.client.transport.kafka:BRKMAIN [rdkafka#producer-1] [thrd:sasl_ssl://b-1.xxx.c2.kafka.eu-west-2]: sasl_ssl://b-1.xxx.c2.kafka.eu-west-2.amazonaws.com:9098/bootstrap: Enter main broker thread
2024-02-29T12:14:47.560285672Z DEBUG:openlineage.client.transport.kafka:CONNECT [rdkafka#producer-1] [thrd:sasl_ssl://b-1.xxx.c2.kafka.eu-west-2]: sasl_ssl://b-1.xxx.c2.kafka.eu-west-2.amazonaws.com:9098/bootstrap: Received CONNECT op
2024-02-29T12:14:47.560288447Z DEBUG:openlineage.client.transport.kafka:STATE [rdkafka#producer-1] [thrd:sasl_ssl://b-1.xxx.c2.kafka.eu-west-2]: sasl_ssl://b-1.xxx.c2.kafka.eu-west-2.amazonaws.com:9098/bootstrap: Broker changed state INIT -> TRY_CONNECT
2024-02-29T12:14:47.560291862Z DEBUG:openlineage.client.transport.kafka:BROADCAST [rdkafka#producer-1] [thrd:sasl_ssl://b-1.xxx.c2.kafka.eu-west-2]: Broadcasting state change
2024-02-29T12:14:47.560294645Z DEBUG:openlineage.client.transport.kafka:TOPIC [rdkafka#producer-1] [thrd:app]: New local topic: my_test_topic
2024-02-29T12:14:47.560297342Z DEBUG:openlineage.client.transport.kafka:TOPPARNEW [rdkafka#producer-1] [thrd:app]: NEW my_test_topic [-1] 0x5598e047bbf0 refcnt 0x5598e047bc80 (at rd_kafka_topic_new0:472)
2024-02-29T12:14:47.560300475Z DEBUG:openlineage.client.transport.kafka:BRKMAIN [rdkafka#producer-1] [thrd:app]: Waking up waiting broker threads after setting OAUTHBEARER token
2024-02-29T12:14:47.560303259Z DEBUG:openlineage.client.transport.kafka:WAKEUP [rdkafka#producer-1] [thrd:app]: sasl_ssl://b-1.xxx.c2.kafka.eu-west-2.amazonaws.com:9098/bootstrap: Wake-up: OAUTHBEARER token update
2024-02-29T12:14:47.560306334Z DEBUG:openlineage.client.transport.kafka:WAKEUP [rdkafka#producer-1] [thrd:app]: Wake-up sent to 1 broker thread in state >= TRY_CONNECT: OAUTHBEARER token update
2024-02-29T12:14:47.560309239Z DEBUG:openlineage.client.transport.kafka:CONNECT [rdkafka#producer-1] [thrd:sasl_ssl://b-1.xxx.c2.kafka.eu-west-2]: sasl_ssl://b-1.xxx.c2.kafka.eu-west-2.amazonaws.com:9098/bootstrap: broker in state TRY_CONNECT connecting
2024-02-29T12:14:47.560312101Z DEBUG:openlineage.client.transport.kafka:STATE [rdkafka#producer-1] [thrd:sasl_ssl://b-1.xxx.c2.kafka.eu-west-2]: sasl_ssl://b-1.xxx.c2.kafka.eu-west-2.amazonaws.com:9098/bootstrap: Broker changed state TRY_CONNECT -> CONNECT
...
DEBUG:openlineage.client.transport.kafka:PRODUCE [rdkafka#producer-1] [thrd:sasl_ssl://b-1.xxx.c2.kafka.eu-west-2]: sasl_ssl://b-1.xxx.c2.kafka.eu-west-2.amazonaws.com:9098/1: my_test_topic [0]: Produce MessageSet with 1 message(s) (349 bytes, ApiVersion 7, MsgVersion 2, MsgId 0, BaseSeq -1, PID{Invalid}, uncompressed)
2024-02-29T12:14:48.326364842Z DEBUG:openlineage.client.transport.kafka:SEND [rdkafka#producer-1] [thrd:sasl_ssl://b-1.xxx.c2.kafka.eu-west-2]: sasl_ssl://b-1.xxx.c2.kafka.eu-west-2.amazonaws.com:9098/1: Sent ProduceRequest (v7, 454 bytes @ 0, CorrId 5)
2024-02-29T12:14:48.382471756Z DEBUG:openlineage.client.transport.kafka:RECV [rdkafka#producer-1] [thrd:sasl_ssl://b-1.xxx.c2.kafka.eu-west-2]: sasl_ssl://b-1.xxx.c2.kafka.eu-west-2.amazonaws.com:9098/1: Received ProduceResponse (v7, 102 bytes, CorrId 5, rtt 55.99ms)
2024-02-29T12:14:48.382517219Z DEBUG:openlineage.client.transport.kafka:MSGSET [rdkafka#producer-1] [thrd:sasl_ssl://b-1.xxx.c2.kafka.eu-west-2]: sasl_ssl://b-1.xxx.c2.kafka.eu-west-2.amazonaws.com:9098/1: my_test_topic [0]: MessageSet with 1 message(s) (MsgId 0, BaseSeq -1) delivered
2024-02-29T12:14:48.382623532Z DEBUG:openlineage.client.transport.kafka:Send message <cimpl.Message object at 0x7fb116fcde40>
2024-02-29T12:14:48.382648622Z DEBUG:openlineage.client.transport.kafka:Amount of messages left in Kafka buffers after flush 0
2024-02-29T12:14:48.382730647Z DEBUG:openlineage.client.transport.kafka:WAKEUP [rdkafka#producer-1] [thrd:app]: sasl_ssl://b-1.xxx.c2.kafka.eu-west-2.amazonaws.com:9098/1: Wake-up: flushing
2024-02-29T12:14:48.382747018Z DEBUG:openlineage.client.transport.kafka:WAKEUP [rdkafka#producer-1] [thrd:app]: Wake-up sent to 1 broker thread in state >= UP: flushing
2024-02-29T12:14:48.382752798Z Messages sent
```
*Thread Reply:* I copied it from the Kafka transport (https://github.com/OpenLineage/OpenLineage/pull/2478#discussion_r1507361123), and it makes sense, because otherwise, when Python reads the whole file, it could import a library that doesn't exist if you don't need it.
*Thread Reply:* Also, I think it's better to drop support for IMDSv1, and in any case I should implement IMDSv2 😅 to be complete: https://github.com/OpenLineage/OpenLineage/pull/2478#discussion_r1507359486
*Thread Reply:* Hi @Kacper Muda, Is there still something to do in this PR?
@Manish Kumar Gupta has joined the channel
Hi all, please let me know if Spark 3.5.1 is supported by OpenLineage, and which jar version of io.openlineage:openlineage-spark should be used in PySpark. Thanks.
@Rishabh Pareek has joined the channel
@Paweł Leszczyński @Maciej Obuchowski - I am going to archive this channel, as it has served its purpose. Yes?
@Kürşat Topçuoğlu has joined the channel