Member

@vrozov vrozov commented Oct 15, 2025

What changes were proposed in this pull request?

Upgrade kinesis client and AWS Java SDK to fix Kinesis integration tests.

Kinesis client is upgraded from 1.12.0 to 1.15.3 (latest on 1.x)
AWS Java SDK is upgraded from 1.11.655 to 1.12.681 (the one used by Kinesis client)
AWS Kinesis producer library (used in tests) is upgraded from 0.12.8 to 1.0.5

Why are the changes needed?

The existing client versions conflict with other dependencies on the classpath, causing the Kinesis integration tests to fail at runtime:

ENABLE_KINESIS_TESTS=1 ./build/sbt -Pkinesis-asl
...
Using endpoint URL https://kinesis.us-west-2.amazonaws.com for creating Kinesis streams for tests.
[info] WithoutAggregationKinesisBackedBlockRDDSuite:
[info] org.apache.spark.streaming.kinesis.WithoutAggregationKinesisBackedBlockRDDSuite *** ABORTED *** (1 second, 131 milliseconds)
[info]   java.lang.NoClassDefFoundError: com/fasterxml/jackson/databind/PropertyNamingStrategy$PascalCaseStrategy
[info]   at com.amazonaws.services.kinesis.AmazonKinesisClient.<clinit>(AmazonKinesisClient.java:86)
[info]   at org.apache.spark.streaming.kinesis.KinesisTestUtils.kinesisClient$lzycompute(KinesisTestUtils.scala:59)
[info]   at org.apache.spark.streaming.kinesis.KinesisTestUtils.kinesisClient(KinesisTestUtils.scala:58)
[info]   at org.apache.spark.streaming.kinesis.KinesisTestUtils.describeStream(KinesisTestUtils.scala:169)
[info]   at org.apache.spark.streaming.kinesis.KinesisTestUtils.findNonExistentStreamName(KinesisTestUtils.scala:182)
[info]   at org.apache.spark.streaming.kinesis.KinesisTestUtils.createStream(KinesisTestUtils.scala:85)
[info]   at org.apache.spark.streaming.kinesis.KinesisBackedBlockRDDTests.$anonfun$beforeAll$1(KinesisBackedBlockRDDSuite.scala:45)
[info]   at org.apache.spark.streaming.kinesis.KinesisFunSuite.runIfTestsEnabled(KinesisFunSuite.scala:41)
[info]   at org.apache.spark.streaming.kinesis.KinesisFunSuite.runIfTestsEnabled$(KinesisFunSuite.scala:39)
[info]   at org.apache.spark.streaming.kinesis.KinesisBackedBlockRDDTests.runIfTestsEnabled(KinesisBackedBlockRDDSuite.scala:26)
[info]   at org.apache.spark.streaming.kinesis.KinesisBackedBlockRDDTests.beforeAll(KinesisBackedBlockRDDSuite.scala:43)
[info]   at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:212)
[info]   at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
[info]   at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
[info]   at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:68)
[info]   at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:321)
[info]   at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:517)
[info]   at sbt.ForkMain$Run.lambda$runTest$1(ForkMain.java:414)
[info]   at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
[info]   at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
[info]   at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
[info]   at java.base/java.lang.Thread.run(Thread.java:840)
[info]   Cause: java.lang.ClassNotFoundException: com.fasterxml.jackson.databind.PropertyNamingStrategy$PascalCaseStrategy
[info]   at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:641)
[info]   at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:188)
[info]   at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:525)
[info]   at com.amazonaws.services.kinesis.AmazonKinesisClient.<clinit>(AmazonKinesisClient.java:86)
[info]   at org.apache.spark.streaming.kinesis.KinesisTestUtils.kinesisClient$lzycompute(KinesisTestUtils.scala:59)
[info]   at org.apache.spark.streaming.kinesis.KinesisTestUtils.kinesisClient(KinesisTestUtils.scala:58)
[info]   at org.apache.spark.streaming.kinesis.KinesisTestUtils.describeStream(KinesisTestUtils.scala:169)
[info]   at org.apache.spark.streaming.kinesis.KinesisTestUtils.findNonExistentStreamName(KinesisTestUtils.scala:182)
[info]   at org.apache.spark.streaming.kinesis.KinesisTestUtils.createStream(KinesisTestUtils.scala:85)
[info]   at org.apache.spark.streaming.kinesis.KinesisBackedBlockRDDTests.$anonfun$beforeAll$1(KinesisBackedBlockRDDSuite.scala:45)
[info]   at org.apache.spark.streaming.kinesis.KinesisFunSuite.runIfTestsEnabled(KinesisFunSuite.scala:41)
[info]   at org.apache.spark.streaming.kinesis.KinesisFunSuite.runIfTestsEnabled$(KinesisFunSuite.scala:39)
[info]   at org.apache.spark.streaming.kinesis.KinesisBackedBlockRDDTests.runIfTestsEnabled(KinesisBackedBlockRDDSuite.scala:26)
[info]   at org.apache.spark.streaming.kinesis.KinesisBackedBlockRDDTests.beforeAll(KinesisBackedBlockRDDSuite.scala:43)
[info]   at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:212)
[info]   at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
[info]   at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
[info]   at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:68)
[info]   at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:321)
[info]   at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:517)
[info]   at sbt.ForkMain$Run.lambda$runTest$1(ForkMain.java:414)
[info]   at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
[info]   at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
[info]   at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
[info]   at java.base/java.lang.Thread.run(Thread.java:840)
[error] Uncaught exception when running org.apache.spark.streaming.kinesis.WithoutAggregationKinesisBackedBlockRDDSuite: java.lang.NoClassDefFoundError: com/fasterxml/jackson/databind/PropertyNamingStrategy$PascalCaseStrategy
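
For context, a NoClassDefFoundError like the one above means that a class which existed when the calling code was compiled cannot be loaded at runtime. A minimal Scala sketch of probing the classpath for such a class follows. `ClasspathProbe` and `isClassPresent` are hypothetical names used only for illustration; they are not part of this PR.

```scala
object ClasspathProbe {
  // Returns true if the named class can be loaded by this object's class loader.
  // Passing initialize = false avoids running static initializers while probing.
  def isClassPresent(className: String): Boolean =
    try {
      Class.forName(className, false, getClass.getClassLoader)
      true
    } catch {
      case _: ClassNotFoundException => false
      case _: LinkageError => false
    }
}
```

Given the trace above, probing for `com.fasterxml.jackson.databind.PropertyNamingStrategy$PascalCaseStrategy` on the pre-upgrade test classpath would presumably return false.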

Does this PR introduce any user-facing change?

No. This is only a minor version upgrade of the Kinesis and AWS Java SDK libraries.

How was this patch tested?

ENABLE_KINESIS_TESTS=1 ./build/sbt -Pkinesis-asl

and

ENABLE_KINESIS_TESTS=1 build/mvn test -Pkinesis-asl -pl connector/kinesis-asl

Was this patch authored or co-authored using generative AI tooling?

No

@vrozov
Member Author

vrozov commented Oct 15, 2025

@dongjoon-hyun Please take a look

require(false, s"Stream $streamName never became active")
val describeStreamRequest = new DescribeStreamRequest()
.withStreamName(streamNameToWaitFor)
streamExistsWaiter.run(new WaiterParameters(describeStreamRequest))
Member Author

It is easier to use waiters to wait for the stream to become active. Additionally, the existing approach intermittently caused a ResourceNotFoundException.
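
To illustrate the idea behind a waiter, here is a minimal, dependency-free Scala sketch of a bounded polling loop. It is not the AWS SDK's waiter implementation, and `PollingWaiter`, `waitFor`, and the `Option[String]` status model are assumptions made for this example: `None` stands in for a stream that is not visible yet (the case that surfaced as ResourceNotFoundException), and `Some(status)` for a described stream.

```scala
import scala.annotation.tailrec

object PollingWaiter {
  // Polls `currentStatus` until it reports `desired` or the attempts run out.
  // Returns true if the desired status was observed, false otherwise.
  def waitFor(desired: String, maxAttempts: Int, delayMillis: Long)(
      currentStatus: () => Option[String]): Boolean = {
    @tailrec
    def loop(attempt: Int): Boolean =
      if (attempt > maxAttempts) false
      else if (currentStatus().contains(desired)) true
      else {
        // Sleep only when another attempt will follow.
        if (attempt < maxAttempts) Thread.sleep(delayMillis)
        loop(attempt + 1)
      }
    loop(1)
  }
}
```

Treating "not found yet" as just another non-terminal state is what keeps the caller free of ad hoc exception handling; the real waiter is configured by the SDK rather than by these parameters.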

shardIdToSeqNumbers.toMap.transform((_, v) => v.toSeq)
executor.shutdown()
executor.awaitTermination(10, TimeUnit.SECONDS)
shardIdToSeqNumbers.toMap.transform((_, v) => v.toSeq.sortBy(_._2))
Member Author

The test assumes that the head corresponds to the earliest sequence number and the tail to the latest one, but the order in which listener callbacks are called does not guarantee this invariant.
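
A small Scala sketch of why the sort matters, using made-up data: `SequenceOrdering` and the sample sequence numbers are hypothetical, and the example assumes sequence numbers are equal-length decimal strings, so that lexicographic string ordering matches numeric ordering.

```scala
object SequenceOrdering {
  // Callbacks may record (value, sequenceNumber) pairs in any order.
  // Sorting by the sequence number restores earliest-first order.
  def sortBySequenceNumber(records: Seq[(Int, String)]): Seq[(Int, String)] =
    records.sortBy(_._2)
}
```

With records that arrived as `Seq((3, "103"), (1, "101"), (2, "102"))`, the sorted result has the earliest sequence number at the head and the latest at the end. If sequence numbers could differ in length, comparing them as `BigInt` would be the safer choice.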

}
producer.flushSync()
shardIdToSeqNumbers.toMap.transform((_, v) => v.toSeq)
executor.shutdown()
Member Author

It is not sufficient to flush the producer; it is also necessary to wait for all callbacks to be processed. Previously, they were called on the producer's execution pool and required synchronization (which was missing).

}

override def sendData(streamName: String, data: Seq[Int]): Map[String, Seq[(Int, String)]] = {
val executor = Executors.newSingleThreadExecutor()
Member Author

A single-thread executor avoids the need for synchronization in callbacks.
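
The pattern can be sketched without any Kinesis dependency. In the Scala example below, `CallbackCollector`, `collect`, and the `(shardId, value)` input are hypothetical stand-ins for the producer callbacks; only the `java.util.concurrent` calls are real APIs. It shows callbacks mutating an unsynchronized map on one executor thread, and the caller reading the map only after `shutdown()` and `awaitTermination` have completed.

```scala
import java.util.concurrent.{Executors, TimeUnit}
import scala.collection.mutable

object CallbackCollector {
  // Runs every callback on a single thread, so the unsynchronized map below
  // is touched only by that thread until the executor has terminated.
  def collect(records: Seq[(String, Int)]): Map[String, Seq[Int]] = {
    val executor = Executors.newSingleThreadExecutor()
    val byShard = mutable.HashMap.empty[String, mutable.ArrayBuffer[Int]]
    records.foreach { case (shardId, value) =>
      executor.execute { () =>
        byShard.getOrElseUpdate(shardId, mutable.ArrayBuffer.empty[Int]) += value
        ()
      }
    }
    // Stop accepting new tasks, then wait for the queued callbacks to finish.
    executor.shutdown()
    val finished = executor.awaitTermination(10, TimeUnit.SECONDS)
    require(finished, "callbacks did not finish in time")
    byShard.toMap.transform((_, v) => v.toSeq)
  }
}
```

Reading the map before `awaitTermination` returns would risk observing a partially filled result, which is the gap that flushing alone leaves open.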

@dongjoon-hyun
Member

Nice, @vrozov. I believe you are able to test this in an AWS environment. Did I understand correctly?

Member

@dongjoon-hyun dongjoon-hyun left a comment

Thank you so much. I'm curious whether we can unify the following three under AWS SDK v2. In v2, a Kinesis client is available, isn't it, @vrozov and @sarutak?

    <dependency>
      <groupId>com.amazonaws</groupId>
      <artifactId>amazon-kinesis-client</artifactId>
      <version>${aws.kinesis.client.version}</version>
    </dependency>
    <dependency>
      <groupId>com.amazonaws</groupId>
      <artifactId>aws-java-sdk-sts</artifactId>
      <version>${aws.java.sdk.version}</version>
    </dependency>
    <dependency>
      <groupId>com.amazonaws</groupId>
      <artifactId>amazon-kinesis-producer</artifactId>
      <version>${aws.kinesis.producer.version}</version>
      <scope>test</scope>
    </dependency>

@vrozov vrozov changed the title from "[SPARK-53927][KINESIS] Upgrade kinesis client and fix kinesis integration tests ((ENABLE_KINESIS_TESTS=1)" to "[SPARK-53927][KINESIS] Upgrade kinesis client and fix kinesis integration tests (ENABLE_KINESIS_TESTS=1)" on Oct 16, 2025
@vrozov
Member Author

vrozov commented Oct 16, 2025

I believe you are able to test this in an AWS environment. Did I understand correctly?

@dongjoon-hyun I tested it using my AWS Kinesis deployment. It is required when ENABLE_KINESIS_TESTS is set to 1.

@vrozov
Member Author

vrozov commented Oct 16, 2025

@dongjoon-hyun amazon-kinesis-client is a wrapper on top of the AWS Java SDK, but it is still a separate library. See https://github.com/awslabs/amazon-kinesis-client. The same applies to amazon-kinesis-producer: https://github.com/awslabs/amazon-kinesis-producer. I suggest keeping amazon-kinesis-client at 1.x (to avoid any backward compatibility issues) and seeing later how it can be upgraded to the latest version (possibly using different profiles).

@dongjoon-hyun
Member

Well, technically, I want to suggest removing this from the Apache Spark code base.

    <dependency>
      <groupId>com.amazonaws</groupId>
      <artifactId>aws-java-sdk-sts</artifactId>
      <version>${aws.java.sdk.version}</version>
    </dependency>

@vrozov
Member Author

vrozov commented Oct 16, 2025

sts is not a Kinesis library; it is used for authentication (Security Token Service). The library is used explicitly by the Spark Kinesis connector (it does not appear among the unused declared dependencies):

[INFO] --- dependency:3.8.1:analyze (default-cli) @ spark-streaming-kinesis-asl_2.13 ---
[WARNING] Used undeclared dependencies found:
[WARNING]    org.apache.spark:spark-core_2.13:jar:4.1.0-SNAPSHOT:compile
[WARNING]    com.google.guava:guava:jar:33.4.0-jre:provided
[WARNING]    org.junit.jupiter:junit-jupiter-api:jar:6.0.0:test
[WARNING]    org.apache.logging.log4j:log4j-core:jar:2.24.3:compile
[WARNING]    org.scalatest:scalatest-funsuite_2.13:jar:3.2.19:test
[WARNING]    org.apache.spark:spark-common-utils_2.13:jar:4.1.0-SNAPSHOT:compile
[WARNING]    org.scalatest:scalatest-shouldmatchers_2.13:jar:3.2.19:test
[WARNING]    org.scalactic:scalactic_2.13:jar:3.2.19:test
[WARNING]    com.amazonaws:aws-java-sdk-kinesis:jar:1.11.655:compile
[WARNING]    org.scalatest:scalatest-compatible:jar:3.2.19:test
[WARNING]    org.apache.spark:spark-common-utils-java_2.13:jar:4.1.0-SNAPSHOT:compile
[WARNING]    org.slf4j:slf4j-api:jar:2.0.17:compile
[WARNING]    com.amazonaws:aws-java-sdk-dynamodb:jar:1.11.655:compile
[WARNING]    org.scalatest:scalatest-core_2.13:jar:3.2.19:test
[WARNING]    org.scalatest:scalatest-matchers-core_2.13:jar:3.2.19:test
[WARNING]    org.apache.logging.log4j:log4j-api:jar:2.24.3:compile
[WARNING]    org.scalatest:scalatest-mustmatchers_2.13:jar:3.2.19:test
[WARNING]    org.scala-lang:scala-library:jar:2.13.17:compile
[WARNING]    com.amazonaws:aws-java-sdk-core:jar:1.11.655:compile
[WARNING] Unused declared dependencies found:
[WARNING]    com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:jar:2.19.2:compile
[WARNING]    net.bytebuddy:byte-buddy:jar:1.17.6:test
[WARNING]    net.bytebuddy:byte-buddy-agent:jar:1.17.6:test
[WARNING]    org.scalacheck:scalacheck_2.13:jar:1.18.0:test
[WARNING]    org.apache.spark:spark-tags_2.13:jar:4.1.0-SNAPSHOT:compile
[WARNING]    org.apache.spark:spark-tags_2.13:test-jar:tests:4.1.0-SNAPSHOT:test
[WARNING]    org.spark-project.spark:unused:jar:1.0.0:compile
[WARNING]    org.scalatest:scalatest_2.13:jar:3.2.19:test
[WARNING]    org.scalatestplus:scalacheck-1-18_2.13:jar:3.2.19.0:test
[WARNING]    org.scalatestplus:selenium-4-21_2.13:jar:3.2.19.0:test
[WARNING]    org.junit.jupiter:junit-jupiter:jar:6.0.0:test
[WARNING]    com.github.sbt.junit:jupiter-interface:jar:0.17.0:test
[WARNING] Non-test scoped test only dependencies found:
[WARNING]    com.amazonaws:aws-java-sdk-dynamodb:jar:1.11.655:compile

@sarutak
Member

sarutak commented Oct 16, 2025

@dongjoon-hyun

Well, technically, I want to suggest removing this from the Apache Spark code base.

Do you mean that we could replace com.amazonaws:aws-java-sdk-sts with software.amazon.awssdk:sts, which is in the v2 library, and remove the property aws.java.sdk.version from pom.xml?

Member

@sarutak sarutak left a comment

This PR seems to include:

  1. Dependency upgrade to fix an integration test issue which is explained in the title and description
  2. Refactoring, which is not relevant to the issue

I feel it would be better to separate these two changes into independent PRs to keep the git history clean.

@vrozov
Member Author

vrozov commented Oct 16, 2025

@sarutak The PR includes only the changes required for the tests to pass. The dependency upgrade alone is not sufficient, as explained in the PR comments.

Member

@sarutak sarutak left a comment

I confirmed that all the changes are necessary to fix the issue and that all Kinesis tests pass in my dev environment.
LGTM except for indentation.

@dongjoon-hyun
Member

To @sarutak and @vrozov: for AWS SDK v2, I was thinking about this previous PR, which is better than this one. IIRC, there is another PR to upgrade this module to AWS SDK v2, too.

However, I'll leave this to you folks because I cannot help with testing this module personally. If you are sure, you can merge this, @sarutak.

@dongjoon-hyun dongjoon-hyun changed the title from "[SPARK-53927][KINESIS] Upgrade kinesis client and fix kinesis integration tests (ENABLE_KINESIS_TESTS=1)" to "[SPARK-53927][KINESIS] Upgrade kinesis client and fix kinesis integration tests" on Oct 16, 2025
@vrozov
Member Author

vrozov commented Oct 16, 2025

@dongjoon-hyun I'd suggest moving forward with this PR, which uses Kinesis client 1.x (1.15.3) and requires AWS Java SDK v1. IMO, upgrading to AWS Java SDK v2 is a larger effort, and I do plan to look into it later:

I suggest keeping amazon-kinesis-client at 1.x (to avoid any backward compatibility issues) and seeing later how it can be upgraded to the latest version (possibly using different profiles).

@vrozov
Member Author

vrozov commented Oct 16, 2025

To reviewers: code changes were necessary only in tests; upgrading the dependencies fixed java.lang.NoClassDefFoundError: com/fasterxml/jackson/databind/PropertyNamingStrategy$PascalCaseStrategy

@vrozov vrozov changed the title from "[SPARK-53927][KINESIS] Upgrade kinesis client and fix kinesis integration tests" to "[SPARK-53927][BUILD][DSTREAM] Upgrade kinesis client and fix kinesis integration tests" on Oct 16, 2025
@sarutak
Member

sarutak commented Oct 17, 2025

I'm OK with using Kinesis client 1.x to focus on the test failure this time.

@sarutak sarutak closed this in 136201a Oct 17, 2025
@sarutak
Member

sarutak commented Oct 17, 2025

Merged to master. Thank you, @vrozov and @dongjoon-hyun!

@vrozov
Member Author

vrozov commented Oct 17, 2025

@dongjoon-hyun @sarutak The issue is not specific to the tests; the entire Kinesis integration is broken due to conflicting dependencies. I'll check 4.0 and backport the fix if the error is reproducible on the 4.0 branch.
