Skip to content

Commit 2299b74

Browse files
committed
Support newly joined consumers per issue#17
1 parent 40473a5 commit 2299b74

File tree

5 files changed

+19
-15
lines changed

5 files changed

+19
-15
lines changed

src/main/kotlin/com/omarsmak/kafka/consumer/lag/monitoring/cli/ClientCli.kt

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -77,12 +77,9 @@ class ClientCli : Callable<Void> {
7777
return null
7878
}
7979

80-
private fun initializeConsumerGroups(): Set<String> {
81-
val configConsumerGroups = kafkaConsumerClients.split(",")
82-
return Utils.getTargetConsumerGroups(kafkaConsumerLagClient, configConsumerGroups)
83-
}
80+
private fun initializeConsumerGroups(): List<String> = kafkaConsumerClients.split(",")
8481

85-
private fun initializeConfigurations(targetConsumerGroups: Set<String>) = KafkaConsumerLagClientConfig.create(mapOf(
82+
private fun initializeConfigurations(targetConsumerGroups: List<String>) = KafkaConsumerLagClientConfig.create(mapOf(
8683
KafkaConsumerLagClientConfig.BOOTSTRAP_SERVERS to kafkaBootstrapServers,
8784
KafkaConsumerLagClientConfig.HTTP_PORT to httpPort,
8885
KafkaConsumerLagClientConfig.POLL_INTERVAL to pollInterval,

src/main/kotlin/com/omarsmak/kafka/consumer/lag/monitoring/config/KafkaConsumerLagClientConfig.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ open class KafkaConsumerLagClientConfig private constructor(
2828
private val httpPort by optional(name = HTTP_PORT, default = DEFAULT_HTTP_PORT)
2929
private val bootstrapServers by required<String>(name = BOOTSTRAP_SERVERS)
3030
private val pollInterval by optional(name = POLL_INTERVAL, default = DEFAULT_POLL_INTERVAL)
31-
private val consumerGroups by required<Set<String>>(name = CONSUMER_GROUPS)
31+
private val consumerGroups by required<List<String>>(name = CONSUMER_GROUPS)
3232
private val lagTreshold by optional(name = LAG_THRESHOLD, default = DEFAULT_LAG_THRESHOLD)
3333

3434
/**

src/main/kotlin/com/omarsmak/kafka/consumer/lag/monitoring/response/ConsoleResponseView.kt

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
package com.omarsmak.kafka.consumer.lag.monitoring.response
44

55
import com.github.ajalt.mordant.TermColors
6+
import com.omarsmak.kafka.consumer.lag.monitoring.cli.Utils
67
import com.omarsmak.kafka.consumer.lag.monitoring.client.KafkaConsumerLagClient
78
import com.omarsmak.kafka.consumer.lag.monitoring.client.data.Lag
89
import com.omarsmak.kafka.consumer.lag.monitoring.client.exceptions.KafkaConsumerLagClientException
@@ -24,7 +25,7 @@ class ConsoleResponseView : ResponseView {
2425
}
2526

2627
override fun execute() {
27-
val targetConsumerGroups: Set<String> = kafkaConsumerLagClientConfig[KafkaConsumerLagClientConfig.CONSUMER_GROUPS]
28+
val targetConsumerGroups: List<String> = kafkaConsumerLagClientConfig[KafkaConsumerLagClientConfig.CONSUMER_GROUPS]
2829
val monitoringPollInterval: Long = kafkaConsumerLagClientConfig[KafkaConsumerLagClientConfig.POLL_INTERVAL]
2930
val monitoringLagThreshold: Int = kafkaConsumerLagClientConfig[KafkaConsumerLagClientConfig.LAG_THRESHOLD]
3031

@@ -40,11 +41,13 @@ class ConsoleResponseView : ResponseView {
4041
* `Total consumer offsets: @totalConsumerOffsets`
4142
* `Total lag: @totalLag`
4243
*/
43-
private fun show(targetConsumerGroups: Set<String>, monitoringPollInterval: Long, monitoringLagThreshold: Int) {
44+
private fun show(initialConsumerGroups: List<String>, monitoringPollInterval: Long, monitoringLagThreshold: Int) {
4445
Timer().scheduleAtFixedRate(0, monitoringPollInterval) {
4546
print("\u001b[H\u001b[2J")
4647

47-
targetConsumerGroups.forEach { consumer ->
48+
val consumers = Utils.getTargetConsumerGroups(kafkaConsumerLagClient, initialConsumerGroups)
49+
50+
consumers.forEach { consumer ->
4851
try {
4952
val metrics = kafkaConsumerLagClient.getConsumerLag(consumer)
5053
println("Consumer group: $consumer")

src/main/kotlin/com/omarsmak/kafka/consumer/lag/monitoring/response/PrometheusResponseView.kt

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
package com.omarsmak.kafka.consumer.lag.monitoring.response
44

5+
import com.omarsmak.kafka.consumer.lag.monitoring.cli.Utils
56
import com.omarsmak.kafka.consumer.lag.monitoring.client.KafkaConsumerLagClient
67
import com.omarsmak.kafka.consumer.lag.monitoring.client.exceptions.KafkaConsumerLagClientException
78
import com.omarsmak.kafka.consumer.lag.monitoring.config.KafkaConsumerLagClientConfig
@@ -59,7 +60,7 @@ class PrometheusResponseView : ResponseView {
5960
}
6061

6162
override fun execute() {
62-
val targetConsumerGroups: Set<String> = kafkaConsumerLagClientConfig[KafkaConsumerLagClientConfig.CONSUMER_GROUPS]
63+
val targetConsumerGroups: List<String> = kafkaConsumerLagClientConfig[KafkaConsumerLagClientConfig.CONSUMER_GROUPS]
6364
val monitoringPollInterval: Long = kafkaConsumerLagClientConfig[KafkaConsumerLagClientConfig.POLL_INTERVAL]
6465
val httpPort: Int = kafkaConsumerLagClientConfig[KafkaConsumerLagClientConfig.HTTP_PORT]
6566

@@ -75,15 +76,18 @@ class PrometheusResponseView : ResponseView {
7576
* `kafka_consumer_group_total_lag{group, topic}`
7677
* `kafka_topic_latest_offsets{group, topic, partition}
7778
*/
78-
private fun initialize(client: KafkaConsumerLagClient, targetConsumerGroups: Set<String>, port: Int, monitoringPollInterval: Long) {
79+
private fun initialize(client: KafkaConsumerLagClient, initialConsumerGroups: List<String>, port: Int, monitoringPollInterval: Long) {
7980
// Start HTTP our server
8081
startServer(port)
8182

8283
logger.info("Updating metrics every $monitoringPollInterval...")
8384

8485
// Start publishing our metrics
8586
Timer().scheduleAtFixedRate(0, monitoringPollInterval) {
86-
targetConsumerGroups.forEach { consumer ->
87+
88+
val consumers = Utils.getTargetConsumerGroups(kafkaConsumerLagClient, initialConsumerGroups)
89+
90+
consumers.forEach { consumer ->
8791
try {
8892
val lag = client.getConsumerLag(consumer)
8993

src/test/kotlin/com/omarsmak/kafka/consumer/lag/monitoring/config/KafkaConsumerLagClientConfigTest.kt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ internal class KafkaConsumerLagClientConfigTest {
1717
))
1818

1919
assertEquals("kafka1:9092,kafka2:9092", config[KafkaConsumerLagClientConfig.BOOTSTRAP_SERVERS])
20-
assertEquals(setOf("consumer_1", "consumer_2"), config[KafkaConsumerLagClientConfig.CONSUMER_GROUPS])
20+
assertEquals(listOf("consumer_1", "consumer_2"), config[KafkaConsumerLagClientConfig.CONSUMER_GROUPS])
2121
assertEquals(9000, config[KafkaConsumerLagClientConfig.HTTP_PORT])
2222
assertEquals(2000, config[KafkaConsumerLagClientConfig.POLL_INTERVAL])
2323
assertEquals(500, config[KafkaConsumerLagClientConfig.LAG_THRESHOLD])
@@ -38,7 +38,7 @@ internal class KafkaConsumerLagClientConfigTest {
3838
))
3939

4040
assertEquals("kafka1:9092,kafka2:9092", config[KafkaConsumerLagClientConfig.BOOTSTRAP_SERVERS])
41-
assertEquals(setOf("consumer_1", "consumer_2"), config[KafkaConsumerLagClientConfig.CONSUMER_GROUPS])
41+
assertEquals(listOf("consumer_1", "consumer_2"), config[KafkaConsumerLagClientConfig.CONSUMER_GROUPS])
4242
assertEquals(3000, config[KafkaConsumerLagClientConfig.HTTP_PORT])
4343
assertEquals(100, config[KafkaConsumerLagClientConfig.POLL_INTERVAL])
4444
}
@@ -84,7 +84,7 @@ internal class KafkaConsumerLagClientConfigTest {
8484
})
8585

8686
assertEquals("kafka1:9092,kafka2:9092", config[KafkaConsumerLagClientConfig.BOOTSTRAP_SERVERS])
87-
assertEquals(setOf("consumer_1", "consumer_2"), config[KafkaConsumerLagClientConfig.CONSUMER_GROUPS])
87+
assertEquals(listOf("consumer_1", "consumer_2"), config[KafkaConsumerLagClientConfig.CONSUMER_GROUPS])
8888
}
8989

9090
@Test

0 commit comments

Comments
 (0)