Skip to content

Commit d61f79d

Browse files
committed
Fix issue #20 and prepare for 0.0.8 release
1 parent 3f30cca commit d61f79d

File tree

2 files changed

+48
-14
lines changed

2 files changed

+48
-14
lines changed

README.md

Lines changed: 27 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,25 @@ In summary, consumer lag tells us 2 things:
2323

2424
### Supported Kafka Versions
2525
Since this client uses Kafka Admin Client and Kafka Consumer client version of *2+*, therefore this client supportes Kafka brokders from version **0.10.2+**.
26+
27+
## Features
28+
* Rich metrics that show detailed consumer lags on both level, on the consumer group level and on the consumer member level for more granularity.
29+
* Metrics are available for both, console and Prometheus.
30+
* Ready to use thin Docker image for your cloud deployments such as Kubernetes.
31+
* The tool is also available as maven package in case you want to embedded it into your application.
32+
33+
## Changelog
34+
#### 0.0.8:
35+
- Issue #23: Extend Lag stats on consumer member level.
36+
- Issue #20: Support consumer group and topic deletion on the fly.
37+
- Issue #21: Change default port to 9739
38+
#### 0.0.7:
39+
- Issue #17: Now this client will show newly joined consumer groups as well **without the need to restart the client**. You should start it once and it will always refresh the consumer groups list according to the poll interval.
40+
- Kafka client updated to version 2.5.0.
41+
42+
#### 0.0.6:
43+
- Issue #8: Support configuration file as parameter
44+
- Kafka client updated to version 2.4.1.
2645

2746
## Installation and Usage
2847
#### Uber JAR
@@ -48,16 +67,6 @@ You can use placeholders in the arg command and fill these settings by environme
4867
args: ["-b", "$(BOOTSTRAP_SERVERS)","-m", "$(MODE)","-c", "$(CONSUMER_GROUPS)","-i", "$(POLL_INTERVAL)", "-p", "$(HTTP_PORT)"]
4968
```
5069

51-
## Changelog
52-
#### 0.0.7:
53-
- Issue #17: Now this client will show newly joined consumer groups as well **without the need to restart the client**. You should start it once and it will always refresh the consumer groups list according to the poll interval.
54-
- Kafka client updated to version 2.5.0.
55-
56-
#### 0.0.6:
57-
- Issue #8: Support configuration file as parameter
58-
- Kafka client updated to version 2.4.1.
59-
60-
6170
## Usage
6271
java -jar kafka-consumer-lag-monitoring.jar -h
6372
Usage: <main class> [-hV] [-b=<kafkaBootstrapServers>]
@@ -154,6 +163,12 @@ The latest committed offset of a topic in a given partition.
154163
##### `kafka_consumer_group_total_lag{group, topic}`
155164
The total lag of a consumer group behind the head of a topic. This gives the total lags from all partitions over each topic, it provides good visibility but not a precise measurement since is not partition aware.
156165

166+
##### `kafka_consumer_group_member_lag{group, member, topic}`
167+
The total lag of a consumer group member behind the head of a topic. This gives the total lags over each consumer member within consumer group.
168+
169+
##### `kafka_consumer_group_member_partition_lag{group, member, topic, partition}`
170+
The lag of a consumer member within consumer group behind the head of a given partition of a topic.
171+
157172
## Usage as Library
158173
If you want to use this client embedded into your application, you can achieve that by adding a [dependency](https://bintray.com/omarsmak/kafka/consumer-lag-monitoring) to this tool in your `pom.xml` or `gradle.build` as explained below:
159174
#### Maven
@@ -171,7 +186,7 @@ and under `<dependencies>..</dependencies>`:
171186
<dependency>
172187
<groupId>com.omarsmak.kafka</groupId>
173188
<artifactId>consumer-lag-monitoring</artifactId>
174-
<version>0.0.7</version>
189+
<version>0.0.8</version>
175190
</dependency>
176191
```
177192

@@ -184,7 +199,7 @@ repositories {
184199
```
185200
and under `dependencies` the following:
186201
```
187-
compile 'com.omarsmak.kafka:consumer-lag-monitoring:0.0.7'
202+
compile 'com.omarsmak.kafka:consumer-lag-monitoring:0.0.8'
188203
189204
```
190205
**Note:** Since [bintray jcenter](https://bintray.com/bintray/jcenter) is shadowing all maven central packages, you don't need to include both.

src/main/kotlin/com/omarsmak/kafka/consumer/lag/monitoring/response/PrometheusResponseView.kt

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,12 +44,18 @@ class PrometheusResponseView : ResponseView {
4444
.labelNames("group", "topic")
4545
.register()
4646

47-
private val kafkaConsumerMemberLag = Gauge.build()
47+
private val kafkaConsumerMemberLagGauge = Gauge.build()
4848
.name("kafka_consumer_group_member_lag")
4949
.help("The total lag of a consumer group member behind the head of a topic. This gives the total lags over each consumer member within consumer group")
5050
.labelNames("group", "member", "topic")
5151
.register()
5252

53+
private val kafkaConsumerMemberPartitionLagGauge = Gauge.build()
54+
.name("kafka_consumer_group_member_partition_lag")
55+
.help("The lag of a consumer member within consumer group behind the head of a given partition of a topic")
56+
.labelNames("group", "member", "topic", "partition")
57+
.register()
58+
5359
private fun startServer(port: Int) {
5460
// Start a HTTP server to expose metrics
5561
logger.info("Starting HTTP server on $port....")
@@ -80,6 +86,8 @@ class PrometheusResponseView : ResponseView {
8086
* `kafka_consumer_group_offset{group, topic, partition}`
8187
* `kafka_consumer_group_partition_lag{group, topic, partition}`
8288
* `kafka_consumer_group_total_lag{group, topic}`
89+
* `kafka_consumer_group_member_lag{group, member, topic}`
90+
* `kafka_consumer_group_member_partition_lag{group, member, topic, partition}`
8391
* `kafka_topic_latest_offsets{group, topic, partition}
8492
*/
8593
private fun initialize(client: KafkaConsumerLagClient, initialConsumerGroups: List<String>, port: Int, monitoringPollInterval: Long) {
@@ -91,6 +99,13 @@ class PrometheusResponseView : ResponseView {
9199
// Start publishing our metrics
92100
Timer().scheduleAtFixedRate(0, monitoringPollInterval) {
93101

102+
// reset metrics
103+
kafkaConsumerGroupOffsetGauge.clear()
104+
kafkaConsumerLagPerPartitionGauge.clear()
105+
kafkaTopicLatestOffsetsGauge.clear()
106+
kafkaConsumerTotalLagGauge.clear()
107+
kafkaConsumerMemberLagGauge.clear()
108+
94109
val consumers = Utils.getTargetConsumerGroups(kafkaConsumerLagClient, initialConsumerGroups)
95110

96111
consumers.forEach { consumer ->
@@ -126,7 +141,11 @@ class PrometheusResponseView : ResponseView {
126141
// Push metrics
127142
memberLag.forEach { (member, lags) ->
128143
lags.forEach {
129-
kafkaConsumerMemberLag.labels(consumer, member, it.topicName).set(it.totalLag.toDouble())
144+
kafkaConsumerMemberLagGauge.labels(consumer, member, it.topicName).set(it.totalLag.toDouble())
145+
146+
it.lagPerPartition.forEach { (t, u) ->
147+
kafkaConsumerMemberPartitionLagGauge.labels(consumer, member, it.topicName, t.toString()).set(u.toDouble())
148+
}
130149
}
131150
}
132151
} catch (e: KafkaConsumerLagClientException) {

0 commit comments

Comments
 (0)