Skip to content
This repository was archived by the owner on Oct 23, 2023. It is now read-only.

Commit 3e9bb6c

Browse files
etspacemanmarkglh
authored andcommitted
KPL Typed Config (#65)
* Adding typed config for the KPL
1 parent 3bd04e5 commit 3e9bb6c

File tree

5 files changed

+397
-4
lines changed

5 files changed

+397
-4
lines changed

.travis.yml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
1-
sudo: false
1+
sudo: required
22
language: scala
3+
services:
4+
- docker
35
scala:
46
- 2.11.11
57
- 2.12.5

README.md

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ It's worth familiarising yourself with [Sequence numbers and Sub sequence number
1414
* [Defining a config file in the client application](#usage-defining-a-config-file-in-the-client-application)
1515
* [Notable Consumer Configuration Values](#usage-defining-a-config-file-in-the-client-application-notable-consumer-configuration-values)
1616
* [Notable Producer Configuration Values](#usage-defining-a-config-file-in-the-client-application-notable-producer-configuration-values)
17+
* [Typed Configuration - Producer](#typed-configuration---producer)
1718
* [Usage: Consumer](#usage-usage-consumer)
1819
* [Actor Based Consumer](#actor-based-consumer)
1920
* [Important considerations when implementing the Event Processor](#usage-usage-consumer-important-considerations-when-implementing-the-event-processor)
@@ -205,6 +206,37 @@ which allows us to track the progress of sent messages when they go with the nex
205206
* `kinesis.<producer-name>.kpl.RateLimit` - Limits the maximum allowed put rate for a shard, as a percentage of the backend limits.
206207

207208
<a name="usage-usage-consumer"></a>
209+
210+
### Typed Configuration - Producer
211+
If you don't want to depend on config files, there's a typed configuration class available: [KinesisProducerConfig](src/main/scala/com/weightwatchers/reactive/kinesis/producer/KinesisProducerConfig.scala)
212+
213+
You can construct it in a few ways:
214+
215+
```scala
216+
// With default values
217+
val defaultProducerConfig = KinesisProducerConfig()
218+
219+
// With a provided KinesisProducerConfiguration from the Java KPL library
220+
val awsKinesisConfig: KinesisProducerConfiguration = ...
221+
val producerConfig = KinesisProducerConfig(awsKinesisConfig)
222+
223+
// With a typesafe-config object
224+
val typesafeConfig: Config = ...
225+
val producerConfig = KinesisProducerConfig(typesafeConfig)
226+
227+
// With a typesafe-config object and an AWSCredentialsProvider
228+
val typesafeConfig: Config = ...
229+
val credentialsProvider: AWSCredentialsProvider = ...
230+
val producerConfig = KinesisProducerConfig(typesafeConfig, credentialsProvider)
231+
```
232+
233+
These can be used to create a ProducerConf and ultimately a KinesisProducer, like so:
234+
235+
```scala
236+
val producerConfig: KinesisProducerConfig = ...
237+
val producerConf: ProducerConf = ProducerConf(producerConfig, "my-stream-name", None, None)
238+
```
239+
208240
## Usage: Consumer
209241
`reactive-kinesis` provides two different ways to consume messages from Kinesis: [Actor Based Consumer](#actor-based-consumer) and [Akka Stream Source](#akka-stream-source).
210242

Lines changed: 171 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,171 @@
1+
/*
2+
* Copyright 2017 WeightWatchers
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.weightwatchers.reactive.kinesis.producer
18+
19+
import com.amazonaws.auth.AWSCredentialsProvider
20+
import com.amazonaws.regions.Regions
21+
import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration
22+
import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration.ThreadingModel
23+
import com.amazonaws.services.kinesis.producer.protobuf.Config.AdditionalDimension
24+
import com.typesafe.config.{Config, ConfigFactory}
25+
26+
/** Typed config class for the KPL */
27+
final case class KinesisProducerConfig(
28+
additionalMetricDimensions: List[AdditionalDimension],
29+
credentialsProvider: Option[AWSCredentialsProvider],
30+
metricsCredentialsProvider: Option[AWSCredentialsProvider],
31+
aggregationEnabled: Boolean,
32+
aggregationMaxCount: Long,
33+
aggregationMaxSize: Long,
34+
cloudwatchEndpoint: Option[String],
35+
cloudwatchPort: Long,
36+
collectionMaxCount: Long,
37+
collectionMaxSize: Long,
38+
connectTimeout: Long,
39+
credentialsRefreshDelay: Long,
40+
enableCoreDumps: Boolean,
41+
failIfThrottled: Boolean,
42+
kinesisEndpoint: Option[String],
43+
kinesisPort: Long,
44+
logLevel: String,
45+
maxConnections: Long,
46+
metricsGranularity: String,
47+
metricsLevel: String,
48+
metricsNamespace: String,
49+
metricsUploadDelay: Long,
50+
minConnections: Long,
51+
nativeExecutable: Option[String],
52+
rateLimit: Long,
53+
recordMaxBufferedTime: Long,
54+
recordTtl: Long,
55+
region: Option[Regions],
56+
requestTimeout: Long,
57+
tempDirectory: Option[String],
58+
verifyCertificate: Boolean,
59+
threadingModel: ThreadingModel,
60+
threadPoolSize: Int
61+
) {
62+
63+
def toAwsConfig: KinesisProducerConfiguration = {
64+
val initial = new KinesisProducerConfiguration()
65+
.setAggregationEnabled(aggregationEnabled)
66+
.setAggregationMaxCount(aggregationMaxCount)
67+
.setAggregationMaxSize(aggregationMaxSize)
68+
.setCloudwatchPort(cloudwatchPort)
69+
.setCollectionMaxCount(collectionMaxCount)
70+
.setCollectionMaxSize(collectionMaxSize)
71+
.setConnectTimeout(connectTimeout)
72+
.setCredentialsRefreshDelay(credentialsRefreshDelay)
73+
.setEnableCoreDumps(enableCoreDumps)
74+
.setFailIfThrottled(failIfThrottled)
75+
.setKinesisPort(kinesisPort)
76+
.setLogLevel(logLevel)
77+
.setMaxConnections(maxConnections)
78+
.setMetricsGranularity(metricsGranularity)
79+
.setMetricsLevel(metricsLevel)
80+
.setMetricsNamespace(metricsNamespace)
81+
.setMetricsUploadDelay(metricsUploadDelay)
82+
.setMinConnections(minConnections)
83+
.setRateLimit(rateLimit)
84+
.setRecordMaxBufferedTime(recordMaxBufferedTime)
85+
.setRecordTtl(recordTtl)
86+
.setRequestTimeout(requestTimeout)
87+
.setVerifyCertificate(verifyCertificate)
88+
.setThreadingModel(threadingModel)
89+
.setThreadPoolSize(threadPoolSize)
90+
91+
KinesisProducerConfig.setAdditionalDimensions(initial, additionalMetricDimensions)
92+
93+
// This is ugly
94+
val wCredProv = credentialsProvider.fold(initial)(initial.setCredentialsProvider)
95+
val wMetricCredProv =
96+
metricsCredentialsProvider.fold(wCredProv)(wCredProv.setMetricsCredentialsProvider)
97+
val wCWEP = cloudwatchEndpoint.fold(wMetricCredProv)(wMetricCredProv.setCloudwatchEndpoint)
98+
val wKinesisEP = kinesisEndpoint.fold(wCWEP)(wCWEP.setKinesisEndpoint)
99+
val wNativeExec = nativeExecutable.fold(wKinesisEP)(wKinesisEP.setNativeExecutable)
100+
val wRegion = region.fold(wNativeExec)(reg => wNativeExec.setRegion(reg.getName))
101+
val wTempDir = tempDirectory.fold(wRegion)(wRegion.setTempDirectory)
102+
103+
wTempDir
104+
}
105+
}
106+
107+
object KinesisProducerConfig {
108+
def apply(): KinesisProducerConfig = default
109+
def apply(config: KinesisProducerConfiguration): KinesisProducerConfig = fromAwsConfig(config)
110+
def apply(config: Config): KinesisProducerConfig =
111+
fromAwsConfig(ProducerConf.buildKPLConfig(config, None))
112+
def apply(config: Config, credentialsProvider: AWSCredentialsProvider): KinesisProducerConfig =
113+
fromAwsConfig(ProducerConf.buildKPLConfig(config, Some(credentialsProvider)))
114+
115+
// Sets default values as if no typesafe configuration was passed. This ensures that the default
116+
// KinesisProducerConfiguration is used
117+
def default: KinesisProducerConfig = {
118+
val defaultConfig = ProducerConf.buildKPLConfig(ConfigFactory.empty(), None)
119+
fromAwsConfig(defaultConfig)
120+
}
121+
122+
private def fromAwsConfig(config: KinesisProducerConfiguration): KinesisProducerConfig =
123+
KinesisProducerConfig(
124+
additionalMetricDimensions = List(), // No way to retrieve this from a KinesisProducerConfiguration
125+
credentialsProvider = Some(config.getCredentialsProvider),
126+
metricsCredentialsProvider = Some(config.getMetricsCredentialsProvider),
127+
aggregationEnabled = config.isAggregationEnabled,
128+
aggregationMaxCount = config.getAggregationMaxCount,
129+
aggregationMaxSize = config.getAggregationMaxSize,
130+
cloudwatchEndpoint = Some(config.getCloudwatchEndpoint),
131+
cloudwatchPort = config.getCloudwatchPort,
132+
collectionMaxCount = config.getCollectionMaxCount,
133+
collectionMaxSize = config.getCollectionMaxSize,
134+
connectTimeout = config.getConnectTimeout,
135+
credentialsRefreshDelay = config.getCredentialsRefreshDelay,
136+
enableCoreDumps = config.isEnableCoreDumps,
137+
failIfThrottled = config.isFailIfThrottled,
138+
kinesisEndpoint = Some(config.getKinesisEndpoint),
139+
kinesisPort = config.getKinesisPort,
140+
logLevel = config.getLogLevel,
141+
maxConnections = config.getMaxConnections,
142+
metricsGranularity = config.getMetricsGranularity,
143+
metricsLevel = config.getMetricsLevel,
144+
metricsNamespace = config.getMetricsNamespace,
145+
metricsUploadDelay = config.getMetricsUploadDelay,
146+
minConnections = config.getMinConnections,
147+
nativeExecutable = Some(config.getNativeExecutable),
148+
rateLimit = config.getRateLimit,
149+
recordMaxBufferedTime = config.getRecordMaxBufferedTime,
150+
recordTtl = config.getRecordTtl,
151+
region = config.getRegion match {
152+
case x if x.isEmpty => None
153+
case x => Some(Regions.fromName(x))
154+
},
155+
requestTimeout = config.getRequestTimeout,
156+
tempDirectory = Some(config.getTempDirectory),
157+
verifyCertificate = config.isVerifyCertificate,
158+
threadingModel = config.getThreadingModel,
159+
threadPoolSize = config.getThreadPoolSize
160+
)
161+
162+
private def setAdditionalDimensions(
163+
conf: KinesisProducerConfiguration,
164+
dimensions: List[AdditionalDimension]
165+
) = dimensions.foldLeft(conf) { (conf, dimension) =>
166+
conf.addAdditionalMetricsDimension(dimension.getKey,
167+
dimension.getValue,
168+
dimension.getGranularity)
169+
conf
170+
}
171+
}

src/main/scala/com/weightwatchers/reactive/kinesis/producer/ProducerConf.scala

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,8 +85,24 @@ object ProducerConf {
8585
parseThrottlingConfig(producerConfig))
8686
}
8787

88-
private def buildKPLConfig(kplConfig: Config,
89-
credentialsProvider: Option[AWSCredentialsProvider]) = {
88+
/**
89+
* Simple typed apply method
90+
*
91+
* @param kinesisConfig The top level Kinesis Configuration, containing the specified producer
92+
* @param streamName The name of the Kinesis stream to consume
93+
* @param dispatcher Optional config path for the akka dispatcher
94+
* @param throttlingConf Optional configuration which defines whether and how often to throttle
95+
* @return A [[ProducerConf]] case class used for constructing the [[KinesisProducerActor]]
96+
*/
97+
def apply(kinesisConfig: KinesisProducerConfig,
98+
streamName: String,
99+
dispatcher: Option[String],
100+
throttlingConf: Option[ThrottlingConf]): ProducerConf = {
101+
102+
new ProducerConf(streamName, kinesisConfig.toAwsConfig, dispatcher, throttlingConf)
103+
}
104+
105+
def buildKPLConfig(kplConfig: Config, credentialsProvider: Option[AWSCredentialsProvider]) = {
90106
// We directly load our properties into the KPL as a Java `Properties` object
91107
// See http://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-config.html
92108
import TypesafeConfigExtensions._

0 commit comments

Comments
 (0)