Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion connector/kinesis-asl/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,16 @@
<version>${aws.java.sdk.version}</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<groupId>software.amazon.kinesis</groupId>
<artifactId>amazon-kinesis-producer</artifactId>
<version>${aws.kinesis.producer.version}</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>com.kjetland</groupId>
<artifactId>mbknor-jackson-jsonschema_2.12</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- manage this up explicitly to match Spark; com.amazonaws:aws-java-sdk-pom specifies
2.6.7 but says we can manage it up -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,14 @@ package org.apache.spark.streaming.kinesis

import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
import java.util.concurrent.{Executors, TimeUnit}

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

import com.amazonaws.services.kinesis.producer.{KinesisProducer => KPLProducer,
KinesisProducerConfiguration, UserRecordResult}
import com.google.common.util.concurrent.{FutureCallback, Futures}

import org.apache.spark.util.ThreadUtils
import software.amazon.kinesis.producer.{KinesisProducer => KPLProducer,
KinesisProducerConfiguration, UserRecordResult}

private[kinesis] class KPLBasedKinesisTestUtils(streamShardCount: Int = 2)
extends KinesisTestUtils(streamShardCount) {
Expand All @@ -53,6 +52,7 @@ private[kinesis] class KPLDataGenerator(regionName: String) extends KinesisDataG
}

override def sendData(streamName: String, data: Seq[Int]): Map[String, Seq[(Int, String)]] = {
val executor = Executors.newSingleThreadExecutor()
val shardIdToSeqNumbers = new mutable.HashMap[String, ArrayBuffer[(Int, String)]]()
data.foreach { num =>
val str = num.toString
Expand All @@ -63,15 +63,17 @@ private[kinesis] class KPLDataGenerator(regionName: String) extends KinesisDataG

override def onSuccess(result: UserRecordResult): Unit = {
val shardId = result.getShardId
val seqNumber = result.getSequenceNumber()
val seqNumber = result.getSequenceNumber
val sentSeqNumbers = shardIdToSeqNumbers.getOrElseUpdate(shardId,
new ArrayBuffer[(Int, String)]())
sentSeqNumbers += ((num, seqNumber))
}
}
Futures.addCallback(future, kinesisCallBack, ThreadUtils.sameThreadExecutorService())
Futures.addCallback(future, kinesisCallBack, executor)
}
producer.flushSync()
shardIdToSeqNumbers.toMap.transform((_, v) => v.toSeq)
executor.shutdown()
executor.awaitTermination(10, TimeUnit.SECONDS)
shardIdToSeqNumbers.toMap.transform((_, v) => v.toSeq.sortBy(_._2))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient
import com.amazonaws.services.dynamodbv2.document.DynamoDB
import com.amazonaws.services.kinesis.{AmazonKinesis, AmazonKinesisClient}
import com.amazonaws.services.kinesis.model._
import com.amazonaws.waiters.WaiterParameters

import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKeys.{STREAM_NAME, TABLE_NAME}
Expand Down Expand Up @@ -61,6 +62,8 @@ private[kinesis] class KinesisTestUtils(streamShardCount: Int = 2) extends Loggi
client
}

private lazy val streamExistsWaiter = kinesisClient.waiters().streamExists()

private lazy val dynamoDB = {
val dynamoDBClient = new AmazonDynamoDBClient(new DefaultAWSCredentialsProviderChain())
dynamoDBClient.setRegion(RegionUtils.getRegion(regionName))
Expand Down Expand Up @@ -184,18 +187,9 @@ private[kinesis] class KinesisTestUtils(streamShardCount: Int = 2) extends Loggi
}

private def waitForStreamToBeActive(streamNameToWaitFor: String): Unit = {
val startTimeNs = System.nanoTime()
while (System.nanoTime() - startTimeNs < TimeUnit.SECONDS.toNanos(createStreamTimeoutSeconds)) {
Thread.sleep(TimeUnit.SECONDS.toMillis(describeStreamPollTimeSeconds))
describeStream(streamNameToWaitFor).foreach { description =>
val streamStatus = description.getStreamStatus()
logDebug(s"\t- current state: $streamStatus\n")
if ("ACTIVE".equals(streamStatus)) {
return
}
}
}
require(false, s"Stream $streamName never became active")
val describeStreamRequest = new DescribeStreamRequest()
.withStreamName(streamNameToWaitFor)
streamExistsWaiter.run(new WaiterParameters(describeStreamRequest))
}
}

Expand Down
6 changes: 3 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -155,12 +155,12 @@
<codahale.metrics.version>4.2.30</codahale.metrics.version>
<!-- Should be consistent with SparkBuild.scala and docs -->
<avro.version>1.12.0</avro.version>
<aws.kinesis.client.version>1.12.0</aws.kinesis.client.version>
<aws.kinesis.client.version>1.15.3</aws.kinesis.client.version>
<!-- Should be consistent with Kinesis client dependency -->
<aws.java.sdk.version>1.11.655</aws.java.sdk.version>
<aws.java.sdk.version>1.12.681</aws.java.sdk.version>
<aws.java.sdk.v2.version>2.25.53</aws.java.sdk.v2.version>
<!-- the producer is used in tests -->
<aws.kinesis.producer.version>0.12.8</aws.kinesis.producer.version>
<aws.kinesis.producer.version>1.0.5</aws.kinesis.producer.version>
<!-- Do not use 3.0.0: https://github.com/GoogleCloudDataproc/hadoop-connectors/issues/1114 -->
<gcs-connector.version>hadoop3-2.2.26</gcs-connector.version>
<!-- org.apache.httpcomponents/httpclient-->
Expand Down