Pulse is a distributed log collection and storage system that uses Kafka for message queuing and ClickHouse for efficient storage and querying of log data.
The system consists of two main components:
- Agent: Collects logs via HTTP API endpoints and sends them to a Kafka topic.
- Collector: Consumes logs from Kafka and stores them in ClickHouse.
Requirements:

- Docker and Docker Compose
- Go 1.23+ (for local development)

Copy the example environment file and update it as needed:

cp .env.example .env

Then build and start the services:

make build
The agent component receives JSON-formatted log events via HTTP endpoints, processes them into the Event model, and produces messages to Kafka. The agent uses a flexible transport layer that currently supports HTTP and is designed for easy extension to other protocols like gRPC.
You can send events to the agent using HTTP:
curl -X POST http://localhost:8080/events \
-H "Content-Type: application/json" \
-d '{
"event_time_ms": 1651234567890,
"service": "my-service",
"level": "INFO",
"message": "User logged in",
"host": "server-1"
}'
The collector consumes log events from Kafka and stores them in ClickHouse for efficient querying and analysis.
You can query and filter logs using the HTTP API:
curl -X GET "http://localhost:8080/events/filters?service=my-service&level=INFO&per_page=50&page=1&start_time=1651234567890&end_time=1651334567890&search=logged%20in&sort_order=DESC"
Available query parameters:

- `service`: Filter by service name
- `level`: Filter by log level (DEBUG, INFO, WARN, ERROR)
- `host`: Filter by hostname
- `request_id`: Filter by request ID
- `search`: Search for text in the message field
- `per_page`: Number of results per page (default: 15)
- `page`: Page number to retrieve (default: 1)
- `start_time`: Filter events after this timestamp (milliseconds since epoch)
- `end_time`: Filter events before this timestamp (milliseconds since epoch)
- `sort_order`: Result order (ASC or DESC, default: ASC)
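The same query can be built programmatically. A minimal Go sketch using the standard library's net/url (the base URL and helper name are illustrative, not part of the project):

```go
package main

import (
	"fmt"
	"net/url"
)

// buildFilterURL assembles an /events/filters query string from the
// parameters documented above. url.Values handles escaping (e.g. the
// space in "logged in") and emits keys in sorted order.
func buildFilterURL(base string, params map[string]string) string {
	q := url.Values{}
	for k, v := range params {
		q.Set(k, v)
	}
	return base + "/events/filters?" + q.Encode()
}

func main() {
	u := buildFilterURL("http://localhost:8080", map[string]string{
		"service":    "my-service",
		"level":      "INFO",
		"per_page":   "50",
		"page":       "1",
		"search":     "logged in",
		"sort_order": "DESC",
	})
	fmt.Println(u)
}
```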
The response is a JSON array of log events:
[
{
"event_time_ms": 1651234567890,
"service": "my-service",
"level": "INFO",
"message": "User logged in",
"host": "server-1",
"request_id": "550e8400-e29b-41d4-a716-446655440000"
}
]
Logs are stored in ClickHouse with a TTL of 30 days. The schema includes:
- EventTimeMs (UInt64)
- Timestamp (DateTime, materialized from EventTimeMs)
- Service (String)
- Level (Enum: DEBUG, INFO, WARN, ERROR)
- Message (String)
- Host (String)
- RequestID (UUID)
The data is partitioned by day for optimal query performance.
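The table behind this schema presumably resembles the DDL sketch below. The table name, engine, ordering key, and exact expressions are assumptions; the authoritative definition lives in scripts/init-clickhouse.sql:

```sql
-- Sketch only: names and expressions are assumptions.
CREATE TABLE IF NOT EXISTS logs (
    EventTimeMs UInt64,
    Timestamp   DateTime MATERIALIZED toDateTime(intDiv(EventTimeMs, 1000)),
    Service     String,
    Level       Enum8('DEBUG' = 1, 'INFO' = 2, 'WARN' = 3, 'ERROR' = 4),
    Message     String,
    Host        String,
    RequestID   UUID
)
ENGINE = MergeTree
PARTITION BY toDate(Timestamp)        -- daily partitions
ORDER BY (Service, Timestamp)
TTL Timestamp + INTERVAL 30 DAY;      -- rows expire after 30 days
```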
.
├── cmd/
│ ├── agent/ # Agent application entry point
│ └── collector/ # Collector application entry point
├── internal/
│ ├── agent/ # Agent specific code
│ ├── collector/ # Collector specific code
│ └── storage/ # Storage layer (ClickHouse)
├── pkg/
│ ├── logger/ # Logging utilities
│ ├── models/ # Shared data models
│ └── transport/ # Transport layer (HTTP, gRPC)
└── scripts/
├── entrypoint.sh # Container entrypoint script
└── init-clickhouse.sql # ClickHouse initialization script
The following Make commands are available for development:

- `make build`: Build and start containers in detached mode
- `make start`: Start existing containers
- `make stop`: Stop and remove containers
- `make restart`: Restart containers
- `make logs`: Tail container logs
- `make clean`: Stop containers and remove volumes and images
- `make help`: Show available commands
Configure the application using environment variables (see .env.example):

- `KAFKA_BROKER`: Kafka broker address (default: kafka:9092)
- `KAFKA_TOPIC`: Kafka topic for logs (default: logs)
- `CLICKHOUSE_ADDR`: ClickHouse server address (default: clickhouse:9000)
- `CLICKHOUSE_DB`: ClickHouse database name (default: gologcentral)
- `CLICKHOUSE_USER`: ClickHouse username (default: default)
- `CLICKHOUSE_PASS`: ClickHouse password
- `LOG_LEVEL`: Logging verbosity (debug, info, warn, error; default: info)
- `HTTP_PORT`: Port for the agent HTTP transport (default: 8080)
- `HTTP_ENDPOINT`: Endpoint path for receiving events (default: /events)
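A minimal .env along these lines should work for the docker-compose setup. The values shown are the documented defaults; `CLICKHOUSE_PASS` has no documented default, so its value here is a placeholder:

```shell
# Example .env — values are the documented defaults
KAFKA_BROKER=kafka:9092
KAFKA_TOPIC=logs
CLICKHOUSE_ADDR=clickhouse:9000
CLICKHOUSE_DB=gologcentral
CLICKHOUSE_USER=default
CLICKHOUSE_PASS=changeme   # placeholder — set your own password
LOG_LEVEL=info
HTTP_PORT=8080
HTTP_ENDPOINT=/events
```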
Pulse uses a pluggable transport layer architecture that allows for multiple protocols to receive events:
- HTTP Transport: Currently implemented, accepts POST requests with JSON event data and GET requests for querying logs
- gRPC Transport: Planned for future implementation
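The pluggable design can be sketched as a small Go interface: each protocol only needs to decode incoming requests into events and hand them to a shared handler. The names below are illustrative, not the repository's actual API:

```go
package main

import "fmt"

// Event is a minimal stand-in for the shared model in pkg/models.
type Event struct {
	Service, Level, Message string
}

// Handler processes a decoded event (e.g. by producing it to Kafka).
type Handler func(Event) error

// Transport is a sketch of the pluggable transport abstraction: an
// HTTP listener today, a gRPC server later, both feeding the same
// Handler. Interface and method names are assumptions.
type Transport interface {
	Start(h Handler) error
}

// stubTransport delivers a single canned event, standing in for a
// real HTTP or gRPC listener.
type stubTransport struct{ event Event }

func (t stubTransport) Start(h Handler) error { return h(t.event) }

func main() {
	var tr Transport = stubTransport{Event{"my-service", "INFO", "hello"}}
	_ = tr.Start(func(e Event) error {
		fmt.Println("received:", e.Service, e.Message) // received: my-service hello
		return nil
	})
}
```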
Pulse uses structured JSON logging powered by Zap. This provides:
- High-performance logging with minimal allocations
- Structured JSON output for easy parsing by log aggregation tools
- Different log levels (debug, info, warn, error) configurable via environment variables
- Context-rich logs with consistent fields across components
You can control logging verbosity with the `LOG_LEVEL` environment variable.