diff --git a/connector/kinesis-asl/pom.xml b/connector/kinesis-asl/pom.xml
index 1b2986681956b..f6f4f9eb7ec61 100644
--- a/connector/kinesis-asl/pom.xml
+++ b/connector/kinesis-asl/pom.xml
@@ -64,10 +64,16 @@
       <version>${aws.java.sdk.version}</version>
     </dependency>
     <dependency>
-      <groupId>com.amazonaws</groupId>
+      <groupId>software.amazon.kinesis</groupId>
       <artifactId>amazon-kinesis-producer</artifactId>
       <version>${aws.kinesis.producer.version}</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>com.kjetland</groupId>
+      <artifactId>mbknor-jackson-jsonschema_2.12</artifactId>
+      <scope>test</scope>
+    </dependency>
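The relocated artifact keeps the KPL API surface: only the Maven groupId and the Java package prefix move from com.amazonaws to software.amazon.kinesis. A minimal smoke test against the new coordinates might look like this sketch; the object name and region value are illustrative, not part of this patch:

```scala
import software.amazon.kinesis.producer.{KinesisProducer, KinesisProducerConfiguration}

object KplSmokeTest {
  def main(args: Array[String]): Unit = {
    // KPL 1.x: same classes as the old com.amazonaws artifact, new package.
    val conf = new KinesisProducerConfiguration()
      .setRegion("us-west-2") // illustrative region
    val producer = new KinesisProducer(conf)
    try {
      producer.flushSync() // no records queued; just exercises the producer
    } finally {
      producer.destroy()
    }
  }
}
```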
diff --git a/connector/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KPLBasedKinesisTestUtils.scala b/connector/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KPLBasedKinesisTestUtils.scala
index 58b64ba11d35d..6b06f197a137d 100644
--- a/connector/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KPLBasedKinesisTestUtils.scala
+++ b/connector/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KPLBasedKinesisTestUtils.scala
@@ -18,15 +18,14 @@ package org.apache.spark.streaming.kinesis
 
 import java.nio.ByteBuffer
 import java.nio.charset.StandardCharsets
+import java.util.concurrent.{Executors, TimeUnit}
 
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 
-import com.amazonaws.services.kinesis.producer.{KinesisProducer => KPLProducer,
-  KinesisProducerConfiguration, UserRecordResult}
 import com.google.common.util.concurrent.{FutureCallback, Futures}
-
-import org.apache.spark.util.ThreadUtils
+import software.amazon.kinesis.producer.{KinesisProducer => KPLProducer,
+  KinesisProducerConfiguration, UserRecordResult}
 
 private[kinesis] class KPLBasedKinesisTestUtils(streamShardCount: Int = 2)
   extends KinesisTestUtils(streamShardCount) {
@@ -53,6 +52,7 @@ private[kinesis] class KPLDataGenerator(regionName: String) extends KinesisDataGenerator {
   }
 
   override def sendData(streamName: String, data: Seq[Int]): Map[String, Seq[(Int, String)]] = {
+    val executor = Executors.newSingleThreadExecutor()
     val shardIdToSeqNumbers = new mutable.HashMap[String, ArrayBuffer[(Int, String)]]()
     data.foreach { num =>
       val str = num.toString
@@ -63,15 +63,17 @@
 
         override def onSuccess(result: UserRecordResult): Unit = {
           val shardId = result.getShardId
-          val seqNumber = result.getSequenceNumber()
+          val seqNumber = result.getSequenceNumber
           val sentSeqNumbers = shardIdToSeqNumbers.getOrElseUpdate(shardId,
             new ArrayBuffer[(Int, String)]())
           sentSeqNumbers += ((num, seqNumber))
         }
       }
-      Futures.addCallback(future, kinesisCallBack, ThreadUtils.sameThreadExecutorService())
+      Futures.addCallback(future, kinesisCallBack, executor)
     }
     producer.flushSync()
-    shardIdToSeqNumbers.toMap.transform((_, v) => v.toSeq)
+    executor.shutdown()
+    executor.awaitTermination(10, TimeUnit.SECONDS)
+    shardIdToSeqNumbers.toMap.transform((_, v) => v.toSeq.sortBy(_._2))
   }
 }
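The reworked sendData swaps Spark's ThreadUtils.sameThreadExecutorService() for a dedicated single-thread executor that is shut down and drained before the results map is built, so every callback has completed by the time sendData returns. A self-contained sketch of that Guava callback pattern, using a SettableFuture as a stand-in for the KPL record future and a placeholder sequence-number string:

```scala
import java.util.concurrent.{Executors, TimeUnit}

import com.google.common.util.concurrent.{FutureCallback, Futures, SettableFuture}

object CallbackDrainExample {
  def main(args: Array[String]): Unit = {
    // Dedicated executor: callbacks no longer run on the completing thread.
    val executor = Executors.newSingleThreadExecutor()
    val future = SettableFuture.create[String]()

    Futures.addCallback(future, new FutureCallback[String] {
      override def onSuccess(result: String): Unit = println(s"sequence number: $result")
      override def onFailure(t: Throwable): Unit = t.printStackTrace()
    }, executor)

    future.set("seq-00001") // placeholder value; a real KPL future yields a UserRecordResult

    // Drain the executor so all callbacks have run before results are read,
    // mirroring the shutdown/awaitTermination pair added above.
    executor.shutdown()
    executor.awaitTermination(10, TimeUnit.SECONDS)
  }
}
```

Sorting each shard's (value, sequence number) pairs by sequence number also keeps the returned map deterministic now that callbacks can fire in any order.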
diff --git a/connector/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala b/connector/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
index 652822c5fdc97..ca5fc30a65d5a 100644
--- a/connector/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
+++ b/connector/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
@@ -32,6 +32,7 @@ import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient
 import com.amazonaws.services.dynamodbv2.document.DynamoDB
 import com.amazonaws.services.kinesis.{AmazonKinesis, AmazonKinesisClient}
 import com.amazonaws.services.kinesis.model._
+import com.amazonaws.waiters.WaiterParameters
 
 import org.apache.spark.internal.{Logging, MDC}
 import org.apache.spark.internal.LogKeys.{STREAM_NAME, TABLE_NAME}
@@ -61,6 +62,8 @@ private[kinesis] class KinesisTestUtils(streamShardCount: Int = 2) extends Logging {
     client
   }
 
+  private lazy val streamExistsWaiter = kinesisClient.waiters().streamExists()
+
   private lazy val dynamoDB = {
     val dynamoDBClient = new AmazonDynamoDBClient(new DefaultAWSCredentialsProviderChain())
     dynamoDBClient.setRegion(RegionUtils.getRegion(regionName))
@@ -184,18 +187,9 @@ private[kinesis] class KinesisTestUtils(streamShardCount: Int = 2) extends Logging {
   }
 
   private def waitForStreamToBeActive(streamNameToWaitFor: String): Unit = {
-    val startTimeNs = System.nanoTime()
-    while (System.nanoTime() - startTimeNs < TimeUnit.SECONDS.toNanos(createStreamTimeoutSeconds)) {
-      Thread.sleep(TimeUnit.SECONDS.toMillis(describeStreamPollTimeSeconds))
-      describeStream(streamNameToWaitFor).foreach { description =>
-        val streamStatus = description.getStreamStatus()
-        logDebug(s"\t- current state: $streamStatus\n")
-        if ("ACTIVE".equals(streamStatus)) {
-          return
-        }
-      }
-    }
-    require(false, s"Stream $streamName never became active")
+    val describeStreamRequest = new DescribeStreamRequest()
+      .withStreamName(streamNameToWaitFor)
+    streamExistsWaiter.run(new WaiterParameters(describeStreamRequest))
   }
 }
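The hand-rolled describe-and-sleep loop gives way to the SDK's built-in waiter, which encapsulates the polling and timeout handling. A standalone sketch of the same v1 waiter API; the stream name is hypothetical, and the explicit PollingStrategy shown is optional (the waiter ships with a default):

```scala
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder
import com.amazonaws.services.kinesis.model.DescribeStreamRequest
import com.amazonaws.waiters.{FixedDelayStrategy, MaxAttemptsRetryStrategy, PollingStrategy, WaiterParameters}

object StreamWaiterExample {
  def main(args: Array[String]): Unit = {
    val client = AmazonKinesisClientBuilder.defaultClient()
    val request = new DescribeStreamRequest().withStreamName("my-test-stream")

    // run() blocks, polling DescribeStream until the stream exists or the
    // polling strategy gives up (here: up to 40 attempts, 5 seconds apart).
    client.waiters().streamExists().run(
      new WaiterParameters(request).withPollingStrategy(
        new PollingStrategy(new MaxAttemptsRetryStrategy(40), new FixedDelayStrategy(5))))
  }
}
```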
diff --git a/pom.xml b/pom.xml
index 01ce893bb1880..b6c23bfbeadc9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -155,12 +155,12 @@
     <codahale.metrics.version>4.2.30</codahale.metrics.version>
     <avro.version>1.12.0</avro.version>
-    <aws.kinesis.client.version>1.12.0</aws.kinesis.client.version>
+    <aws.kinesis.client.version>1.15.3</aws.kinesis.client.version>
     <!-- Should be consistent with Kinesis client dependency -->
-    <aws.java.sdk.version>1.11.655</aws.java.sdk.version>
+    <aws.java.sdk.version>1.12.681</aws.java.sdk.version>
     <aws.java.sdk.v2.version>2.25.53</aws.java.sdk.v2.version>
-    <aws.kinesis.producer.version>0.12.8</aws.kinesis.producer.version>
+    <aws.kinesis.producer.version>1.0.5</aws.kinesis.producer.version>
     <gcs-connector.version>hadoop3-2.2.26</gcs-connector.version>