From 26571601d30332e6c47af4a86ca5c8a652986748 Mon Sep 17 00:00:00 2001
From: Karthikeyan Subramanian
Date: Sat, 10 Feb 2024 14:18:28 -0600
Subject: [PATCH 1/2] Expose common Kafka topic configurations as Prometheus
 metrics to help Kafka administrators troubleshoot

---
 kafka_exporter.go | 60 +++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 60 insertions(+)

diff --git a/kafka_exporter.go b/kafka_exporter.go
index 36021149..3ce0308d 100644
--- a/kafka_exporter.go
+++ b/kafka_exporter.go
@@ -56,12 +56,14 @@ var (
 	consumergroupLagSum           *prometheus.Desc
 	consumergroupLagZookeeper     *prometheus.Desc
 	consumergroupMembers          *prometheus.Desc
+	describeConfigTopic           *prometheus.Desc
 )
 
 // Exporter collects Kafka stats from the given server and exports them using
 // the prometheus metrics package.
 type Exporter struct {
 	client                  sarama.Client
+	adminClient             sarama.ClusterAdmin
 	topicFilter             *regexp.Regexp
 	topicExclude            *regexp.Regexp
 	groupFilter             *regexp.Regexp
@@ -116,6 +118,14 @@ type kafkaOpts struct {
 	verbosityLogLevel       int
 }
 
+type TopicConfig struct {
+	CleanupPolicy   string
+	RetentionMs     string
+	MaxMessageBytes string
+	SegmentBytes    string
+	RetentionBytes  string
+}
+
 // CanReadCertAndKey returns true if the certificate and key files already exists,
 // otherwise returns false. If lost one of cert and key, returns error.
 func CanReadCertAndKey(certPath, keyPath string) (bool, error) {
@@ -261,10 +271,17 @@ func NewExporter(opts kafkaOpts, topicFilter string, topicExclude string, groupF
 		return nil, errors.Wrap(err, "Error Init Kafka Client")
 	}
+
+	adminClient, err := sarama.NewClusterAdminFromClient(client)
+	if err != nil {
+		return nil, errors.Wrap(err, "Error Init Kafka Admin Client")
+	}
+
 	klog.V(TRACE).Infoln("Done Init Clients")
 
 	// Init our exporter.
 	return &Exporter{
 		client:                  client,
+		adminClient:             adminClient,
 		topicFilter:             regexp.MustCompile(topicFilter),
 		topicExclude:            regexp.MustCompile(topicExclude),
 		groupFilter:             regexp.MustCompile(groupFilter),
@@ -312,6 +329,7 @@ func (e *Exporter) Describe(ch chan<- *prometheus.Desc) {
 	ch <- consumergroupLag
 	ch <- consumergroupLagZookeeper
 	ch <- consumergroupLagSum
+	ch <- describeConfigTopic
 }
 
 // Collect fetches the stats from configured Kafka location and delivers them
@@ -395,6 +413,42 @@ func (e *Exporter) collect(ch chan<- prometheus.Metric) {
 		return
 	}
 
+	// Fetch the configuration of every topic and expose the common
+	// entries as labels on a constant-valued info metric.
+	for _, topic := range topics {
+		resource := sarama.ConfigResource{
+			Type: sarama.TopicResource,
+			Name: topic,
+		}
+		entries, err := e.adminClient.DescribeConfig(resource)
+		if err != nil {
+			klog.Errorf("Cannot describe config for topic %s: %v", topic, err)
+			continue
+		}
+
+		topicConfig := &TopicConfig{}
+		for _, entry := range entries {
+			switch entry.Name {
+			case "cleanup.policy":
+				topicConfig.CleanupPolicy = entry.Value
+			case "retention.ms":
+				topicConfig.RetentionMs = entry.Value
+			case "max.message.bytes":
+				topicConfig.MaxMessageBytes = entry.Value
+			case "segment.bytes":
+				topicConfig.SegmentBytes = entry.Value
+			case "retention.bytes":
+				topicConfig.RetentionBytes = entry.Value
+			}
+		}
+
+		ch <- prometheus.MustNewConstMetric(
+			describeConfigTopic, prometheus.GaugeValue, 1,
+			topic, topicConfig.CleanupPolicy, topicConfig.RetentionMs,
+			topicConfig.MaxMessageBytes, topicConfig.SegmentBytes, topicConfig.RetentionBytes,
+		)
+	}
+
 	topicChannel := make(chan string)
 
 	getTopicMetrics := func(topic string) {
@@ -892,6 +946,12 @@ func setup(
 		[]string{"consumergroup"}, labels,
 	)
+	describeConfigTopic = prometheus.NewDesc(
+		prometheus.BuildFQName(namespace, "topic", "config"),
+		"Topic Configuration",
+		[]string{"topic", "cleanup_policy", "retention_ms", "max_message_bytes", "segment_bytes", "retention_bytes"}, labels,
+	)
+
 	if logSarama {
 		sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
 	}
 
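The core addition in this first patch is the `DescribeConfig` call on sarama's `ClusterAdmin`. For readers unfamiliar with that API, here is a minimal standalone sketch of the same call outside the exporter; the broker address, topic name, protocol version, and the `github.com/IBM/sarama` import path are assumptions to adapt to your environment:

```go
package main

import (
	"fmt"
	"log"

	"github.com/IBM/sarama"
)

func main() {
	// Placeholder broker address and topic name; adjust for your cluster.
	brokers := []string{"localhost:9092"}
	topic := "my-topic"

	config := sarama.NewConfig()
	// The admin APIs need a broker protocol version that supports them.
	config.Version = sarama.V2_0_0_0

	admin, err := sarama.NewClusterAdmin(brokers, config)
	if err != nil {
		log.Fatalf("failed to create cluster admin: %v", err)
	}
	defer admin.Close()

	// DescribeConfig returns the effective config entries for the topic,
	// including values inherited from broker defaults.
	entries, err := admin.DescribeConfig(sarama.ConfigResource{
		Type: sarama.TopicResource,
		Name: topic,
	})
	if err != nil {
		log.Fatalf("failed to describe config for topic %q: %v", topic, err)
	}

	for _, entry := range entries {
		fmt.Printf("%s = %s\n", entry.Name, entry.Value)
	}
}
```

Because `DescribeConfig` returns every effective entry, not just per-topic overrides, the patch filters the result down to the five settings it exposes.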
From dcf74561a06ffe8c684c1665f937b2b0a28a1139 Mon Sep 17 00:00:00 2001
From: karthikeyansubramanian
Date: Sat, 10 Feb 2024 14:26:38 -0600
Subject: [PATCH 2/2] Update README.md with kafka_topic_config

---
 README.md | 26 ++++++++++++++++----------
 1 file changed, 16 insertions(+), 10 deletions(-)

diff --git a/README.md b/README.md
index e99a260c..a4c61ebe 100644
--- a/README.md
+++ b/README.md
@@ -184,16 +184,17 @@ kafka_brokers 3
 
 **Metrics details**
 
-| Name                                                | Exposed informations                                 |
-|----------------------------------------------------|------------------------------------------------------|
-| `kafka_topic_partitions`                            | Number of partitions for this Topic                  |
-| `kafka_topic_partition_current_offset`              | Current Offset of a Broker at Topic/Partition        |
-| `kafka_topic_partition_oldest_offset`               | Oldest Offset of a Broker at Topic/Partition         |
-| `kafka_topic_partition_in_sync_replica`             | Number of In-Sync Replicas for this Topic/Partition  |
-| `kafka_topic_partition_leader`                      | Leader Broker ID of this Topic/Partition             |
-| `kafka_topic_partition_leader_is_preferred`         | 1 if Topic/Partition is using the Preferred Broker   |
-| `kafka_topic_partition_replicas`                    | Number of Replicas for this Topic/Partition          |
-| `kafka_topic_partition_under_replicated_partition`  | 1 if Topic/Partition is under Replicated             |
+| Name                                                | Exposed information                                                                                      |
+|----------------------------------------------------|----------------------------------------------------------------------------------------------------------|
+| `kafka_topic_partitions`                            | Number of partitions for this Topic                                                                      |
+| `kafka_topic_partition_current_offset`              | Current Offset of a Broker at Topic/Partition                                                            |
+| `kafka_topic_partition_oldest_offset`               | Oldest Offset of a Broker at Topic/Partition                                                             |
+| `kafka_topic_partition_in_sync_replica`             | Number of In-Sync Replicas for this Topic/Partition                                                      |
+| `kafka_topic_partition_leader`                      | Leader Broker ID of this Topic/Partition                                                                 |
+| `kafka_topic_partition_leader_is_preferred`         | 1 if Topic/Partition is using the Preferred Broker                                                       |
+| `kafka_topic_partition_replicas`                    | Number of Replicas for this Topic/Partition                                                              |
+| `kafka_topic_partition_under_replicated_partition`  | 1 if Topic/Partition is under Replicated                                                                 |
+| `kafka_topic_config`                                | Topic configuration (cleanup.policy, max.message.bytes, retention.bytes, retention.ms, segment.bytes) exposed as labels; value is always 1 |
 
 **Metrics output example**
 
@@ -229,6 +230,11 @@ kafka_topic_partition_replicas{partition="0",topic="__consumer_offsets"} 3
 # HELP kafka_topic_partition_under_replicated_partition 1 if Topic/Partition is under Replicated
 # TYPE kafka_topic_partition_under_replicated_partition gauge
 kafka_topic_partition_under_replicated_partition{partition="0",topic="__consumer_offsets"} 0
+
+# HELP kafka_topic_config Topic Configuration
+# TYPE kafka_topic_config gauge
+kafka_topic_config{cleanup_policy="compact",max_message_bytes="33554432",retention_bytes="-1",retention_ms="115200000",segment_bytes="104857600",topic="__consumer_offsets"} 1
+
 ```
 
 ### Consumer Groups
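As the README example shows, `kafka_topic_config` is an "info"-style metric: the sample value is fixed at 1 and the configuration values travel as labels, so other per-topic series can be joined against it at query time. Here is a minimal, self-contained sketch of that pattern with `client_golang`; the collector name, hard-coded label values, and listen port are illustrative only and not part of the patches:

```go
package main

import (
	"log"
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

// topicConfigCollector emits one constant gauge per topic; the metric value
// is always 1 and the topic's configuration is carried entirely in labels.
type topicConfigCollector struct {
	desc *prometheus.Desc
}

func (c *topicConfigCollector) Describe(ch chan<- *prometheus.Desc) {
	ch <- c.desc
}

func (c *topicConfigCollector) Collect(ch chan<- prometheus.Metric) {
	// In the exporter these label values come from DescribeConfig;
	// they are hard-coded here to keep the sketch self-contained.
	ch <- prometheus.MustNewConstMetric(
		c.desc, prometheus.GaugeValue, 1,
		"__consumer_offsets", "compact", "115200000", "33554432", "104857600", "-1",
	)
}

func main() {
	collector := &topicConfigCollector{
		desc: prometheus.NewDesc(
			prometheus.BuildFQName("kafka", "topic", "config"),
			"Topic Configuration",
			[]string{"topic", "cleanup_policy", "retention_ms", "max_message_bytes", "segment_bytes", "retention_bytes"},
			nil,
		),
	}
	prometheus.MustRegister(collector)

	http.Handle("/metrics", promhttp.Handler())
	log.Fatal(http.ListenAndServe(":9308", nil))
}
```

This mirrors what the patch does in `collect`: a `prometheus.Desc` is created once in `setup`, and one `MustNewConstMetric` sample is emitted per topic on every scrape.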