21 changes: 21 additions & 0 deletions kafka_consumer/README.md
@@ -58,6 +58,27 @@ instances:
<!-- xxz tab xxx -->
<!-- xxz tabs xxx -->

### Cluster Monitoring (Preview)

In addition to consumer lag metrics, this integration can collect comprehensive cluster metadata when `enable_cluster_monitoring` is enabled:

- **Broker information**: Configuration and health metrics
- **Topic and partition details**: Sizes, offsets, replication status
- **Consumer group metadata**: Member details and group state
- **Schema registry**: Schema information (if `schema_registry_url` is provided)

All cluster monitoring metrics are tagged with `kafka_cluster_id` for easy filtering.
Member:

Why not broker id as well? Cardinality? No info available? Something else?

Contributor Author:

Most metrics are cluster-wide, so they are not specific to only one broker. I will make sure that metrics specific to one broker are tagged with the broker ID.


**Note**: This feature is in preview and may increase Agent resource consumption on large clusters. Configuration and schema events are cached to reduce volume.

Example configuration:
```yaml
instances:
  - kafka_connect_str: localhost:9092
    enable_cluster_monitoring: true
    schema_registry_url: http://localhost:8081  # optional
```
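
Since every metric from this feature carries the `kafka_cluster_id` tag, queries can be scoped to a single cluster. For example, using `kafka.broker.count` (one of the metrics this feature emits) and a placeholder for your cluster's ID:

```
avg:kafka.broker.count{kafka_cluster_id:<YOUR_CLUSTER_ID>}
```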

### Validation

1. [Run the Agent's status subcommand][8] and look for `kafka_consumer` under the Checks section.
15 changes: 15 additions & 0 deletions kafka_consumer/assets/configuration/spec.yaml
@@ -238,3 +238,18 @@ files:
        type: boolean
        example: false
        display_default: false
    - name: enable_cluster_monitoring
      description: |
        (PREVIEW) Enable cluster monitoring to collect broker, topic, partition, consumer group,
        and schema registry metadata. Events are cached and tagged with kafka_cluster_id.
        May increase resource consumption on large clusters.
      value:
        type: boolean
        example: false
        display_default: false
    - name: schema_registry_url
      description: |
        Schema Registry URL. When set with enable_cluster_monitoring, collects schema information.
      value:
        type: string
        example: http://localhost:8081
1 change: 1 addition & 0 deletions kafka_consumer/changelog.d/21736.added
@@ -0,0 +1 @@
Add Kafka Cluster Monitoring
611 changes: 611 additions & 0 deletions kafka_consumer/datadog_checks/kafka_consumer/cluster_metadata.py

Large diffs are not rendered by default.
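
The module itself is not shown here. A minimal sketch of the interface it needs to expose, inferred from how `kafka_consumer.py` constructs and calls it later in this diff, could look like the following; the constructor signature and `collect_all_metadata()` come from the check code, while the per-category helper names are illustrative assumptions, not the actual implementation:

```python
class ClusterMetadataCollector:
    """Hypothetical sketch -- not the actual 611-line implementation."""

    def __init__(self, check, client, config, log):
        self.check = check    # AgentCheck instance used to submit metrics and events
        self.client = client  # KafkaClient wrapper around the admin client
        self.config = config  # KafkaConfig carrying the _collect_* flags
        self.log = log

    def collect_all_metadata(self):
        # Each category is guarded by its own flag, mirroring the flags set in config.py.
        if self.config._collect_broker_metadata:
            self._collect_broker_metadata()
        if self.config._collect_topic_metadata:
            self._collect_topic_metadata()
        if self.config._collect_consumer_group_metadata:
            self._collect_consumer_group_metadata()
        if self.config._collect_schema_registry:
            self._collect_schema_registry_metadata()

    # Placeholder helpers: the real module submits metrics such as kafka.broker.count
    # and kafka.topic.size, all tagged with kafka_cluster_id.
    def _collect_broker_metadata(self):
        self.log.debug("Collecting broker configuration and health metrics")

    def _collect_topic_metadata(self):
        self.log.debug("Collecting topic and partition sizes, offsets, and replication status")

    def _collect_consumer_group_metadata(self):
        self.log.debug("Collecting consumer group members and state")

    def _collect_schema_registry_metadata(self):
        self.log.debug("Collecting schemas from %s", self.config._schema_registry_url)
```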

12 changes: 12 additions & 0 deletions kafka_consumer/datadog_checks/kafka_consumer/config.py
@@ -83,6 +83,18 @@ def __init__(self, init_config, instance, log) -> None:
        # Data Streams live messages
        self.live_messages_configs = instance.get('live_messages_configs', [])

        # Cluster metadata collection (single flag to enable all cluster monitoring)
        enable_cluster_monitoring = is_affirmative(instance.get('enable_cluster_monitoring', False))

        # If cluster monitoring is enabled, collect all metadata
        self._collect_broker_metadata = enable_cluster_monitoring
        self._collect_topic_metadata = enable_cluster_monitoring
        self._collect_consumer_group_metadata = enable_cluster_monitoring
Comment on lines +90 to +92

Member:

Unsure why we are splitting the three of them since (1) all have the same value, and (2) in the only usages I can see (link1, link2) they are used in an OR condition or sequential IFs.

Unless you have in mind a future usage where you can use them independently.


        # Schema registry: auto-enable if URL is provided
        self._schema_registry_url = instance.get('schema_registry_url')
        self._collect_schema_registry = enable_cluster_monitoring and self._schema_registry_url is not None

    def validate_config(self):
        if not self._kafka_connect_str:
            raise ConfigurationError('`kafka_connect_str` is required')
@@ -36,6 +36,10 @@ def instance_empty_default_hostname():
    return False


def instance_enable_cluster_monitoring():
    return False


def instance_min_collection_interval():
    return 15

@@ -56,6 +60,10 @@ def instance_sasl_kerberos_service_name():
    return 'kafka'


def instance_schema_registry_url():
    return 'http://localhost:8081'


def instance_security_protocol():
    return 'PLAINTEXT'

@@ -53,6 +53,7 @@ class InstanceConfig(BaseModel):
    data_streams_enabled: Optional[bool] = None
    disable_generic_tags: Optional[bool] = None
    empty_default_hostname: Optional[bool] = None
    enable_cluster_monitoring: Optional[bool] = None
    kafka_client_api_version: Optional[str] = None
    kafka_connect_str: Union[str, tuple[str, ...]]
    metric_patterns: Optional[MetricPatterns] = None
@@ -67,6 +68,7 @@ class InstanceConfig(BaseModel):
    sasl_oauth_token_provider: Optional[SaslOauthTokenProvider] = None
    sasl_plain_password: Optional[str] = None
    sasl_plain_username: Optional[str] = None
    schema_registry_url: Optional[str] = None
    security_protocol: Optional[str] = None
    service: Optional[str] = None
    tags: Optional[tuple[str, ...]] = None
@@ -308,3 +308,15 @@ instances:
    ## Collects consumer lag metric in seconds. It may result in additional billing charges.
    #
    # data_streams_enabled: false

    ## @param enable_cluster_monitoring - boolean - optional - default: false
    ## (PREVIEW) Enable cluster monitoring to collect broker, topic, partition, consumer group,
    ## and schema registry metadata. Events are cached and tagged with kafka_cluster_id.
    ## May increase resource consumption on large clusters.
    #
    # enable_cluster_monitoring: false

    ## @param schema_registry_url - string - optional - default: http://localhost:8081
    ## Schema Registry URL. When set with enable_cluster_monitoring, collects schema information.
    #
    # schema_registry_url: http://localhost:8081
17 changes: 17 additions & 0 deletions kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py
@@ -15,6 +15,7 @@

from datadog_checks.base import AgentCheck, is_affirmative
from datadog_checks.kafka_consumer.client import KafkaClient
from datadog_checks.kafka_consumer.cluster_metadata import ClusterMetadataCollector
from datadog_checks.kafka_consumer.config import KafkaConfig
from datadog_checks.kafka_consumer.constants import KAFKA_INTERNAL_TOPICS, OFFSET_INVALID

@@ -36,6 +37,9 @@ def __init__(self, name, init_config, instances):
        self.topic_partition_cache = {}
        self.check_initializations.insert(0, self.config.validate_config)

        # Initialize cluster metadata collector
        self.metadata_collector = ClusterMetadataCollector(self, self.client, self.config, self.log)

    def check(self, _):
        """The main entrypoint of the check."""
        # Fetch Kafka consumer offsets
@@ -106,6 +110,19 @@ def check(self, _):
            cluster_id,
        )
        self.data_streams_live_message(highwater_offsets or {}, cluster_id)

        # Collect cluster metadata if enabled
        if (
            self.config._collect_broker_metadata
            or self.config._collect_topic_metadata
            or self.config._collect_consumer_group_metadata
            or self.config._collect_schema_registry
        ):
            try:
                self.metadata_collector.collect_all_metadata()
            except Exception as e:
                self.log.error("Error collecting cluster metadata: %s", e)

        if self.config._close_admin_client:
            self.client.close_admin_client()

37 changes: 37 additions & 0 deletions kafka_consumer/local_dev/README_TESTING.md
@@ -0,0 +1,37 @@
# Kafka Consumer Integration - Local Testing

## Setup

```bash
# 1. Set your Datadog API key
cp env.template .env
# Edit .env and add DD_API_KEY

# 2. Start everything
docker compose -f docker-compose-test.yml up -d

# 3. Verify
docker exec datadog-agent agent status | grep kafka_consumer
```
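
If the check is not listed, you can run it once in the foreground to see what it collects or why it fails (this assumes the standard Agent CLI is available inside the container):

```bash
# Run the kafka_consumer check a single time and print its output
docker exec datadog-agent agent check kafka_consumer
```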

## What's Running

- Kafka with 6 topics (orders, users, events, payments, analytics, notifications)
- Schema Registry with 5 AVRO schemas (see the registry check after this list)
- Producer generating messages every 10s
- 5 consumer groups (one intentionally slow to create lag)
- Datadog Agent with `enable_cluster_monitoring: true`
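
To confirm the registry actually holds those schemas (assuming the compose file publishes port 8081 to the host), query its REST API:

```bash
# List registered subjects; expect one entry per topic value schema
curl http://localhost:8081/subjects
```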

## View Data

Metrics: https://app.datadoghq.com/metric/explorer
- Filter by `kafka_cluster_id:WxsTRn91ShWgIdlNQWBTxg`
- Example metrics: `kafka.broker.count`, `kafka.topic.size`, `kafka.consumer_lag`

Dashboard: Import `kafka_cluster_dashboard.json` at https://app.datadoghq.com/dashboard/lists

## Stop

```bash
docker compose -f docker-compose-test.yml down -v
```
21 changes: 21 additions & 0 deletions kafka_consumer/local_dev/conf.yaml
@@ -0,0 +1,21 @@
init_config:

instances:
  - kafka_connect_str: kafka:29092

    # Monitor all consumer groups
    monitor_unlisted_consumer_groups: true

    # Enable cluster metadata collection (single preview flag)
    enable_cluster_monitoring: true

    schema_registry_url: http://schema-registry:8081

    # Enable data streams for lag in seconds
    data_streams_enabled: true

    min_collection_interval: 30
