-
Notifications
You must be signed in to change notification settings - Fork 28.9k
[SPARK-53927][BUILD][DSTREAM] Upgrade kinesis client and fix kinesis integration tests #52630
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
@dongjoon-hyun Please take a look |
require(false, s"Stream $streamName never became active") | ||
val describeStreamRequest = new DescribeStreamRequest() | ||
.withStreamName(streamNameToWaitFor) | ||
streamExistsWaiter.run(new WaiterParameters(describeStreamRequest)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is easier to use waiters to wait for the stream to become active. Additionally, using existing approach intermittently caused ResourceNotFoundException
.
shardIdToSeqNumbers.toMap.transform((_, v) => v.toSeq) | ||
executor.shutdown() | ||
executor.awaitTermination(10, TimeUnit.SECONDS) | ||
shardIdToSeqNumbers.toMap.transform((_, v) => v.toSeq.sortBy(_._2)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Test assumes that head corresponds to the earliest sequence number and the tail to the latest one while order in which listener callback are called do not provide such invariant.
} | ||
producer.flushSync() | ||
shardIdToSeqNumbers.toMap.transform((_, v) => v.toSeq) | ||
executor.shutdown() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is not sufficient to flush producer, it is necessary to wait for all callbacks to be processed. Previously they were called on the producer execution pool and required synchronization (that was missing).
} | ||
|
||
override def sendData(streamName: String, data: Seq[Int]): Map[String, Seq[(Int, String)]] = { | ||
val executor = Executors.newSingleThreadExecutor() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Single thread executor helps to avoid synchronization in callbacks.
Nice, @vrozov . I believe you are able to test this in AWS environment. Did I understand correctly? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you so much. I'm curious if we can unify the following three to AWS SDK v2
. In v2
, Kinesis client is available, isn't it, @vrozov and @sarutak .
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>amazon-kinesis-client</artifactId>
<version>${aws.kinesis.client.version}</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-sts</artifactId>
<version>${aws.java.sdk.version}</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>amazon-kinesis-producer</artifactId>
<version>${aws.kinesis.producer.version}</version>
<scope>test</scope>
</dependency>
@dongjoon-hyun I tested it using my AWS Kinesis deployment. It is required when |
@dongjoon-hyun |
Well, I want to suggest to remove this technically from Apache Spark code base.
|
sts is not a Kinesis library, it is used for authentication (Security Token). The library is explicitly used by Spark Kinesis connector (not in unused declared dependency):
|
Do you mean whether we can remove replace |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This PR seems to include:
- Dependency upgrade to fix an integration test issue which is explained in the title and description
- Refactoring, which is not relevant to the issue
I feel it better to separate those two changes to independent PRs to make git-history clean.
@sarutak The PR includes only changes required for tests to pass. Dependency upgrade is not sufficient as explained in the PR comments. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I confirmed all the changes are necessary to fix the issue and all kinesis tests pass on my dev environment.
LGTM except for indentation.
To @sarutak and @vrozov , for AWS SDK v2, I was thinking about this previous PR which is better than this. IIRC, there exists another PR to upgrade this module to AWS SDK v2, too. However, I'll leave this to you folks because I cannot help testing this module personally. If you are sure, you can merge this, @sarutak . |
…integration tests (ENABLE_KINESIS_TESTS=1)
@dongjoon-hyun I'd suggest to move forward with this PR that uses Kinesis client 1.x (1.15.3) and requires AWS Java SDK v1. IMO, upgrading to AWS Java SDK v2 is a larger effort and I do plan to look into it later:
|
To reviewers, code changes were necessary only in tests and upgrading dependencies fixed |
I'm OK to use Kinesis client 1.x to focus on the test failure this time. |
Merged to |
@dongjoon-hyun @sarutak The issue is not specific to the test, the entire Kinesis integration is broken due to conflicting dependencies. I'll check 4.0 and backport if the error is reproducible on 4.0 branch. |
What changes were proposed in this pull request?
Upgrade kinesis client and AWS Java SDK to fix Kinesis integration tests.
Kinesis client is upgraded from
1.12.0
to1.15.3
(latest on 1.x)AWS Java SDK is upgraded from
1.11.655
to1.12.681
(the one used by Kinesis client)AWS Kinesis producer library (used in test) upgraded from
0.12.8
to1.0.5
Why are the changes needed?
Existing clients are not compatible causing Kinesis integration tests to fail at runtime:
Does this PR introduce any user-facing change?
No, only minor version upgrade for the Kinesis and AWS Java SDK libraries
How was this patch tested?
and
Was this patch authored or co-authored using generative AI tooling?
No