A powerful, multi-purpose command-line tool to inspect Kafka topics.
- Cluster Overview: Get a high-level summary of your cluster, including topics, partitions, brokers, consumer groups, and optional integration with Schema Registry and Kafka Connect.
- Deduplication: Find duplicate messages based on the message `key`, `value`, or a specific JSON field.
- Topic Listing: List all topics, with an interactive search for environments with many topics.
- Consumer Lag: Check the current consumer group lag for any topic.
- Message Search: Search for messages containing a specific string or matching a regular expression (`grep` for Kafka).
- Message Peeking: Quickly view the first or last N messages on a topic.
- Flexible Storage: Uses an in-memory store by default and can switch to a SQLite backend for large-scale operations.
- Structured Output: Output data in plain text, JSONL, or CSV for easy integration with other tools.
- Clone the repository.
- Install the required Python packages:

pip install confluent-kafka requests
To run the test suite, which uses mock objects and does not require a live Kafka cluster, use the following command:
python3 -m unittest test_kafkainspect.py
The script can be run directly from the command line.
Get a high-level summary of your Kafka cluster. This is useful for a quick health check.
python3 kafkainspect.py \
--bootstrap-servers <host:port> \
--overview
For a more complete picture, include your Schema Registry and Kafka Connect URLs:
python3 kafkainspect.py \
--bootstrap-servers <host:port> \
--overview \
--schema-registry-url http://localhost:8081 \
--connect-url http://localhost:8083
Scan a topic and find duplicates based on the entire message value:
python3 kafkainspect.py \
--bootstrap-servers localhost:9092 \
--topic my-topic \
--dedup-by value
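Conceptually, value-based deduplication amounts to hashing each message value and checking it against the set of hashes already seen. A minimal in-memory sketch of that idea (the function name and structure are illustrative, not the tool's actual internals):

```python
import hashlib

def find_duplicates(messages):
    """Return (index, value) pairs whose value appeared earlier in the stream."""
    seen = set()
    duplicates = []
    for i, value in enumerate(messages):
        # Hash the raw bytes so large values don't bloat the set.
        digest = hashlib.sha256(value).hexdigest()
        if digest in seen:
            duplicates.append((i, value))
        else:
            seen.add(digest)
    return duplicates
```

For example, `find_duplicates([b"a", b"b", b"a"])` flags the third message as a duplicate of the first.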
List all available topics. If there are more than 50, an interactive search prompt will start automatically.
python3 kafkainspect.py --bootstrap-servers <host:port> --list-topics
Monitor the lag for a specific consumer group on a topic.
python3 kafkainspect.py \
--bootstrap-servers <host:port> \
--topic my-topic \
--group-id my-consumer-group \
--check-lag
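Per-partition lag is conventionally the partition's latest offset (high watermark) minus the group's committed offset, summed across partitions. A hedged sketch of that arithmetic, assuming the per-partition offsets have already been fetched from the broker:

```python
def total_lag(high_watermarks, committed):
    """Sum per-partition lag; a partition with no committed offset
    lags by its full high watermark."""
    lag = 0
    for partition, hw in high_watermarks.items():
        lag += hw - committed.get(partition, 0)
    return lag
```

For example, with watermarks `{0: 100, 1: 50}` and committed offsets `{0: 90, 1: 50}`, the total lag is 10.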
Quickly view the last 5 messages on a topic. Use a negative number to see the first 5 (e.g., `--peek -5`).
python3 kafkainspect.py \
--bootstrap-servers <host:port> \
--topic my-topic \
--peek 5
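Peeking at the tail of a topic without buffering every message can be done with a bounded deque. A sketch of the first-N/last-N split, assuming an iterable of messages (the helper name is illustrative):

```python
from collections import deque
from itertools import islice

def peek(messages, n):
    """First |n| messages when n is negative, last n messages when positive."""
    if n < 0:
        return list(islice(iter(messages), -n))
    # deque with maxlen keeps only the most recent n items while iterating.
    return list(deque(messages, maxlen=n))
```

`peek(range(10), 3)` keeps only the last three items, while `peek(range(10), -3)` returns the first three.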
Find all messages containing the string "error". Use `--regex` for regular expression matching.
python3 kafkainspect.py \
--bootstrap-servers <host:port> \
--topic logs.production \
--search "error"
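The search boils down to substring matching by default, or `re.search` when `--regex` is given. A minimal sketch of that dispatch (illustrative, not the tool's internals):

```python
import re

def matches(value, pattern, use_regex=False):
    """Decode the message value and test it against the pattern."""
    text = value.decode("utf-8", errors="replace")
    if use_regex:
        return re.search(pattern, text) is not None
    return pattern in text
```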
If your messages are JSON, you can find duplicates based on a nested field's value:
python3 kafkainspect.py \
--bootstrap-servers localhost:9092 \
--topic events.json \
--field user.id
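A dotted path like `user.id` implies walking nested JSON objects key by key. A sketch of that lookup, assuming messages are UTF-8 encoded JSON (the helper name is illustrative):

```python
import json

def extract_field(value, path):
    """Return the value at a dotted path inside a JSON message, or None."""
    obj = json.loads(value)
    for key in path.split("."):
        if not isinstance(obj, dict) or key not in obj:
            return None
        obj = obj[key]
    return obj
```

For example, `extract_field(b'{"user": {"id": 7}}', "user.id")` yields `7`, and a missing key yields `None` rather than raising.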
For topics with millions of messages, use the `--sqlite` flag to store message hashes on disk, preventing high memory usage:
python3 kafkainspect.py \
--bootstrap-servers localhost:9092 \
--topic big-data-topic \
--max-messages 10000000 \
--sqlite /tmp/kafka_dedup.db
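A SQLite backend trades memory for disk by keeping seen hashes in a table with a uniqueness constraint, so duplicate detection becomes a constrained insert. A sketch under that assumption (the schema and function names are illustrative):

```python
import hashlib
import sqlite3

def make_store(path=":memory:"):
    """Open (or create) the on-disk hash store."""
    conn = sqlite3.connect(path)
    conn.execute("CREATE TABLE IF NOT EXISTS seen (hash TEXT PRIMARY KEY)")
    return conn

def is_duplicate(conn, value):
    """Insert the value's hash; a constraint violation means it was seen before."""
    digest = hashlib.sha256(value).hexdigest()
    try:
        with conn:  # commits on success, rolls back on error
            conn.execute("INSERT INTO seen (hash) VALUES (?)", (digest,))
        return False
    except sqlite3.IntegrityError:
        return True
```

Letting the PRIMARY KEY constraint do the membership test keeps the hot path to a single statement per message.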
Save the details of duplicate messages to a file in JSONL format. Also supports `csv` and `text`.
python3 kafkainspect.py \
--bootstrap-servers localhost:9092 \
--topic logs.production \
--output duplicates.jsonl:jsonl
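JSONL is simply one JSON object per line, which streams well into tools like `jq`. A sketch of serializing duplicate records that way (the record fields shown are illustrative):

```python
import json

def to_jsonl(records):
    """Serialize records as JSON Lines: one compact JSON object per line."""
    return "\n".join(json.dumps(r, separators=(",", ":")) for r in records)
```

For example, `to_jsonl([{"offset": 1}, {"offset": 2}])` produces two lines, one object each, with no trailing newline.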
usage: kafkainspect.py [-h] --bootstrap-servers BOOTSTRAP_SERVERS [--topic TOPIC] [--overview] [--schema-registry-url SCHEMA_REGISTRY_URL] [--connect-url CONNECT_URL] [--list-topics] [--check-lag]
[--search SEARCH] [--regex] [--peek PEEK] [--group-id GROUP_ID] [--start {earliest,latest}] [--dedup-by {value,key}] [--field FIELD] [--max-messages MAX_MESSAGES]
[--sqlite SQLITE] [--output OUTPUT] [--silent]
Kafka topic inspector and deduplication tool.
options:
-h, --help show this help message and exit
--bootstrap-servers BOOTSTRAP_SERVERS
Comma-separated list of Kafka bootstrap servers
--topic TOPIC Kafka topic to scan. Not required if --list-topics or --check-lag is used.
--overview Display a high-level overview of the cluster.
--schema-registry-url SCHEMA_REGISTRY_URL
URL for the Schema Registry to include in overview.
--connect-url CONNECT_URL
URL for Kafka Connect to include in overview.
--list-topics List topics interactively and exit.
--check-lag Check consumer group lag for a topic.
--search SEARCH Search for a pattern in message values.
--regex Treat search pattern as a regular expression.
--peek PEEK Peek at the first N (if negative) or last N (if positive) messages.
--group-id GROUP_ID Consumer group id (default: kafkainspect)
--start {earliest,latest}
Start offset
--dedup-by {value,key}
Field to deduplicate by (default: value)
--field FIELD JSON field to deduplicate by (e.g., user.id). Overrides --dedup-by.
--max-messages MAX_MESSAGES
Limit messages to avoid OOM
--sqlite SQLITE Optional SQLite path for large-scale deduplication
--output OUTPUT Optional path to output file (e.g., out.txt:text, out.jsonl:jsonl, out.csv:csv)
--silent Suppress stdout output of duplicates