Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion connector/kinesis-asl/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,16 @@
<version>${aws.java.sdk.version}</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<groupId>software.amazon.kinesis</groupId>
<artifactId>amazon-kinesis-producer</artifactId>
<version>${aws.kinesis.producer.version}</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>com.kjetland</groupId>
<artifactId>mbknor-jackson-jsonschema_2.12</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- manage this up explicitly to match Spark; com.amazonaws:aws-java-sdk-pom specifies
2.6.7 but says we can manage it up -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,14 @@ package org.apache.spark.streaming.kinesis

import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
import java.util.concurrent.{Executors, TimeUnit}

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

import com.amazonaws.services.kinesis.producer.{KinesisProducer => KPLProducer,
KinesisProducerConfiguration, UserRecordResult}
import com.google.common.util.concurrent.{FutureCallback, Futures}

import org.apache.spark.util.ThreadUtils
import software.amazon.kinesis.producer.{KinesisProducer => KPLProducer,
KinesisProducerConfiguration, UserRecordResult}

private[kinesis] class KPLBasedKinesisTestUtils(streamShardCount: Int = 2)
extends KinesisTestUtils(streamShardCount) {
Expand All @@ -53,6 +52,7 @@ private[kinesis] class KPLDataGenerator(regionName: String) extends KinesisDataG
}

override def sendData(streamName: String, data: Seq[Int]): Map[String, Seq[(Int, String)]] = {
val executor = Executors.newSingleThreadExecutor()
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A single-threaded executor avoids the need for synchronization in the callbacks.

val shardIdToSeqNumbers = new mutable.HashMap[String, ArrayBuffer[(Int, String)]]()
data.foreach { num =>
val str = num.toString
Expand All @@ -63,15 +63,17 @@ private[kinesis] class KPLDataGenerator(regionName: String) extends KinesisDataG

override def onSuccess(result: UserRecordResult): Unit = {
val shardId = result.getShardId
val seqNumber = result.getSequenceNumber()
val seqNumber = result.getSequenceNumber
val sentSeqNumbers = shardIdToSeqNumbers.getOrElseUpdate(shardId,
new ArrayBuffer[(Int, String)]())
sentSeqNumbers += ((num, seqNumber))
}
}
Futures.addCallback(future, kinesisCallBack, ThreadUtils.sameThreadExecutorService())
Futures.addCallback(future, kinesisCallBack, executor)
}
producer.flushSync()
shardIdToSeqNumbers.toMap.transform((_, v) => v.toSeq)
executor.shutdown()
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Flushing the producer is not sufficient; it is also necessary to wait for all callbacks to be processed. Previously, they were called on the producer's execution pool and required synchronization (which was missing).

executor.awaitTermination(10, TimeUnit.SECONDS)
shardIdToSeqNumbers.toMap.transform((_, v) => v.toSeq.sortBy(_._2))
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test assumes that the head corresponds to the earliest sequence number and the tail to the latest one, but the order in which the listener callbacks are called does not guarantee this invariant.

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient
import com.amazonaws.services.dynamodbv2.document.DynamoDB
import com.amazonaws.services.kinesis.{AmazonKinesis, AmazonKinesisClient}
import com.amazonaws.services.kinesis.model._
import com.amazonaws.waiters.WaiterParameters

import org.apache.spark.internal.Logging
import org.apache.spark.internal.LogKeys.{STREAM_NAME, TABLE_NAME}
Expand Down Expand Up @@ -61,6 +62,8 @@ private[kinesis] class KinesisTestUtils(streamShardCount: Int = 2) extends Loggi
client
}

private lazy val streamExistsWaiter = kinesisClient.waiters().streamExists()

private lazy val dynamoDB = {
val dynamoDBClient = new AmazonDynamoDBClient(new DefaultAWSCredentialsProviderChain())
dynamoDBClient.setRegion(RegionUtils.getRegion(regionName))
Expand Down Expand Up @@ -184,18 +187,9 @@ private[kinesis] class KinesisTestUtils(streamShardCount: Int = 2) extends Loggi
}

private def waitForStreamToBeActive(streamNameToWaitFor: String): Unit = {
val startTimeNs = System.nanoTime()
while (System.nanoTime() - startTimeNs < TimeUnit.SECONDS.toNanos(createStreamTimeoutSeconds)) {
Thread.sleep(TimeUnit.SECONDS.toMillis(describeStreamPollTimeSeconds))
describeStream(streamNameToWaitFor).foreach { description =>
val streamStatus = description.getStreamStatus()
logDebug(s"\t- current state: $streamStatus\n")
if ("ACTIVE".equals(streamStatus)) {
return
}
}
}
require(false, s"Stream $streamName never became active")
val describeStreamRequest = new DescribeStreamRequest()
.withStreamName(streamNameToWaitFor)
streamExistsWaiter.run(new WaiterParameters(describeStreamRequest))
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is easier to use waiters to wait for the stream to become active. Additionally, the existing approach intermittently caused a ResourceNotFoundException.

}
}

Expand Down
6 changes: 3 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -158,12 +158,12 @@
<codahale.metrics.version>4.2.33</codahale.metrics.version>
<!-- Should be consistent with SparkBuild.scala and docs -->
<avro.version>1.12.0</avro.version>
<aws.kinesis.client.version>1.12.0</aws.kinesis.client.version>
<aws.kinesis.client.version>1.15.3</aws.kinesis.client.version>
<!-- Should be consistent with Kinesis client dependency -->
<aws.java.sdk.version>1.11.655</aws.java.sdk.version>
<aws.java.sdk.version>1.12.681</aws.java.sdk.version>
<aws.java.sdk.v2.version>2.29.52</aws.java.sdk.v2.version>
<!-- the producer is used in tests -->
<aws.kinesis.producer.version>0.12.8</aws.kinesis.producer.version>
<aws.kinesis.producer.version>1.0.5</aws.kinesis.producer.version>
<!-- Do not use 3.0.0: https://github.com/GoogleCloudDataproc/hadoop-connectors/issues/1114 -->
<gcs-connector.version>hadoop3-2.2.28</gcs-connector.version>
<analyticsaccelerator-s3.version>1.3.0</analyticsaccelerator-s3.version>
Expand Down