
Commit f6aaeb7

aquamatthias authored and markglh committed
Add an Akka Stream Sink graph stage for Kinesis. (#47)
Add a `KinesisSinkGraphStage` that uses an underlying `KinesisProducerActor` to do the heavy lifting. The stream manages back pressure by allowing only a fixed number of outstanding messages. A materialized value indicates when the stream has finished (or failed).
1 parent 6ac885e commit f6aaeb7

File tree

10 files changed: +542 / -15 lines changed


README.md

Lines changed: 50 additions & 0 deletions
@@ -23,6 +23,7 @@ It's worth familiarising yourself with [Sequence numbers and Sub sequence number
* [Usage: Producer](#usage-usage-producer)
* [Actor Based Implementation](#usage-usage-producer-actor-based-implementation)
* [Pure Scala based implementation (simple wrapper around KPL)](#usage-usage-producer-pure-scala-based-implementation-simple-wrapper-around-kpl)
+ * [Akka Stream Sink](#akka-stream-sink)
* [Running the reliability test](#running-the-reliability-test)
* [Delete & recreate kinesisstreams and dynamo table](#running-the-reliability-test-delete-recreate-kinesisstreams-and-dynamo-table)
* [Running the producer-consumer test](#running-the-reliability-test-running-the-producer-consumer-test)
@@ -484,6 +485,55 @@ callback onFailure {
}
```

<a name="akka-stream-sink"></a>
### Akka Stream Sink

An Akka `Sink` is provided which can be used to publish messages via streams.
Every message is sent as a `ProducerEvent` to the `Sink`, which defines the PartitionKey as well as the payload.
The `Sink` is created from a `ProducerConf` or directly with a `KinesisProducerActor`. See [Kinesis](https://github.com/WW-Digital/reactive-kinesis/blob/master/src/main/scala/com/weightwatchers/reactive/kinesis/stream/Kinesis.scala) for the various options.

The `Sink` expects an acknowledgement for every message sent to Kinesis.
The number of unacknowledged messages can be configured; once that limit is reached, back pressure is applied.
This throttling is controlled by the `kinesis.{producer}.akka.max-outstanding-requests` configuration value.
Please note: a default value (1000 messages) is applied if throttling is not configured.
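
As an illustration, the limit can also be set programmatically by building the producer actor `Props` yourself and passing them to `Kinesis.sink` together with an explicit limit. This is a minimal sketch only; it assumes a `producer-name` section under `kinesis` in your `application.conf`:

```scala
import akka.actor.ActorSystem
import com.weightwatchers.reactive.kinesis.producer.{KinesisProducerActor, ProducerConf}
import com.weightwatchers.reactive.kinesis.stream._

implicit val system: ActorSystem = ActorSystem("kinesis-producer-example")

// build the producer configuration from the "kinesis" config section (no explicit credentials provider)
val producerConf = ProducerConf(system.settings.config.getConfig("kinesis"), "producer-name", None)

// apply back pressure once 100 messages are unacknowledged, instead of the configured/default limit
val sink = Kinesis.sink(KinesisProducerActor.props(producerConf), maxOutStanding = 100)
```

In most cases, setting `akka.max-outstanding-requests` in the producer configuration is the simpler option.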

The provided `Sink` produces a `Future[Done]` as its materialized value.
This future succeeds if all messages from upstream have been sent to Kinesis and acknowledged.
It fails if a message could not be sent to Kinesis or if upstream fails.

```scala
import akka.stream.scaladsl.Source
import com.weightwatchers.reactive.kinesis.models._
import com.weightwatchers.reactive.kinesis.stream._

import scala.util.{Failure, Success}

// an implicit ActorSystem, materializer and ExecutionContext are assumed to be in scope
Source(1.to(100).map(_.toString))
  .map(num => ProducerEvent(num, num))
  .runWith(Kinesis.sink("producer-name"))
  .onComplete {
    case Success(_)  => println("All messages are published successfully.")
    case Failure(ex) => println(s"Failed to publish messages: ${ex.getMessage}")
  }
```

A long running flow can easily be achieved using a `SourceQueue`.
In this case the flow stays open as long as needed.
New elements can be published via the materialized queue:

```scala
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{Keep, Source}
import com.weightwatchers.reactive.kinesis.models._
import com.weightwatchers.reactive.kinesis.stream._

// an implicit ActorSystem and materializer are assumed to be in scope
val sourceQueue = Source.queue[ProducerEvent](1000, OverflowStrategy.fail)
  .toMat(Kinesis.sink("producer-name"))(Keep.left)
  .run()

sourceQueue.offer(ProducerEvent("foo", "bar"))
sourceQueue.offer(ProducerEvent("foo", "baz"))
```

The `Sink` uses a `KinesisProducerActor` under the hood. All rules regarding this actor also apply to the `Sink`.

<a name="running-the-reliability-test"></a>
# Running the reliability test

src/it/scala/com/weightwatchers/reactive/kinesis/common/KinesisSuite.scala

Lines changed: 5 additions & 1 deletion
@@ -43,6 +43,8 @@ trait KinesisConfiguration {
| testProducer {
|   stream-name = "$streamName"
|
+ |   akka.max-outstanding-requests = 10
+ |
|   kpl {
|     Region = us-east-1
|
@@ -199,7 +201,9 @@ trait KinesisSuite
                    Some(100.millis))

  def consumerConf(batchSize: Long = TestStreamNrOfMessagesPerShard): ConsumerConf = {
-   consumerConfFor(streamName = TestStreamName, appName = appName, maxRecords = batchSize.toInt)
+   consumerConfFor(streamName = TestStreamName,
+                   appName = appName,
+                   maxRecords = math.max(1, batchSize.toInt))
  }

  def producerConf(): ProducerConf = producerConfFor(TestStreamName, appName)
src/it/scala/com/weightwatchers/reactive/kinesis/stream/KinesisSinkGraphStageIntegrationSpec.scala

Lines changed: 49 additions & 0 deletions
@@ -0,0 +1,49 @@
package com.weightwatchers.reactive.kinesis.stream

import akka.stream.scaladsl.Source
import com.weightwatchers.reactive.kinesis.common.{
  AkkaUnitTestLike,
  KinesisConfiguration,
  KinesisSuite
}
import com.weightwatchers.reactive.kinesis.models.ProducerEvent
import org.scalatest.{FreeSpec, Matchers}

import scala.concurrent.duration._

class KinesisSinkGraphStageIntegrationSpec
    extends FreeSpec
    with KinesisSuite
    with KinesisConfiguration
    with AkkaUnitTestLike
    with Matchers {

  "KinesisSinkGraph" - {

    "produced messages are written to the stream" in new withKinesisConfForApp("sink_produce") {
      val messageCount = 100
      val elements     = 1.to(messageCount).map(_.toString)
      Source(elements)
        .map(num => ProducerEvent(num, num))
        .runWith(Kinesis.sink(producerConf()))
        .futureValue
      val list = testConsumer.retrieveRecords(TestStreamName, messageCount)
      list should contain allElementsOf elements
      testConsumer.shutdown()
    }

    "upstream fail should fail the materialized value of the sink" in new withKinesisConfForApp(
      "sink_fail"
    ) {
      Source
        .failed(new IllegalStateException("Boom"))
        .runWith(Kinesis.sink(producerConf()))
        .failed
        .futureValue shouldBe a[IllegalStateException]
    }
  }

  // do not create messages in setup, we will create messages inside the test
  override def TestStreamNrOfMessagesPerShard: Long = 0
  override implicit def patienceConfig: PatienceConfig = PatienceConfig(60.seconds, 1.second)
}

src/it/scala/com/weightwatchers/reactive/kinesis/stream/KinesisSourceGraphIntegrationSpec.scala renamed to src/it/scala/com/weightwatchers/reactive/kinesis/stream/KinesisSourceGraphStageIntegrationSpec.scala

Lines changed: 1 addition & 1 deletion
@@ -10,7 +10,7 @@ import org.scalatest._

import scala.concurrent.duration._

-class KinesisSourceGraphIntegrationSpec
+class KinesisSourceGraphStageIntegrationSpec
    extends FreeSpec
    with KinesisSuite
    with KinesisConfiguration

src/main/scala/com/weightwatchers/reactive/kinesis/stream/Kinesis.scala

Lines changed: 130 additions & 8 deletions
@@ -16,16 +16,22 @@

package com.weightwatchers.reactive.kinesis.stream

-import akka.NotUsed
-import akka.actor.ActorSystem
-import akka.stream.scaladsl.Source
+import akka.{Done, NotUsed}
+import akka.actor.{ActorSystem, Props}
+import akka.stream.scaladsl.{Sink, Source}
+import com.amazonaws.auth.AWSCredentialsProvider
+import com.typesafe.config.Config
+import com.typesafe.scalalogging.LazyLogging
import com.weightwatchers.reactive.kinesis.consumer.KinesisConsumer.ConsumerConf
-import com.weightwatchers.reactive.kinesis.models.ConsumerEvent
+import com.weightwatchers.reactive.kinesis.models.{ConsumerEvent, ProducerEvent}
+import com.weightwatchers.reactive.kinesis.producer.{KinesisProducerActor, ProducerConf}
+
+import scala.concurrent.Future

/**
 * Main entry point for creating a Kinesis source and sink.
 */
-object Kinesis {
+object Kinesis extends LazyLogging {

  /**
   * Create a source, that provides KinesisEvents.
@@ -39,20 +45,20 @@ object Kinesis {
  def source(
      consumerConf: ConsumerConf
  )(implicit system: ActorSystem): Source[CommittableEvent[ConsumerEvent], NotUsed] = {
-   Source.fromGraph(new KinesisSourceGraph(consumerConf, system))
+   Source.fromGraph(new KinesisSourceGraphStage(consumerConf, system))
  }

  /**
   * Create a source by using the actor system configuration, that provides KinesisEvents.
   * Please note: every KinesisEvent has to be committed during the user flow!
   * Uncommitted events will be retransmitted after a timeout.
   *
-  * The application conf file should look like this:
+  * A minimal application conf file should look like this:
   * {{{
   * kinesis {
   *   application-name = "SampleService"
   *   consumer-name {
-  *     stream-name = "sample-consumer"
+  *     stream-name = "sample-stream"
   *   }
   * }
   * }}}
@@ -68,4 +74,120 @@ object Kinesis {
  ): Source[CommittableEvent[ConsumerEvent], NotUsed] = {
    source(ConsumerConf(system.settings.config.getConfig(inConfig), consumerName))
  }

  /**
   * Create a Sink that accepts ProducerEvents, which get published to Kinesis.
   *
   * The sink itself sends all events to an actor, which is created with the given Props.
   * Every message sent needs to be acknowledged by the underlying producer actor.
   *
   * This sink signals back pressure if more than maxOutStanding messages are unacknowledged.
   *
   * The sink produces a materialized value `Future[Done]`, which is completed once all messages of the stream have been sent to the producer actor _and_ acknowledged.
   * The future fails if sending an event fails or upstream fails the stream.
   *
   * @param props the props to create a producer actor. This is a function to work around #48.
   * @param maxOutStanding the number of unacknowledged messages allowed before back pressure is applied.
   * @param system the actor system.
   * @return A sink that accepts ProducerEvents.
   */
  def sink(props: => Props, maxOutStanding: Int)(
      implicit system: ActorSystem
  ): Sink[ProducerEvent, Future[Done]] = {
    Sink.fromGraph(new KinesisSinkGraphStage(props, maxOutStanding, system))
  }

  /**
   * Create a Sink that accepts ProducerEvents, which get published to Kinesis.
   *
   * The sink itself sends all events to a KinesisProducerActor which is configured with the given config object.
   * Every message sent needs to be acknowledged by the underlying producer actor.
   *
   * This sink signals back pressure if more messages than configured in the throttling conf are unacknowledged.
   * If throttling is not configured, a default value (= 1000 messages) is applied.
   *
   * The sink produces a materialized value `Future[Done]`, which is completed once all messages of the stream have been sent to the producer actor _and_ acknowledged.
   * The future fails if sending an event fails or upstream fails the stream.
   *
   * @param producerConf the configuration to create a KinesisProducerActor.
   * @param system the actor system.
   * @return A sink that accepts ProducerEvents.
   */
  def sink(
      producerConf: ProducerConf
  )(implicit system: ActorSystem): Sink[ProducerEvent, Future[Done]] = {
    val maxOutstanding = producerConf.throttlingConf.fold {
      logger.info(
        "Producer throttling not configured - set maxOutstanding to 1000. Configure with: kinesis.{producer}.akka.max-outstanding-requests=1000"
      )
      1000
    }(_.maxOutstandingRequests)
    sink(KinesisProducerActor.props(producerConf), maxOutstanding)
  }

  /**
   * Create a Sink that accepts ProducerEvents, which get published to Kinesis.
   *
   * The sink itself sends all events to a KinesisProducerActor which is configured from the system configuration for the given producer name.
   * Every message sent needs to be acknowledged by the underlying producer actor.
   *
   * This sink signals back pressure if more messages than configured in the throttling conf are unacknowledged.
   * If throttling is not configured, a default value (= 1000 messages) is applied.
   *
   * The sink produces a materialized value `Future[Done]`, which is completed once all messages of the stream have been sent to the producer actor _and_ acknowledged.
   * The future fails if sending an event fails or upstream fails the stream.
   *
   * @param kinesisConfig the configuration object that holds the producer config.
   * @param producerName the name of the producer in the system configuration.
   * @param credentialsProvider the AWS credentials provider to use to connect.
   * @param system the actor system.
   * @return A sink that accepts ProducerEvents.
   */
  def sink(kinesisConfig: Config,
           producerName: String,
           credentialsProvider: Option[AWSCredentialsProvider])(
      implicit system: ActorSystem
  ): Sink[ProducerEvent, Future[Done]] = {
    sink(
      ProducerConf(kinesisConfig, producerName, credentialsProvider)
    )
  }

  /**
   * Create a Sink that accepts ProducerEvents, which get published to Kinesis.
   *
   * The sink itself sends all events to a KinesisProducerActor which is configured from the system configuration for the given producer name.
   * Every message sent needs to be acknowledged by the underlying producer actor.
   *
   * This sink signals back pressure if more messages than configured in the throttling conf are unacknowledged.
   * If throttling is not configured, a default value (= 1000 messages) is applied.
   *
   * The sink produces a materialized value `Future[Done]`, which is completed once all messages of the stream have been sent to the producer actor _and_ acknowledged.
   * The future fails if sending an event fails or upstream fails the stream.
   *
   * A minimal application conf file should look like this:
   * {{{
   * kinesis {
   *   application-name = "SampleService"
   *   producer-name {
   *     stream-name = "sample-stream"
   *     akka.max-outstanding-requests = 100
   *   }
   * }
   * }}}
   * See the kinesis reference.conf for a list of all available config options.
   *
   * @param producerName the name of the producer in the system configuration.
   * @param inConfig the name of the config section that holds the producer config (usually "kinesis").
   * @param credentialsProvider the AWS credentials provider to use to connect.
   * @param system the actor system.
   * @return A sink that accepts ProducerEvents.
   */
  def sink(producerName: String,
           inConfig: String = "kinesis",
           credentialsProvider: Option[AWSCredentialsProvider] = None)(
      implicit system: ActorSystem
  ): Sink[ProducerEvent, Future[Done]] = {
    sink(system.settings.config.getConfig(inConfig), producerName, credentialsProvider)
  }
}
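
For illustration, a minimal usage sketch of the configuration-based overloads above (it assumes a `producer-name` section in the `kinesis` config of `application.conf`; the names here are placeholders):

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import com.weightwatchers.reactive.kinesis.models.ProducerEvent
import com.weightwatchers.reactive.kinesis.stream.Kinesis

import scala.concurrent.Future

object KinesisSinkExample extends App {

  implicit val system: ActorSystem = ActorSystem("kinesis-sink-example")
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  // resolve the producer config from the actor system configuration (section "kinesis")
  val byName = Kinesis.sink("producer-name")

  // alternatively, pass an explicit config section and no credentials provider
  val byConfig = Kinesis.sink(system.settings.config.getConfig("kinesis"), "producer-name", None)

  // the materialized Future[Done] completes once every event has been sent and acknowledged
  val done: Future[Done] =
    Source.single(ProducerEvent("partition-key", "payload")).runWith(byName)

  done.onComplete(_ => system.terminate())(system.dispatcher)
}
```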
