diff --git a/connector/kinesis-asl/pom.xml b/connector/kinesis-asl/pom.xml
index 1b2986681956b..f6f4f9eb7ec61 100644
--- a/connector/kinesis-asl/pom.xml
+++ b/connector/kinesis-asl/pom.xml
@@ -64,10 +64,16 @@
       <version>${aws.java.sdk.version}</version>
     </dependency>
     <dependency>
-      <groupId>com.amazonaws</groupId>
+      <groupId>software.amazon.kinesis</groupId>
       <artifactId>amazon-kinesis-producer</artifactId>
       <version>${aws.kinesis.producer.version}</version>
       <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>com.kjetland</groupId>
+          <artifactId>mbknor-jackson-jsonschema_2.12</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
     <dependency>
    <!-- NOTE(review): trailing context line reconstructed; verify against upstream pom.xml -->
diff --git a/connector/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KPLBasedKinesisTestUtils.scala b/connector/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KPLBasedKinesisTestUtils.scala
index 58b64ba11d35d..6b06f197a137d 100644
--- a/connector/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KPLBasedKinesisTestUtils.scala
+++ b/connector/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KPLBasedKinesisTestUtils.scala
@@ -18,15 +18,14 @@ package org.apache.spark.streaming.kinesis

 import java.nio.ByteBuffer
 import java.nio.charset.StandardCharsets
+import java.util.concurrent.{Executors, TimeUnit}

 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer

-import com.amazonaws.services.kinesis.producer.{KinesisProducer => KPLProducer,
-  KinesisProducerConfiguration, UserRecordResult}
 import com.google.common.util.concurrent.{FutureCallback, Futures}
-
-import org.apache.spark.util.ThreadUtils
+import software.amazon.kinesis.producer.{KinesisProducer => KPLProducer,
+  KinesisProducerConfiguration, UserRecordResult}

 private[kinesis] class KPLBasedKinesisTestUtils(streamShardCount: Int = 2)
     extends KinesisTestUtils(streamShardCount) {
@@ -53,6 +52,7 @@ private[kinesis] class KPLDataGenerator(regionName: String) extends KinesisDataG
   }

   override def sendData(streamName: String, data: Seq[Int]): Map[String, Seq[(Int, String)]] = {
+    val executor = Executors.newSingleThreadExecutor()
     val shardIdToSeqNumbers = new mutable.HashMap[String, ArrayBuffer[(Int, String)]]()
     data.foreach { num =>
       val str = num.toString
@@ -63,15 +63,17 @@ private[kinesis] class KPLDataGenerator(regionName: String) extends KinesisDataG

         override def onSuccess(result: UserRecordResult): Unit = {
           val shardId = result.getShardId
-          val seqNumber = result.getSequenceNumber()
+          val seqNumber = result.getSequenceNumber
           val sentSeqNumbers = shardIdToSeqNumbers.getOrElseUpdate(shardId,
             new ArrayBuffer[(Int, String)]())
           sentSeqNumbers += ((num, seqNumber))
         }
       }
-      Futures.addCallback(future, kinesisCallBack, ThreadUtils.sameThreadExecutorService())
+      Futures.addCallback(future, kinesisCallBack, executor)
     }
     producer.flushSync()
-    shardIdToSeqNumbers.toMap.transform((_, v) => v.toSeq)
+    executor.shutdown()
+    executor.awaitTermination(10, TimeUnit.SECONDS)
+    shardIdToSeqNumbers.toMap.transform((_, v) => v.toSeq.sortBy(_._2))
   }
 }
diff --git a/connector/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala b/connector/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
index 652822c5fdc97..ca5fc30a65d5a 100644
--- a/connector/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
+++ b/connector/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
@@ -32,6 +32,7 @@ import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient
 import com.amazonaws.services.dynamodbv2.document.DynamoDB
 import com.amazonaws.services.kinesis.{AmazonKinesis, AmazonKinesisClient}
 import com.amazonaws.services.kinesis.model._
+import com.amazonaws.waiters.WaiterParameters

 import org.apache.spark.internal.{Logging, MDC}
 import org.apache.spark.internal.LogKeys.{STREAM_NAME, TABLE_NAME}
@@ -61,6 +62,8 @@ private[kinesis] class KinesisTestUtils(streamShardCount: Int = 2) extends Loggi
     client
   }

+  private lazy val streamExistsWaiter = kinesisClient.waiters().streamExists()
+
   private lazy val dynamoDB = {
     val dynamoDBClient = new AmazonDynamoDBClient(new DefaultAWSCredentialsProviderChain())
     dynamoDBClient.setRegion(RegionUtils.getRegion(regionName))
@@ -184,18 +187,9 @@ private[kinesis] class KinesisTestUtils(streamShardCount: Int = 2) extends Loggi
   }

   private def waitForStreamToBeActive(streamNameToWaitFor: String): Unit = {
-    val startTimeNs = System.nanoTime()
-    while (System.nanoTime() - startTimeNs < TimeUnit.SECONDS.toNanos(createStreamTimeoutSeconds)) {
-      Thread.sleep(TimeUnit.SECONDS.toMillis(describeStreamPollTimeSeconds))
-      describeStream(streamNameToWaitFor).foreach { description =>
-        val streamStatus = description.getStreamStatus()
-        logDebug(s"\t- current state: $streamStatus\n")
-        if ("ACTIVE".equals(streamStatus)) {
-          return
-        }
-      }
-    }
-    require(false, s"Stream $streamName never became active")
+    val describeStreamRequest = new DescribeStreamRequest()
+      .withStreamName(streamNameToWaitFor)
+    streamExistsWaiter.run(new WaiterParameters(describeStreamRequest))
   }
 }

diff --git a/pom.xml b/pom.xml
index 01ce893bb1880..b6c23bfbeadc9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -155,12 +155,12 @@
    <!-- NOTE(review): property names below are reconstructed from surviving values; verify against upstream pom.xml -->
    <codahale.metrics.version>4.2.30</codahale.metrics.version>
    <avro.version>1.12.0</avro.version>
-    <aws.kinesis.client.version>1.12.0</aws.kinesis.client.version>
+    <aws.kinesis.client.version>1.15.3</aws.kinesis.client.version>
    <!-- NOTE(review): context line reconstructed -->
-    <aws.java.sdk.version>1.11.655</aws.java.sdk.version>
+    <aws.java.sdk.version>1.12.681</aws.java.sdk.version>
    <aws.java.sdk.v2.version>2.25.53</aws.java.sdk.v2.version>
    <!-- the producer is used in tests -->
-    <aws.kinesis.producer.version>0.12.8</aws.kinesis.producer.version>
+    <aws.kinesis.producer.version>1.0.5</aws.kinesis.producer.version>
    <!-- NOTE(review): context line reconstructed -->
    <gcs-connector.version>hadoop3-2.2.26</gcs-connector.version>
    <!-- NOTE(review): trailing context line reconstructed -->