-
Notifications
You must be signed in to change notification settings - Fork 28.9k
[SPARK-53927][BUILD][DSTREAM] Upgrade kinesis client and fix kinesis integration tests #52630
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,15 +18,14 @@ package org.apache.spark.streaming.kinesis | |
|
||
import java.nio.ByteBuffer | ||
import java.nio.charset.StandardCharsets | ||
import java.util.concurrent.{Executors, TimeUnit} | ||
|
||
import scala.collection.mutable | ||
import scala.collection.mutable.ArrayBuffer | ||
|
||
import com.amazonaws.services.kinesis.producer.{KinesisProducer => KPLProducer, | ||
KinesisProducerConfiguration, UserRecordResult} | ||
import com.google.common.util.concurrent.{FutureCallback, Futures} | ||
|
||
import org.apache.spark.util.ThreadUtils | ||
import software.amazon.kinesis.producer.{KinesisProducer => KPLProducer, | ||
KinesisProducerConfiguration, UserRecordResult} | ||
|
||
private[kinesis] class KPLBasedKinesisTestUtils(streamShardCount: Int = 2) | ||
extends KinesisTestUtils(streamShardCount) { | ||
|
@@ -53,6 +52,7 @@ private[kinesis] class KPLDataGenerator(regionName: String) extends KinesisDataG | |
} | ||
|
||
override def sendData(streamName: String, data: Seq[Int]): Map[String, Seq[(Int, String)]] = { | ||
val executor = Executors.newSingleThreadExecutor() | ||
val shardIdToSeqNumbers = new mutable.HashMap[String, ArrayBuffer[(Int, String)]]() | ||
data.foreach { num => | ||
val str = num.toString | ||
|
@@ -63,15 +63,17 @@ private[kinesis] class KPLDataGenerator(regionName: String) extends KinesisDataG | |
|
||
override def onSuccess(result: UserRecordResult): Unit = { | ||
val shardId = result.getShardId | ||
val seqNumber = result.getSequenceNumber() | ||
val seqNumber = result.getSequenceNumber | ||
val sentSeqNumbers = shardIdToSeqNumbers.getOrElseUpdate(shardId, | ||
new ArrayBuffer[(Int, String)]()) | ||
sentSeqNumbers += ((num, seqNumber)) | ||
} | ||
} | ||
Futures.addCallback(future, kinesisCallBack, ThreadUtils.sameThreadExecutorService()) | ||
Futures.addCallback(future, kinesisCallBack, executor) | ||
} | ||
producer.flushSync() | ||
shardIdToSeqNumbers.toMap.transform((_, v) => v.toSeq) | ||
executor.shutdown() | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. It is not sufficient to flush the producer; it is necessary to wait for all callbacks to be processed. Previously they were called on the producer execution pool and required synchronization (which was missing). |
||
executor.awaitTermination(10, TimeUnit.SECONDS) | ||
shardIdToSeqNumbers.toMap.transform((_, v) => v.toSeq.sortBy(_._2)) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The test assumes that the head corresponds to the earliest sequence number and the tail to the latest one, while the order in which listener callbacks are invoked does not provide such an invariant. |
||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -32,6 +32,7 @@ import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient | |
import com.amazonaws.services.dynamodbv2.document.DynamoDB | ||
import com.amazonaws.services.kinesis.{AmazonKinesis, AmazonKinesisClient} | ||
import com.amazonaws.services.kinesis.model._ | ||
import com.amazonaws.waiters.WaiterParameters | ||
|
||
import org.apache.spark.internal.Logging | ||
import org.apache.spark.internal.LogKeys.{STREAM_NAME, TABLE_NAME} | ||
|
@@ -61,6 +62,8 @@ private[kinesis] class KinesisTestUtils(streamShardCount: Int = 2) extends Loggi | |
client | ||
} | ||
|
||
private lazy val streamExistsWaiter = kinesisClient.waiters().streamExists() | ||
|
||
private lazy val dynamoDB = { | ||
val dynamoDBClient = new AmazonDynamoDBClient(new DefaultAWSCredentialsProviderChain()) | ||
dynamoDBClient.setRegion(RegionUtils.getRegion(regionName)) | ||
|
@@ -184,18 +187,9 @@ private[kinesis] class KinesisTestUtils(streamShardCount: Int = 2) extends Loggi | |
} | ||
|
||
private def waitForStreamToBeActive(streamNameToWaitFor: String): Unit = { | ||
val startTimeNs = System.nanoTime() | ||
while (System.nanoTime() - startTimeNs < TimeUnit.SECONDS.toNanos(createStreamTimeoutSeconds)) { | ||
Thread.sleep(TimeUnit.SECONDS.toMillis(describeStreamPollTimeSeconds)) | ||
describeStream(streamNameToWaitFor).foreach { description => | ||
val streamStatus = description.getStreamStatus() | ||
logDebug(s"\t- current state: $streamStatus\n") | ||
if ("ACTIVE".equals(streamStatus)) { | ||
return | ||
} | ||
} | ||
} | ||
require(false, s"Stream $streamName never became active") | ||
val describeStreamRequest = new DescribeStreamRequest() | ||
.withStreamName(streamNameToWaitFor) | ||
streamExistsWaiter.run(new WaiterParameters(describeStreamRequest)) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. It is easier to use waiters to wait for the stream to become active. Additionally, using the existing approach intermittently caused |
||
} | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Single thread executor helps to avoid synchronization in callbacks.