Skip to content
This repository was archived by the owner on Oct 23, 2023. It is now read-only.

Commit 15890a5

Browse files
htimurmarkglh
authored andcommitted
Add additional factory methods to Kinesis to create Akka Source (#61)
* Add additional factory methods to Kinesis object, to create Kinesis stream source with explicitly specifying ConsumerService * Updated the README and changed the parameter order of the Kinesis.source * Fixed the formatting issues and the compiler error due to the multiple overloaded methods with default params * Removed ambiguous constructor from KinesisSourceGraphStage, as the functionality delegated to the Kinesis object * Fixed formatting issues
1 parent c635b6f commit 15890a5

File tree

3 files changed

+116
-25
lines changed

3 files changed

+116
-25
lines changed

README.md

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -311,7 +311,7 @@ An Akka `Source` is provided that can be used with streams. It is possible to cr
311311
directly from the consumer name that is defined in the configuration.
312312
Every message that is emitted to the stream is of type `CommitableEvent[ConsumerEvent]` and has to be committed
313313
explicitly downstream with a call to `event.commit()`. It is possible to map to a different type of `CommittableEvent`
314-
via the `map` and `mapAsync` functionality.
314+
via the `map` and `mapAsync` functionality. A `KinesisConsumer` is created internally for the `Kinesis.source`, when the factory method isn't defined.
315315

316316
```scala
317317
import com.weightwatchers.reactive.kinesis.stream._
@@ -325,7 +325,29 @@ Kinesis
325325
.runWith(Sink.seq)
326326
```
327327

328-
A `KinesisConsumer` is used internally for the `Kinesis.source`. All rules described here for the `KinesisConsumer` also apply for the stream source.
328+
Or you can explicitly path a lambda, to create the `KinesisConsumer`.
329+
330+
```scala
331+
import akka.actor.{ActorRef, ActorSystem}
332+
import akka.stream.scaladsl.Sink
333+
import com.weightwatchers.reactive.kinesis.consumer.KinesisConsumer
334+
import com.weightwatchers.reactive.kinesis.stream._
335+
336+
val sys = ActorSystem("kinesis-consumer-system")
337+
338+
Kinesis
339+
.source(
340+
"consumer-name",
341+
(conf: KinesisConsumer.ConsumerConf, eventProcessor: ActorRef) => KinesisConsumer(conf, eventProcessor, sys)
342+
)
343+
.take(100)
344+
.map(event => event.map(_.payloadAsString())) // read and map payload as string
345+
.mapAsync(10)(event => event.mapAsync(Downloader.download(event.payload))) // handle an async message
346+
.map(event => event.commit()) // mark the event as handled by calling commit
347+
.runWith(Sink.seq)
348+
```
349+
350+
All rules described here for the `KinesisConsumer` also apply for the stream source.
329351

330352
<a name="usage-usage-consumer-graceful-shutdown"></a>
331353
### Graceful Shutdown

src/main/scala/com/weightwatchers/reactive/kinesis/stream/Kinesis.scala

Lines changed: 92 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,12 @@
1717
package com.weightwatchers.reactive.kinesis.stream
1818

1919
import akka.{Done, NotUsed}
20-
import akka.actor.{ActorSystem, Props}
20+
import akka.actor.{ActorRef, ActorSystem, Props}
2121
import akka.stream.scaladsl.{Sink, Source}
2222
import com.amazonaws.auth.AWSCredentialsProvider
2323
import com.typesafe.config.Config
2424
import com.typesafe.scalalogging.LazyLogging
25+
import com.weightwatchers.reactive.kinesis.consumer.{ConsumerService, KinesisConsumer}
2526
import com.weightwatchers.reactive.kinesis.consumer.KinesisConsumer.ConsumerConf
2627
import com.weightwatchers.reactive.kinesis.models.{ConsumerEvent, ProducerEvent}
2728
import com.weightwatchers.reactive.kinesis.producer.{KinesisProducerActor, ProducerConf}
@@ -33,6 +34,23 @@ import scala.concurrent.Future
3334
*/
3435
object Kinesis extends LazyLogging {
3536

37+
/**
38+
* Create a source, that provides KinesisEvents.
39+
* Please note: every KinesisEvent has to be committed during the user flow!
40+
* Uncommitted events will be retransmitted after a timeout.
41+
*
42+
* @param consumerConf the configuration to connect to Kinesis.
43+
* @param createConsumer factory function to create ConsumerService from eventProcessor ActorRef.
44+
* @param system the actor system.
45+
* @return A source of KinesisEvent objects.
46+
*/
47+
def source(
48+
consumerConf: ConsumerConf,
49+
createConsumer: ActorRef => ConsumerService
50+
)(implicit system: ActorSystem): Source[CommittableEvent[ConsumerEvent], NotUsed] = {
51+
Source.fromGraph(new KinesisSourceGraphStage(consumerConf, createConsumer, system))
52+
}
53+
3654
/**
3755
* Create a source, that provides KinesisEvents.
3856
* Please note: every KinesisEvent has to be committed during the user flow!
@@ -45,7 +63,7 @@ object Kinesis extends LazyLogging {
4563
def source(
4664
consumerConf: ConsumerConf
4765
)(implicit system: ActorSystem): Source[CommittableEvent[ConsumerEvent], NotUsed] = {
48-
Source.fromGraph(new KinesisSourceGraphStage(consumerConf, system))
66+
source(consumerConf, KinesisConsumer(consumerConf, _, system))
4967
}
5068

5169
/**
@@ -69,12 +87,69 @@ object Kinesis extends LazyLogging {
6987
* @param system the actor system to use.
7088
* @return A source of KinesisEvent objects.
7189
*/
72-
def source(consumerName: String, inConfig: String = "kinesis")(
90+
def source(consumerName: String, inConfig: String)(
7391
implicit system: ActorSystem
7492
): Source[CommittableEvent[ConsumerEvent], NotUsed] = {
7593
source(ConsumerConf(system.settings.config.getConfig(inConfig), consumerName))
7694
}
7795

96+
/**
97+
* Create a source by using the actor system configuration, that provides KinesisEvents.
98+
* Please note: every KinesisEvent has to be committed during the user flow!
99+
* Uncommitted events will be retransmitted after a timeout.
100+
*
101+
* A minimal application conf file should look like this:
102+
* {{{
103+
* kinesis {
104+
* application-name = "SampleService"
105+
* consumer-name {
106+
* stream-name = "sample-stream"
107+
* }
108+
* }
109+
* }}}
110+
* See kinesis reference.conf for a list of all available config options.
111+
*
112+
* @param consumerName the name of the consumer in the application.conf.
113+
* @param system the actor system to use.
114+
* @return A source of KinesisEvent objects.
115+
*/
116+
def source(consumerName: String)(
117+
implicit system: ActorSystem
118+
): Source[CommittableEvent[ConsumerEvent], NotUsed] = {
119+
source(consumerName, "kinesis")
120+
}
121+
122+
/**
123+
* Create a source by using the actor system configuration, that provides KinesisEvents.
124+
* Please note: every KinesisEvent has to be committed during the user flow!
125+
* Uncommitted events will be retransmitted after a timeout.
126+
*
127+
* A minimal application conf file should look like this:
128+
* {{{
129+
* kinesis {
130+
* application-name = "SampleService"
131+
* consumer-name {
132+
* stream-name = "sample-stream"
133+
* }
134+
* }
135+
* }}}
136+
* See kinesis reference.conf for a list of all available config options.
137+
*
138+
* @param consumerName the name of the consumer in the application.conf.
139+
* @param createConsumer factory function to create ConsumerService from eventProcessor ActorRef.
140+
* @param inConfig the name of the sub-config for kinesis.
141+
* @param system the actor system to use.
142+
* @return A source of KinesisEvent objects.
143+
*/
144+
def source(
145+
consumerName: String,
146+
createConsumer: (ConsumerConf, ActorRef) => ConsumerService,
147+
inConfig: String = "kinesis"
148+
)(implicit system: ActorSystem): Source[CommittableEvent[ConsumerEvent], NotUsed] = {
149+
val consumerConf = ConsumerConf(system.settings.config.getConfig(inConfig), consumerName)
150+
source(consumerConf, createConsumer(consumerConf, _))
151+
}
152+
78153
/**
79154
* Create a Sink that accepts ProducerEvents, which get published to Kinesis.
80155
*
@@ -91,9 +166,10 @@ object Kinesis extends LazyLogging {
91166
* @param system the actor system.
92167
* @return A sink that accepts ProducerEvents.
93168
*/
94-
def sink(props: => Props, maxOutStanding: Int)(
95-
implicit system: ActorSystem
96-
): Sink[ProducerEvent, Future[Done]] = {
169+
def sink(
170+
props: => Props,
171+
maxOutStanding: Int
172+
)(implicit system: ActorSystem): Sink[ProducerEvent, Future[Done]] = {
97173
Sink.fromGraph(new KinesisSinkGraphStage(props, maxOutStanding, system))
98174
}
99175

@@ -143,11 +219,11 @@ object Kinesis extends LazyLogging {
143219
* @param system the actor system.
144220
* @return A sink that accepts ProducerEvents.
145221
*/
146-
def sink(kinesisConfig: Config,
147-
producerName: String,
148-
credentialsProvider: Option[AWSCredentialsProvider])(
149-
implicit system: ActorSystem
150-
): Sink[ProducerEvent, Future[Done]] = {
222+
def sink(
223+
kinesisConfig: Config,
224+
producerName: String,
225+
credentialsProvider: Option[AWSCredentialsProvider]
226+
)(implicit system: ActorSystem): Sink[ProducerEvent, Future[Done]] = {
151227
sink(
152228
ProducerConf(kinesisConfig, producerName, credentialsProvider)
153229
)
@@ -183,11 +259,11 @@ object Kinesis extends LazyLogging {
183259
* @param system the actor system.
184260
* @return A sink that accepts ProducerEvents.
185261
*/
186-
def sink(producerName: String,
187-
inConfig: String = "kinesis",
188-
credentialsProvider: Option[AWSCredentialsProvider] = None)(
189-
implicit system: ActorSystem
190-
): Sink[ProducerEvent, Future[Done]] = {
262+
def sink(
263+
producerName: String,
264+
inConfig: String = "kinesis",
265+
credentialsProvider: Option[AWSCredentialsProvider] = None
266+
)(implicit system: ActorSystem): Sink[ProducerEvent, Future[Done]] = {
191267
sink(system.settings.config.getConfig(inConfig), producerName, credentialsProvider)
192268
}
193269
}

src/main/scala/com/weightwatchers/reactive/kinesis/stream/KinesisSourceGraphStage.scala

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -143,13 +143,6 @@ class KinesisSourceGraphStage(config: ConsumerConf,
143143
extends GraphStage[SourceShape[CommittableEvent[ConsumerEvent]]]
144144
with LazyLogging {
145145

146-
/**
147-
* Ctor that uses the KinesisConsumer as ConsumerService implementation.
148-
*/
149-
def this(config: ConsumerConf, actorSystem: ActorSystem) = {
150-
this(config, KinesisConsumer(config, _, actorSystem), actorSystem)
151-
}
152-
153146
private[this] val out: Outlet[CommittableEvent[ConsumerEvent]] = Outlet("KinesisSource.out")
154147
override val shape: SourceShape[CommittableEvent[ConsumerEvent]] = SourceShape.of(out)
155148

0 commit comments

Comments
 (0)