Skip to content

Commit 20f4294

Browse files
committed
Add metric to track number of kafka producers, this will help us
determine if we need to tune the producer buffers to lower the memory footprint
1 parent d1f92fc commit 20f4294

File tree

2 files changed

+7
-0
lines changed

2 files changed

+7
-0
lines changed

singer/src/main/java/com/pinterest/singer/common/SingerMetrics.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ public class SingerMetrics {
7575
public static final String KAFKA_LATENCY = SINGER_WRITER + "max_kafka_batch_write_latency";
7676
public static final String NUM_COMMITED_TRANSACTIONS = SINGER_WRITER + "num_committed_transactions";
7777
public static final String NUM_ABORTED_TRANSACTIONS = SINGER_WRITER + "num_aborted_transactions";
78+
public static final String NUM_KAFKA_PRODUCERS = SINGER_WRITER + "num_kafka_producers";
7879

7980
public static final String KUBE_PREFIX = SINGER_PREIX + "kube.";
8081
public static final String KUBE_API_ERROR = KUBE_PREFIX + "api_error";

singer/src/main/java/com/pinterest/singer/writer/KafkaProducerManager.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
*/
1616
package com.pinterest.singer.writer;
1717

18+
import com.pinterest.singer.common.SingerMetrics;
19+
import com.pinterest.singer.metrics.OpenTsdbMetricConverter;
1820
import com.pinterest.singer.thrift.configuration.KafkaProducerConfig;
1921
import com.pinterest.singer.utils.KafkaUtils;
2022

@@ -81,6 +83,8 @@ private KafkaProducer<byte[], byte[]> getProducerInternal(KafkaProducerConfig co
8183
if (result != null && result != producer) {
8284
producer.close();
8385
}
86+
// log metrics for no.of kafka producers currently in the cache
87+
OpenTsdbMetricConverter.addMetric(SingerMetrics.NUM_KAFKA_PRODUCERS, producers.size());
8488
}
8589
result = producers.get(config);
8690
return result;
@@ -106,6 +110,8 @@ private boolean resetProducerInternal(KafkaProducerConfig config) {
106110
if (!retval) {
107111
newProducer.close();
108112
}
113+
// log metrics for no.of kafka producers currently in the cache
114+
OpenTsdbMetricConverter.addMetric(SingerMetrics.NUM_KAFKA_PRODUCERS, producers.size());
109115
}
110116
return retval;
111117
}

0 commit comments

Comments
 (0)