diff --git a/deploy/.env b/deploy/.env index e11b21ab..882d2e4d 100644 --- a/deploy/.env +++ b/deploy/.env @@ -44,8 +44,22 @@ SLACK_CHANNELS="" AGENTS_LOGS_APP_LOG_FILE_PATH="${COMMON_LOGS_DIR}/agents-logs-app.log" AGENTS_LOGS_LOG_FILE_PATH="${COMMON_LOGS_DIR}/agents-logs.log" +# ========================================== +# Local Testing Only - DO NOT COMMIT +# ========================================== + +# LocalStack Configuration (for local S3/SQS simulation) +AWS_ENDPOINT_URL=http://localstack:4566 +S3_ENDPOINT_URL=http://localstack:4566 + # Metrics Publisher Configuration METRICS_ENABLED=true METRICS_AGENT_URL=tcp://host.docker.internal:18126 METRICS_SERVICE_NAME=gprofiler-webapp METRICS_SLI_UUID=test-sli-uuid-12345 + +# Indexer Metrics Configuration (for local testing) +INDEXER_METRICS_ENABLED=true +INDEXER_METRICS_AGENT_URL=tcp://host.docker.internal:18126 +INDEXER_METRICS_SERVICE_NAME=gprofiler-indexer +INDEXER_METRICS_SLI_UUID=test-sli-uuid-indexer-67890 diff --git a/deploy/docker-compose.yml b/deploy/docker-compose.yml index a7e206f1..e8c3e7a5 100644 --- a/deploy/docker-compose.yml +++ b/deploy/docker-compose.yml @@ -87,6 +87,9 @@ services: - AWS_DEFAULT_REGION=$AWS_REGION - SLACK_BOT_TOKEN=$SLACK_BOT_TOKEN - SLACK_CHANNELS=$SLACK_CHANNELS + # Local Testing: S3 Endpoint for LocalStack + - S3_ENDPOINT_URL=$S3_ENDPOINT_URL + # Local Testing: Metrics Configuration - METRICS_ENABLED=$METRICS_ENABLED - METRICS_AGENT_URL=$METRICS_AGENT_URL - METRICS_SERVICE_NAME=$METRICS_SERVICE_NAME @@ -183,6 +186,13 @@ services: - AWS_ACCESS_KEY_ID=$AWS_ACCESS_KEY_ID - AWS_SECRET_ACCESS_KEY=$AWS_SECRET_ACCESS_KEY - AWS_SESSION_TOKEN=$AWS_SESSION_TOKEN + # Local Testing: Metrics Configuration + - METRICS_ENABLED=$INDEXER_METRICS_ENABLED + - METRICS_AGENT_URL=$INDEXER_METRICS_AGENT_URL + - METRICS_SERVICE_NAME=$INDEXER_METRICS_SERVICE_NAME + - METRICS_SLI_UUID=$INDEXER_METRICS_SLI_UUID + # Local Testing: LocalStack Endpoint + - AWS_ENDPOINT_URL=$AWS_ENDPOINT_URL #--- 
nginx-load-balancer: image: nginx:1.23.3 @@ -199,6 +209,31 @@ services: - agents-logs-backend - webapp + # --- + # LocalStack for local S3/SQS simulation (Local Testing Only) + localstack: + image: localstack/localstack:3.0 + container_name: gprofiler-ps-localstack + restart: always + ports: + - "4566:4566" # LocalStack Gateway + - "4510-4559:4510-4559" # External services port range + environment: + - SERVICES=s3,sqs + - DEBUG=1 + - DOCKER_HOST=unix:///var/run/docker.sock + - AWS_DEFAULT_REGION=us-east-1 + - AWS_ACCESS_KEY_ID=test + - AWS_SECRET_ACCESS_KEY=test + volumes: + - "./localstack_init:/etc/localstack/init/ready.d" + - "/var/run/docker.sock:/var/run/docker.sock" + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:4566/_localstack/health"] + interval: 10s + timeout: 5s + retries: 5 + volumes: db_clickhouse: driver: local diff --git a/deploy/localstack_init/01_init_s3_sqs.sh b/deploy/localstack_init/01_init_s3_sqs.sh new file mode 100755 index 00000000..cbcc8188 --- /dev/null +++ b/deploy/localstack_init/01_init_s3_sqs.sh @@ -0,0 +1,19 @@ +#!/bin/bash + +echo "πŸš€ Initializing LocalStack S3 and SQS..." + +# Create S3 bucket (must match BUCKET_NAME in .env) +awslocal s3 mb s3://performance_studio_bucket +echo "βœ… S3 bucket 'performance_studio_bucket' created" + +# Create SQS queue +awslocal sqs create-queue --queue-name performance_studio_queue +echo "βœ… SQS queue 'performance_studio_queue' created" + +# Get queue URL +QUEUE_URL=$(awslocal sqs get-queue-url --queue-name performance_studio_queue --output text) +echo "βœ… Queue URL: $QUEUE_URL" + +echo "πŸŽ‰ LocalStack initialization complete!" 
+ + diff --git a/docs/METRICS_PUBLISHER_INDEXER_DOCUMENTATION.md b/docs/METRICS_PUBLISHER_INDEXER_DOCUMENTATION.md new file mode 100644 index 00000000..6cbb03e9 --- /dev/null +++ b/docs/METRICS_PUBLISHER_INDEXER_DOCUMENTATION.md @@ -0,0 +1,1263 @@ +# Metrics Publisher - Indexer Documentation + +## Overview + +The Metrics Publisher is a lightweight, production-ready component for tracking Service Level Indicators (SLIs) in the gProfiler Performance Studio **indexer** service. It sends metrics in Graphite plaintext protocol format over TCP to a metrics agent for monitoring and alerting. + +--- + +## Table of Contents + +1. [Purpose and Use Case](#purpose-and-use-case) +2. [Architecture](#architecture) +3. [Implementation Details](#implementation-details) +4. [Usage Guide](#usage-guide) +5. [Configuration](#configuration) +6. [Metrics Format](#metrics-format) +7. [Code Examples](#code-examples) +8. [Testing](#testing) +9. [Troubleshooting](#troubleshooting) + +--- + +## Purpose and Use Case + +### What Problem Does It Solve? + +The Metrics Publisher enables **meaningful availability tracking** for the indexer service by measuring: +- **Success Rate**: Percentage of events processed successfully +- **Error Budget**: Tracking failures that count against SLO +- **Service Health**: Real-time monitoring of event processing + +### SLI (Service Level Indicator) Tracking + +The publisher tracks three response types: +1. **`success`**: Event processed completely (counts toward SLO) +2. **`failure`**: Server error, counts against error budget (impacts SLO) +3. **`ignored_failure`**: Client error, doesn't count against SLO + +### Key Use Cases + +- **SLO Monitoring**: Track if service meets 99.9% success rate +- **Alerting**: Trigger alerts when error budget is consumed +- **Debugging**: Identify which operations are failing +- **Capacity Planning**: Understand processing volumes + +--- + +## Architecture + +### Design Principles + +1. 
**Singleton Pattern**: One instance per application lifecycle +2. **Thread-Safe**: Safe for concurrent use from multiple goroutines +3. **Non-Blocking**: Metrics sending doesn't block main application flow +4. **Fail-Safe**: Application continues if metrics agent is unavailable +5. **Opt-In**: Disabled by default, must be explicitly enabled + +### Component Diagram + +``` +β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” +β”‚ Indexer Application β”‚ +β”‚ β”‚ +β”‚ β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”‚ +β”‚ β”‚ Worker │───────▢│ MetricsPublisher β”‚ β”‚ +β”‚ β”‚ (main.go) β”‚ β”‚ (singleton) β”‚ β”‚ +β”‚ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β”‚ β”‚ β”‚ +β”‚ β”‚ - SendSLIMetric() β”‚ β”‚ +β”‚ β”‚ - SendErrorMetric() β”‚ β”‚ +β”‚ β”‚ - sendMetric() β”‚ β”‚ +β”‚ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β”‚ +β”‚ β”‚ β”‚ +β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ + β”‚ TCP Connection + β”‚ (Graphite Protocol) + β–Ό + β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” + β”‚ Metrics Agent β”‚ + β”‚ (port 18126) β”‚ + β”‚ β”‚ + β”‚ - Receives metrics β”‚ + β”‚ - Forwards to β”‚ + β”‚ monitoring system β”‚ + β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ +``` + +### Data Flow + +``` +1. Event Processing (worker.go) + β”‚ + β”œβ”€β–Ά Success? + β”‚ β”œβ”€β–Ά YES: GetMetricsPublisher().SendSLIMetric("success", ...) + β”‚ └─▢ NO: GetMetricsPublisher().SendSLIMetric("failure", ...) + β”‚ +2. MetricsPublisher checks if enabled + β”‚ + β”œβ”€β–Ά Disabled? β†’ Return immediately (no-op) + └─▢ Enabled? 
β†’ Continue + β”‚ +3. Build metric line (Graphite format) + β”‚ +4. Send via TCP socket (1 second timeout) + β”‚ + β”œβ”€β–Ά Success? β†’ Log and return + └─▢ Failed? β†’ Log error (throttled) and return +``` + +--- + +## Implementation Details + +### File: `src/gprofiler_indexer/metrics_publisher.go` + +#### Key Components + +##### 1. MetricsPublisher Struct + +```go +type MetricsPublisher struct { + host string // Metrics agent hostname + port string // Metrics agent port + serviceName string // Service identifier + sliMetricUUID string // SLI metric UUID + enabled bool // Master enable/disable flag + connectionFailed bool // Connection state tracking + lastErrorLogTime int64 // For error log throttling + errorLogInterval int64 // Error log interval (5 minutes) + mutex sync.Mutex // Thread safety +} +``` + +**Thread Safety:** +- `mutex` protects concurrent access to state fields +- Methods are safe to call from multiple goroutines + +##### 2. Singleton Pattern + +```go +var ( + metricsInstance *MetricsPublisher // Global singleton instance + metricsOnce sync.Once // Ensures single initialization +) + +// NewMetricsPublisher creates or returns the singleton instance +func NewMetricsPublisher(serverURL, serviceName, sliUUID string, enabled bool) *MetricsPublisher { + metricsOnce.Do(func() { + // Initialize once + metricsInstance = &MetricsPublisher{...} + }) + return metricsInstance +} + +// GetMetricsPublisher returns the singleton (safe to call before init) +func GetMetricsPublisher() *MetricsPublisher { + return metricsInstance // May be nil if not initialized +} +``` + +**Why Singleton?** +- Single TCP connection pool per application +- Consistent state across all callers +- Prevents connection exhaustion + +##### 3. 
Core Methods + +###### SendSLIMetric (Public) + +```go +func (m *MetricsPublisher) SendSLIMetric( + responseType string, // "success", "failure", "ignored_failure" + methodName string, // Operation name (e.g., "event_processing") + extraTags map[string]string // Additional context tags +) bool +``` + +**Purpose:** Send SLI metrics for SLO tracking + +**Guards:** +- Returns `false` immediately if `m == nil` +- Returns `false` if `!m.enabled` +- Returns `false` if `m.sliMetricUUID == ""` + +**Metric Format:** +``` +put error-budget.counters.{sliMetricUUID} {timestamp} 1 service={serviceName} response_type={responseType} method_name={methodName} [extraTags...] +``` + +###### SendErrorMetric (Public) + +```go +func (m *MetricsPublisher) SendErrorMetric( + metricName string, + extraTags map[string]string +) bool +``` + +**Purpose:** Send operational error metrics (non-SLI) + +**Use Case:** Track internal errors not related to SLO + +###### sendMetric (Private) + +```go +func (m *MetricsPublisher) sendMetric(metricLine string) bool +``` + +**Purpose:** Low-level TCP sending with error handling + +**Features:** +- Creates new TCP connection per metric (stateless) +- 1 second connection timeout +- 1 second write timeout +- Error log throttling (max once per 5 minutes) +- Connection recovery tracking + +##### 4. Error Handling + +**Throttled Logging:** +```go +// Only log errors once every 5 minutes to prevent log spam +m.mutex.Lock() +now := time.Now().Unix() +shouldLogError := now-m.lastErrorLogTime >= m.errorLogInterval +m.mutex.Unlock() + +if shouldLogError { + log.Warnf("Failed to connect to metrics agent: %v", err) + m.mutex.Lock() + m.lastErrorLogTime = now + m.mutex.Unlock() +} +``` + +**Graceful Degradation:** +- Application continues even if metrics agent is down +- No retries (fire-and-forget) +- Connection recovery logged on success + +--- + +## Usage Guide + +### Integration Pattern + +#### 1. 
Initialization (main.go) + +```go +func main() { + args := NewCliArgs() + args.ParseArgs() + + // Initialize metrics publisher (singleton) + metricsPublisher := NewMetricsPublisher( + args.MetricsAgentURL, // tcp://host:port + args.MetricsServiceName, // "gprofiler-indexer" + args.MetricsSLIUUID, // "test-sli-uuid-indexer-67890" + args.MetricsEnabled, // true/false + ) + + // ... rest of application ... + + // Cleanup on shutdown + if metricsPublisher != nil { + metricsPublisher.FlushAndClose() + } +} +``` + +#### 2. Usage in Workers (worker.go) + +**Pattern A: Direct Call (Recommended)** + +```go +// Get singleton instance and call directly +GetMetricsPublisher().SendSLIMetric( + ResponseTypeSuccess, + "event_processing", + map[string]string{ + "service": serviceName, + "filename": task.Filename, + }, +) +``` + +**Why this works:** +- `GetMetricsPublisher()` may return `nil` (not initialized) +- `SendSLIMetric()` has nil-safe check: `if m == nil { return false }` +- No panic, graceful no-op if disabled + +**Pattern B: Conditional Call (Optional)** + +```go +// Only for SQS events (not local file processing) +if useSQS { + GetMetricsPublisher().SendSLIMetric( + ResponseTypeSuccess, + "event_processing", + tags, + ) +} +``` + +#### 3. Success Tracking Example + +```go +// Successful event processing +err := pw.ParseStackFrameFile(sess, task, args.S3Bucket, timestamp, buf) +if err != nil { + // Track failure + GetMetricsPublisher().SendSLIMetric( + ResponseTypeFailure, + "event_processing", + map[string]string{ + "service": serviceName, + "error": "parse_or_write_failed", + "filename": task.Filename, + }, + ) + return +} + +// Track success +GetMetricsPublisher().SendSLIMetric( + ResponseTypeSuccess, + "event_processing", + map[string]string{ + "service": serviceName, + "filename": task.Filename, + }, +) +``` + +#### 4. 
Error Budget Tracking + +```go +// S3 fetch failed (counts against SLO) - NO MESSAGE DELETION +if err != nil { + GetMetricsPublisher().SendSLIMetric( + ResponseTypeFailure, // ← Counts against error budget + "event_processing", + map[string]string{ + "service": serviceName, + "error": "s3_fetch_failed", + "filename": task.Filename, + }, + ) + // Do NOT delete message - let SQS retry for transient failures + log.Warnf("S3 fetch failed for %s, message will retry via SQS", task.Filename) + return err +} + +// SQS infrastructure failures (counts against SLO) +if recvErr != nil { + GetMetricsPublisher().SendSLIMetric( + ResponseTypeFailure, // ← Infrastructure failure + "event_processing", + map[string]string{ + "service": "sqs_infrastructure", + "error": "sqs_receive_failed", + }, + ) + continue // Keep polling +} + +// Malformed messages (client error - doesn't count against SLO) +if parseErr != nil { + GetMetricsPublisher().SendSLIMetric( + ResponseTypeIgnoredFailure, // ← Client error, doesn't impact SLO + "event_processing", + map[string]string{ + "service": "sqs_infrastructure", + "error": "sqs_message_parse_failed", + }, + ) + // Delete malformed messages to prevent infinite retry + deleteMessage(sess, queueURL, messageHandle) + continue +} +``` + +--- + +## Configuration + +### Environment Variables + +```bash +# Enable/disable metrics publishing +METRICS_ENABLED=true + +# Metrics agent TCP endpoint +METRICS_AGENT_URL=tcp://localhost:18126 + +# Service identifier for metrics +METRICS_SERVICE_NAME=gprofiler-indexer + +# SLI metric UUID for error budget tracking +METRICS_SLI_UUID=prod-sli-uuid-12345 +``` + +### Command-Line Flags + +```bash +./indexer \ + --metrics-enabled=true \ + --metrics-agent-url=tcp://metrics-agent:18126 \ + --metrics-service-name=gprofiler-indexer \ + --metrics-sli-uuid=prod-sli-uuid-12345 +``` + +### Configuration in Code (args.go) + +```go +type CLIArgs struct { + // Metrics Publisher Configuration + MetricsEnabled bool + MetricsAgentURL 
string + MetricsServiceName string + MetricsSLIUUID string +} + +func NewCliArgs() *CLIArgs { + return &CLIArgs{ + // Metrics defaults (disabled by default) + MetricsEnabled: false, + MetricsAgentURL: "tcp://localhost:18126", + MetricsServiceName: "gprofiler-indexer", + MetricsSLIUUID: "", + } +} +``` + +### Production vs Development + +#### Production Configuration +```bash +METRICS_ENABLED=true +METRICS_AGENT_URL=tcp://prod-metrics-agent.internal:18126 +METRICS_SERVICE_NAME=gprofiler-indexer +METRICS_SLI_UUID=a1b2c3d4-5678-90ab-cdef-1234567890ab # Real UUID +``` + +#### Development/Local Configuration +```bash +METRICS_ENABLED=true +METRICS_AGENT_URL=tcp://host.docker.internal:18126 +METRICS_SERVICE_NAME=gprofiler-indexer +METRICS_SLI_UUID=test-sli-uuid-indexer-67890 # Test UUID +``` + +#### Disabled Configuration +```bash +METRICS_ENABLED=false +# Other values are ignored when disabled +``` + +--- + +## Metrics Format + +### Graphite Plaintext Protocol + +The publisher uses the [Graphite plaintext protocol](http://graphite.readthedocs.io/en/latest/feeding-carbon.html): + +``` +put = = ... +``` + +### SLI Metric Format + +``` +put error-budget.counters. 1 service= response_type= method_name= [extra_tags...] 
+``` + +**Example:** +``` +put error-budget.counters.test-sli-uuid-indexer-67890 1761857905 1 service=gprofiler-indexer response_type=success method_name=event_processing service=devapp filename=2025-10-30T20:56:10_xxxxx.gz +``` + +### Field Descriptions + +| Field | Type | Description | Example | +|-------|------|-------------|---------| +| `metric_name` | string | Metric identifier with UUID | `error-budget.counters.test-sli-uuid-12345` | +| `timestamp` | integer | Unix epoch timestamp (seconds) | `1761857905` | +| `value` | integer | Always `1` (counter increment) | `1` | +| `service` | string (tag) | Service name from configuration | `gprofiler-indexer` | +| `response_type` | string (tag) | Result: `success`, `failure`, `ignored_failure` | `success` | +| `method_name` | string (tag) | Operation being tracked | `event_processing` | +| `extra_tags` | key=value | Additional context tags | `service=devapp filename=xxx.gz error=s3_fetch_failed` | + +### Response Types + +#### `success` +- Event processed completely +- Counts **FOR** SLO (increases success rate) +- Example: Profile parsed and inserted into ClickHouse + +#### `failure` +- Server-side error +- Counts **AGAINST** error budget (decreases availability) +- Example: S3 fetch failed, ClickHouse insertion failed + +#### `ignored_failure` +- Client-side error +- Does **NOT** count against SLO +- Example: Invalid profile format, malformed JSON + +--- + +## Code Examples + +### Example 1: Basic Success/Failure Tracking + +```go +func ProcessEvent(event SQSMessage) error { + // Attempt to process + result, err := doProcessing(event) + + if err != nil { + // Track failure + GetMetricsPublisher().SendSLIMetric( + ResponseTypeFailure, + "event_processing", + map[string]string{ + "service": event.Service, + "error": err.Error(), + }, + ) + return err + } + + // Track success + GetMetricsPublisher().SendSLIMetric( + ResponseTypeSuccess, + "event_processing", + map[string]string{ + "service": event.Service, + }, + ) + 
return nil +} +``` + +### Example 2: Multi-Stage Processing with Proper SQS Retry Strategy + +```go +func ProcessProfile(task SQSMessage) error { + // Stage 1: Fetch from S3 + buf, err := GetFileFromS3(sess, bucket, task.Filename) + if err != nil { + GetMetricsPublisher().SendSLIMetric( + ResponseTypeFailure, + "event_processing", + map[string]string{ + "service": task.Service, + "error": "s3_fetch_failed", + "filename": task.Filename, + }, + ) + + // Do NOT delete message - let SQS retry for transient S3 failures + // Message will retry automatically via SQS visibility timeout + log.Warnf("S3 fetch failed for %s, message will retry via SQS", task.Filename) + return err + } + + // Stage 2: Parse and insert into ClickHouse + err = ParseAndInsert(buf) + if err != nil { + GetMetricsPublisher().SendSLIMetric( + ResponseTypeFailure, + "event_processing", + map[string]string{ + "service": task.Service, + "error": "parse_or_write_failed", + "filename": task.Filename, + }, + ) + + // Do NOT delete message - let SQS retry for transient ClickHouse/parsing failures + // Message will retry automatically via SQS visibility timeout + log.Warnf("Parse/write failed for %s, message will retry via SQS", task.Filename) + return err + } + + // Stage 3: Cleanup (only on success) + err = deleteMessage(sess, task.QueueURL, task.MessageHandle) + if err != nil { + GetMetricsPublisher().SendSLIMetric( + ResponseTypeFailure, + "event_processing", + map[string]string{ + "service": task.Service, + "error": "sqs_delete_failed", + "filename": task.Filename, + }, + ) + // Continue anyway - message was processed successfully + // SQS will eventually make message visible again, but that's better than data loss + log.Errorf("Failed to delete processed message: %v", err) + } + + // Success! 
+ GetMetricsPublisher().SendSLIMetric( + ResponseTypeSuccess, + "event_processing", + map[string]string{ + "service": task.Service, + "filename": task.Filename, + }, + ) + return nil +} +``` + +### Example 3: SQS Infrastructure Monitoring + +```go +// In queue.go - SQS polling and connection monitoring +func ListenSqs(ctx context.Context, args *CLIArgs, ch chan<- SQSMessage, wg *sync.WaitGroup) { + // ... SQS setup ... + + urlResult, err := getQueueURL(sess, args.SQSQueue) + if err != nil { + logger.Errorf("Got an error getting the queue URL: %v", err) + + // SLI Metric: SQS queue URL resolution failure (infrastructure error) + GetMetricsPublisher().SendSLIMetric( + ResponseTypeFailure, + "event_processing", + map[string]string{ + "service": "sqs_infrastructure", + "error": "sqs_queue_url_failed", + }, + ) + return + } + + for { + output, recvErr := svc.ReceiveMessage(&sqs.ReceiveMessageInput{...}) + if recvErr != nil { + logger.Error(recvErr) + + // SLI Metric: SQS message receive failure (infrastructure error) + GetMetricsPublisher().SendSLIMetric( + ResponseTypeFailure, + "event_processing", + map[string]string{ + "service": "sqs_infrastructure", + "error": "sqs_receive_failed", + }, + ) + continue + } + + for _, message := range output.Messages { + var sqsMessage SQSMessage + parseErr := json.Unmarshal([]byte(*message.Body), &sqsMessage) + if parseErr != nil { + // SLI Metric: Malformed message (client error - doesn't count against SLO) + GetMetricsPublisher().SendSLIMetric( + ResponseTypeIgnoredFailure, + "event_processing", + map[string]string{ + "service": "sqs_infrastructure", + "error": "sqs_message_parse_failed", + }, + ) + + // Delete malformed messages to prevent infinite retry loop + // This is a permanent client error that won't be fixed by retrying + deleteMessage(sess, *urlResult.QueueUrl, *message.ReceiptHandle) + continue + } + + // Forward valid message to workers + ch <- sqsMessage + } + } +} +``` + +### Example 4: Conditional Metrics (Only for 
Production Traffic) + +```go +func Worker(tasks <-chan SQSMessage) { + for task := range tasks { + useSQS := task.Service != "" + + result := processTask(task) + + // Only track SQS events, not local file processing + if useSQS { + if result.Success { + GetMetricsPublisher().SendSLIMetric( + ResponseTypeSuccess, + "event_processing", + map[string]string{"service": task.Service}, + ) + } else { + GetMetricsPublisher().SendSLIMetric( + ResponseTypeFailure, + "event_processing", + map[string]string{ + "service": task.Service, + "error": result.Error, + }, + ) + } + } + } +} +``` + +--- + +## Testing + +### Unit Testing Pattern + +```go +func TestMetricsPublisher_SendSLIMetric(t *testing.T) { + // Test 1: Disabled publisher (no-op) + pub := &MetricsPublisher{enabled: false} + result := pub.SendSLIMetric("success", "test_method", nil) + assert.False(t, result) // Should return false (no-op) + + // Test 2: Nil publisher (safe) + var nilPub *MetricsPublisher + result = nilPub.SendSLIMetric("success", "test_method", nil) + assert.False(t, result) // Should not panic + + // Test 3: Enabled but no UUID + pub = &MetricsPublisher{ + enabled: true, + sliMetricUUID: "", + } + result = pub.SendSLIMetric("success", "test_method", nil) + assert.False(t, result) // Should return false +} +``` + +### Integration Testing + +```bash +# Terminal 1: Start mock metrics agent (listens on TCP) +nc -l -k 18126 + +# Terminal 2: Run indexer with metrics enabled +METRICS_ENABLED=true \ +METRICS_AGENT_URL=tcp://localhost:18126 \ +METRICS_SLI_UUID=test-uuid-12345 \ +./indexer + +# Terminal 1: Should show metric lines +put error-budget.counters.test-uuid-12345 1761857905 1 service=gprofiler-indexer response_type=success method_name=event_processing +``` + +### Local Testing with Docker + +```bash +# Start services +cd deploy +docker-compose --profile with-clickhouse up -d + +# Check metrics in indexer logs +docker-compose logs ch-indexer | grep "πŸ“Š Sending SLI metric" + +# Expected output: +# 
level=info msg="πŸ“Š Sending SLI metric: put error-budget.counters.test-sli-uuid-indexer-67890 ..." +``` + +--- + +## Troubleshooting + +### Issue 1: Metrics Not Appearing + +**Symptoms:** +- No "Sending SLI metric" logs +- Metrics dashboard shows no data + +**Debugging Steps:** + +```bash +# Check if metrics enabled +docker-compose logs ch-indexer | grep "MetricsPublisher initialized" + +# Expected: "sli_enabled=true" +# If "sli_enabled=false", check METRICS_SLI_UUID is set +``` + +**Common Causes:** +1. `METRICS_ENABLED=false` +2. `METRICS_SLI_UUID` is empty +3. No events being processed (check SQS queue) + +### Issue 2: Connection Refused Errors + +**Symptoms:** +``` +level=warn msg="Failed to connect to metrics agent at host.docker.internal:18126: connection refused" +``` + +**Solutions:** + +```bash +# Check if metrics agent is running +nc -zv host.docker.internal 18126 + +# In Docker Compose, ensure host.docker.internal is resolvable +# Add to docker-compose.yml: +extra_hosts: + - "host.docker.internal:host-gateway" +``` + +### Issue 3: Metrics Sent But Not Visible + +**Symptoms:** +- Logs show "πŸ“Š Sending SLI metric" +- But metrics dashboard has no data + +**Check:** +1. **Metrics agent is receiving:** + ```bash + # Monitor metrics agent logs + tail -f /var/log/metrics-agent.log + ``` + +2. **Metric format is correct:** + ``` + put error-budget.counters.{uuid} {timestamp} 1 service=... + ``` + +3. 
**UUID matches dashboard query:** + ``` + error-budget.counters.test-sli-uuid-indexer-67890 + ``` + +### Issue 4: Log Spam from Connection Errors + +**Symptoms:** +- Many "Failed to connect" messages in logs + +**Why This Happens:** +- Error log throttling limits to once per 5 minutes +- If seeing spam, check `errorLogInterval` setting + +**Solution:** +- Normal behavior if metrics agent is down +- Errors are throttled automatically +- Application continues working + +### Issue 5: Metrics Delayed + +**Why:** +- Metrics sent synchronously (1 second timeout) +- TCP connection created per metric +- Not batched + +**Impact:** +- Minimal (<1ms typical) +- Max 1 second per metric (on timeout) +- Does not block processing + +--- + +## Performance Considerations + +### Overhead + +**Per Metric:** +- TCP connection: ~1-2ms +- Metric formatting: <0.1ms +- Network I/O: 1-5ms (local network) + +**Total:** ~5-10ms per metric (negligible) + +### Scalability + +**Current Design:** +- No connection pooling +- New TCP connection per metric +- Fire-and-forget (no retries) + +**Max Throughput:** +- ~100-200 metrics/second (single instance) +- Sufficient for indexer workload (<10 metrics/second typical) + +**If Scaling Needed:** +- Add connection pooling +- Batch multiple metrics +- Use UDP instead of TCP + +### Resource Usage + +**Memory:** +- ~1KB per MetricsPublisher instance (singleton) +- Negligible per-metric allocation + +**CPU:** +- <0.1% for typical workload +- Mostly network I/O (non-blocking) + +**Network:** +- ~200-300 bytes per metric +- Insignificant bandwidth + +--- + +## SQS Message Handling Strategy + +### Improved Retry Logic + +The indexer now implements a robust SQS message handling strategy that prevents data loss from transient failures: + +#### Message Deletion Policy + +``` 
+β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” +β”‚ Message Handling Strategy β”‚ +β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€ +β”‚ Scenario β”‚ Action β”‚ Reasoning β”‚ +β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€ +β”‚ βœ… Success β”‚ Delete β”‚ Processing complete β”‚ +β”‚ πŸ”₯ S3 fetch failure β”‚ Retry (no del) β”‚ May be transient issue β”‚ +β”‚ πŸ”₯ Parse/DB failure β”‚ Retry (no del) β”‚ May be transient issue β”‚ +β”‚ πŸ’€ Malformed JSON β”‚ Delete β”‚ Won't fix with retry β”‚ +β”‚ 🏁 SQS delete fail β”‚ Continue β”‚ Already processed β”‚ +β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ +``` + +#### Key Benefits + +1. **Data Durability**: Transient failures no longer cause permanent data loss +2. **Automatic Recovery**: SQS visibility timeout handles retries automatically +3. **Cost Efficiency**: No manual retry logic needed +4. **Poison Message Protection**: Malformed messages still get removed to prevent infinite loops +5. 
**Operational Resilience**: System recovers from temporary S3/ClickHouse outages + +#### Implementation Details + +**Infrastructure Layer (queue.go):** +- **SQS Queue URL Resolution** - Tracks connection establishment failures +- **SQS Message Polling** - Monitors network/service availability issues +- **Message Format Validation** - Handles malformed JSON (deletes only these) + +**Processing Layer (worker.go):** +- **S3 File Retrieval** - No deletion on failure (allows retry for network issues) +- **Data Processing** - No deletion on failure (allows retry for ClickHouse downtime) +- **Cleanup Operations** - Tracks delete failures but continues (data already processed) +- **End-to-End Success** - Only successful path triggers message deletion + +### SLI Metric Coverage + +The complete event processing pipeline now tracks: + +``` +SQS Queue URL Resolution β†’ SQS Message Polling β†’ Message Parsing β†’ +S3 File Download β†’ ClickHouse Processing β†’ SQS Message Cleanup β†’ Success +``` + +**Error Categories:** +- `sqs_infrastructure` - SQS connectivity and polling issues +- `{service_name}` - Business logic processing failures +- Response types properly classify server vs client errors + +--- + +## Best Practices + +### DO βœ… + +1. **Use Response Types Correctly** + - `success`: Operation completed successfully + - `failure`: Server error (counts against SLO) + - `ignored_failure`: Client error (doesn't count) + +2. **Add Context with Tags** + ```go + map[string]string{ + "service": serviceName, + "filename": filename, + "error": errorType, + } + ``` + +3. **Track Only Production Events** + ```go + if useSQS { // Only track real events, not test data + GetMetricsPublisher().SendSLIMetric(...) + } + ``` + +4. **Let Publisher Handle nil/disabled** + ```go + // No need to check if enabled + GetMetricsPublisher().SendSLIMetric(...) + ``` + +5. 
**Follow SQS Retry Strategy** + ```go + // βœ… Only delete on success or permanent client errors + if err != nil { + GetMetricsPublisher().SendSLIMetric(ResponseTypeFailure, ...) + // Do NOT delete - let SQS retry for transient failures + log.Warnf("Processing failed, message will retry via SQS") + return err + } + ``` + +6. **Use Infrastructure Service Tags** + ```go + // For SQS polling/connection issues + map[string]string{ + "service": "sqs_infrastructure", + "error": "sqs_receive_failed", + } + ``` + +### DON'T ❌ + +1. **Don't Create Multiple Instances** + ```go + // ❌ Wrong + pub := NewMetricsPublisher(...) + pub.SendSLIMetric(...) + + // βœ… Correct + GetMetricsPublisher().SendSLIMetric(...) + ``` + +2. **Don't Block on Metrics** + ```go + // ❌ Wrong + if !GetMetricsPublisher().SendSLIMetric(...) { + return errors.New("metrics failed") + } + + // βœ… Correct + GetMetricsPublisher().SendSLIMetric(...) // Fire and forget + ``` + +3. **Don't Track Client Errors as Failures** + ```go + // ❌ Wrong + if err == ErrInvalidInput { + GetMetricsPublisher().SendSLIMetric(ResponseTypeFailure, ...) + } + + // βœ… Correct + if err == ErrInvalidInput { + GetMetricsPublisher().SendSLIMetric(ResponseTypeIgnoredFailure, ...) + } + ``` + +4. **Don't Retry Metric Sending** + ```go + // ❌ Wrong + for i := 0; i < 3; i++ { + if GetMetricsPublisher().SendSLIMetric(...) { + break + } + } + + // βœ… Correct + GetMetricsPublisher().SendSLIMetric(...) // Single attempt + ``` + +5. **Don't Delete Messages on Transient Failures** + ```go + // ❌ Wrong + if err := fetchFromS3(); err != nil { + deleteMessage(sess, queueURL, messageHandle) // Data loss! + } + + // βœ… Correct + if err := fetchFromS3(); err != nil { + log.Warnf("S3 fetch failed, message will retry via SQS") + return err // Let SQS handle retry + } + ``` + +6. 
**Don't Mix Service Tags Inconsistently**
+   ```go
+   // ❌ Wrong
+   map[string]string{
+       "service": "sqs",              // Inconsistent naming
+       "service": "infrastructure",   // Inconsistent naming
+   }
+
+   // ✅ Correct
+   map[string]string{
+       "service": "sqs_infrastructure",  // Consistent naming
+   }
+   ```
+
+---
+
+## Metrics Dashboard Queries
+
+### SLO Calculation (Success Rate)
+
+```promql
+# Success rate over 5 minutes
+sum(rate(error-budget.counters.prod-sli-uuid{response_type="success"}[5m]))
+/
+(
+  sum(rate(error-budget.counters.prod-sli-uuid{response_type="success"}[5m]))
+  +
+  sum(rate(error-budget.counters.prod-sli-uuid{response_type="failure"}[5m]))
+) * 100
+```
+
+### Error Budget Consumption
+
+```promql
+# Errors per hour
+sum(increase(error-budget.counters.prod-sli-uuid{response_type="failure"}[1h]))
+```
+
+### Top Failure Reasons
+
+```promql
+# Group by error type (includes new SQS infrastructure errors)
+sum by (error) (
+  increase(error-budget.counters.prod-sli-uuid{response_type="failure"}[1h])
+)
+
+# Expected error types:
+# s3_fetch_failed: S3 network/access issues
+# parse_or_write_failed: ClickHouse/parsing issues
+# sqs_delete_failed: SQS cleanup failures
+# sqs_queue_url_failed: SQS connection setup failures
+# sqs_receive_failed: SQS polling failures
+```
+
+### SQS Infrastructure Health
+
+```promql
+# SQS infrastructure error rate
+sum(rate(error-budget.counters.prod-sli-uuid{service="sqs_infrastructure",response_type="failure"}[5m]))
+
+# Client errors (malformed messages) - don't count against SLO
+sum(rate(error-budget.counters.prod-sli-uuid{service="sqs_infrastructure",response_type="ignored_failure"}[5m]))
+```
+
+---
+
+## Migration Guide
+
+### Adding Metrics to Existing Service
+
+**Step 1: Add Configuration**
+
+```go
+// args.go
+type CLIArgs struct {
+    // ... existing fields ...
+ MetricsEnabled bool + MetricsAgentURL string + MetricsServiceName string + MetricsSLIUUID string +} +``` + +**Step 2: Initialize in main()** + +```go +// main.go +func main() { + // ... existing code ... + + metricsPublisher := NewMetricsPublisher( + args.MetricsAgentURL, + args.MetricsServiceName, + args.MetricsSLIUUID, + args.MetricsEnabled, + ) + + defer func() { + if metricsPublisher != nil { + metricsPublisher.FlushAndClose() + } + }() + + // ... rest of application ... +} +``` + +**Step 3: Add Tracking to Workers** + +```go +// worker.go +func processEvent(event Event) error { + result, err := doWork(event) + + if err != nil { + GetMetricsPublisher().SendSLIMetric( + ResponseTypeFailure, + "event_processing", + map[string]string{"error": err.Error()}, + ) + return err + } + + GetMetricsPublisher().SendSLIMetric( + ResponseTypeSuccess, + "event_processing", + nil, + ) + return nil +} +``` + +--- + +## Changelog + +### Version 1.1 (2025-11-05) +- **Major Improvement**: Enhanced SQS message handling strategy +- **Added**: SQS infrastructure monitoring (queue URL resolution, message polling) +- **Added**: Proper retry logic - no message deletion on transient failures +- **Added**: Malformed message handling with deletion for poison message protection +- **Added**: Infrastructure service tagging (`sqs_infrastructure`) +- **Improved**: Data durability through SQS visibility timeout retry mechanism +- **Added**: Comprehensive error type tracking across the entire pipeline + +### Version 1.0 (2025-10-30) +- Initial implementation +- Singleton pattern with thread safety +- Graphite plaintext protocol +- SLI metric tracking +- Error log throttling +- Graceful degradation +- Nil-safe method calls + +--- + +## References + +- **Graphite Protocol**: http://graphite.readthedocs.io/en/latest/feeding-carbon.html +- **SLO/SLI Best Practices**: https://sre.google/sre-book/service-level-objectives/ +- **Backend Implementation**: [PR 
#34](https://github.com/pinterest/gprofiler-performance-studio/pull/34)
+
+---
+
+## Support
+
+For questions or issues:
+1. Check troubleshooting section above
+2. Review integration tests in `LOCAL_TESTING_GUIDE.md`
+3. Check indexer logs for metric output
+4. Verify metrics agent is receiving data
+
+---
+
+**Last Updated:** 2025-11-05
+**Version:** 1.1
+**Author:** Development Team
+
diff --git a/src/gprofiler-dev/gprofiler_dev/config.py b/src/gprofiler-dev/gprofiler_dev/config.py
index 459ae5e1..e84eb8f7 100644
--- a/src/gprofiler-dev/gprofiler_dev/config.py
+++ b/src/gprofiler-dev/gprofiler_dev/config.py
@@ -38,3 +38,6 @@ BUCKET_NAME = os.getenv("BUCKET_NAME", "gprofiler")
 
 BASE_DIRECTORY = "products"
 
+# Optional: Custom S3 endpoint for local testing (e.g., LocalStack) or S3-compatible services
+# In production, leave unset to use default AWS S3 endpoints
+S3_ENDPOINT_URL = os.getenv("S3_ENDPOINT_URL")
diff --git a/src/gprofiler-dev/gprofiler_dev/s3_profile_dal.py b/src/gprofiler-dev/gprofiler_dev/s3_profile_dal.py
index c994af0e..5ccb6d33 100644
--- a/src/gprofiler-dev/gprofiler_dev/s3_profile_dal.py
+++ b/src/gprofiler-dev/gprofiler_dev/s3_profile_dal.py
@@ -47,8 +47,10 @@ def __init__(
            aws_secret_access_key=config.AWS_SECRET_ACCESS_KEY,
            aws_session_token=config.AWS_SESSION_TOKEN,
        )
-        self._s3_client = session.client("s3", config=Config(max_pool_connections=50))
-        self._s3_resource = session.resource("s3")
+        # endpoint_url allows connecting to LocalStack or S3-compatible services for testing
+        # When None (default), uses standard AWS S3 endpoints
+        self._s3_client = session.client("s3", config=Config(max_pool_connections=50), endpoint_url=config.S3_ENDPOINT_URL)
+        self._s3_resource = session.resource("s3", endpoint_url=config.S3_ENDPOINT_URL)
 
     @staticmethod
     def join_path(*parts: str) -> str:
diff --git a/src/gprofiler_indexer/args.go b/src/gprofiler_indexer/args.go
index 0ed18bae..14f7499d 100644
--- a/src/gprofiler_indexer/args.go
+++ 
b/src/gprofiler_indexer/args.go @@ -36,6 +36,11 @@ type CLIArgs struct { FrameReplaceFileName string AWSEndpoint string AWSRegion string + // Metrics Publisher Configuration + MetricsEnabled bool + MetricsAgentURL string + MetricsServiceName string + MetricsSLIUUID string } func NewCliArgs() *CLIArgs { @@ -50,6 +55,11 @@ func NewCliArgs() *CLIArgs { ClickHouseStacksBatchSize: 10000, ClickHouseMetricsBatchSize: 100, FrameReplaceFileName: ConfPrefix + "replace.yaml", + // Metrics defaults + MetricsEnabled: false, + MetricsAgentURL: "tcp://localhost:18126", + MetricsServiceName: "gprofiler-indexer", + MetricsSLIUUID: "", } } @@ -57,7 +67,7 @@ func (ca *CLIArgs) ParseArgs() { flag.StringVar(&ca.SQSQueue, "sqs-queue", LookupEnvOrString("SQS_QUEUE_URL", ca.SQSQueue), "SQS Queue name to listen") flag.StringVar(&ca.S3Bucket, "s3-bucket", LookupEnvOrString("S3_BUCKET", ca.S3Bucket), "S3 bucket name") - flag.StringVar(&ca.AWSEndpoint, "aws-endpoint", LookupEnvOrString("S3_ENDPOINT", ca.AWSEndpoint), "AWS Endpoint URL") + flag.StringVar(&ca.AWSEndpoint, "aws-endpoint", LookupEnvOrString("AWS_ENDPOINT_URL", ca.AWSEndpoint), "AWS Endpoint URL") flag.StringVar(&ca.AWSRegion, "aws-region", LookupEnvOrString("AWS_REGION", ca.AWSRegion), "AWS Region") flag.StringVar(&ca.ClickHouseAddr, "clickhouse-addr", LookupEnvOrString("CLICKHOUSE_ADDR", ca.ClickHouseAddr), "ClickHouse address like 127.0.0.1:9000") @@ -83,6 +93,15 @@ func (ca *CLIArgs) ParseArgs() { flag.StringVar(&ca.FrameReplaceFileName, "replace-file", LookupEnvOrString("REPLACE_FILE", ca.FrameReplaceFileName), "replace.yaml") + // Metrics Publisher Configuration + flag.BoolVar(&ca.MetricsEnabled, "metrics-enabled", LookupEnvOrBool("METRICS_ENABLED", ca.MetricsEnabled), + "Enable metrics publishing (default false)") + flag.StringVar(&ca.MetricsAgentURL, "metrics-agent-url", LookupEnvOrString("METRICS_AGENT_URL", ca.MetricsAgentURL), + "Metrics agent URL (default tcp://localhost:18126)") + 
flag.StringVar(&ca.MetricsServiceName, "metrics-service-name", LookupEnvOrString("METRICS_SERVICE_NAME", ca.MetricsServiceName), + "Service name for metrics (default gprofiler-indexer)") + flag.StringVar(&ca.MetricsSLIUUID, "metrics-sli-uuid", LookupEnvOrString("METRICS_SLI_UUID", ca.MetricsSLIUUID), + "SLI metric UUID") flag.Parse() if ca.SQSQueue == "" && ca.InputFolder == "" { diff --git a/src/gprofiler_indexer/main.go b/src/gprofiler_indexer/main.go index 0c5dae2e..68c68876 100644 --- a/src/gprofiler_indexer/main.go +++ b/src/gprofiler_indexer/main.go @@ -46,6 +46,15 @@ func main() { args.ParseArgs() logger.Infof("Starting %s", AppName) + + // Initialize metrics publisher + metricsPublisher := NewMetricsPublisher( + args.MetricsAgentURL, + args.MetricsServiceName, + args.MetricsSLIUUID, + args.MetricsEnabled, + ) + tasks := make(chan SQSMessage, args.Concurrency) channels := RecordChannels{ StacksRecords: make(chan StackRecord, args.ClickHouseStacksBatchSize), @@ -107,5 +116,11 @@ func main() { }() buffWriterWaitGroup.Wait() + + // Cleanup metrics publisher + if metricsPublisher != nil { + metricsPublisher.FlushAndClose() + } + logger.Info("Graceful shutdown") } diff --git a/src/gprofiler_indexer/metrics_publisher.go b/src/gprofiler_indexer/metrics_publisher.go new file mode 100644 index 00000000..bff7d9c8 --- /dev/null +++ b/src/gprofiler_indexer/metrics_publisher.go @@ -0,0 +1,234 @@ +// +// Copyright (C) 2023 Intel Corporation +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. +// + +package main + +import ( + "fmt" + "net" + "strings" + "sync" + "time" + + log "github.com/sirupsen/logrus" +) + +// Response type constants for SLI metrics +const ( + ResponseTypeSuccess = "success" + ResponseTypeFailure = "failure" + ResponseTypeIgnoredFailure = "ignored_failure" +) + +// MetricsPublisher handles sending metrics to metrics agent via TCP +type MetricsPublisher struct { + host string + port string + serviceName string + sliMetricUUID string + enabled bool + connectionFailed bool + lastErrorLogTime int64 + errorLogInterval int64 + mutex sync.Mutex +} + +var ( + metricsInstance *MetricsPublisher + metricsOnce sync.Once +) + +// NewMetricsPublisher creates or returns the singleton MetricsPublisher instance +func NewMetricsPublisher(serverURL, serviceName, sliUUID string, enabled bool) *MetricsPublisher { + metricsOnce.Do(func() { + instance := &MetricsPublisher{ + serviceName: serviceName, + sliMetricUUID: sliUUID, + enabled: enabled, + errorLogInterval: 300, // Log errors at most once every 5 minutes + } + + // Parse server URL (tcp://host:port) + if strings.HasPrefix(serverURL, "tcp://") { + urlParts := strings.Split(serverURL[6:], ":") + instance.host = urlParts[0] + if len(urlParts) > 1 { + instance.port = urlParts[1] + } else { + instance.port = "18126" + } + } else { + if enabled { + log.Fatalf("Unsupported server URL format: %s. 
Expected tcp://host:port", serverURL) + } + instance.host = "localhost" + instance.port = "18126" + } + + if enabled { + log.Infof("MetricsPublisher initialized: service=%s, server=%s:%s, sli_enabled=%t", + serviceName, instance.host, instance.port, sliUUID != "") + } else { + log.Info("MetricsPublisher disabled") + } + + metricsInstance = instance + }) + + return metricsInstance +} + +// GetInstance returns the singleton MetricsPublisher instance +// Returns nil if not initialized +func GetMetricsPublisher() *MetricsPublisher { + return metricsInstance +} + +// SendSLIMetric sends an SLI metric for tracking HTTP success rate +// responseType: success, failure, or ignored_failure +// methodName: The method/operation being tracked (e.g., "event_processing") +// extraTags: Additional tags as key-value pairs +func (m *MetricsPublisher) SendSLIMetric(responseType, methodName string, extraTags map[string]string) bool { + if m == nil || !m.enabled || m.sliMetricUUID == "" { + return false + } + + // Build metric name using configured SLI UUID + metricName := fmt.Sprintf("error-budget.counters.%s", m.sliMetricUUID) + + // Get current epoch timestamp + timestamp := time.Now().Unix() + + // Build tag string with required SLI tags (Graphite plaintext protocol format) + tags := []string{ + fmt.Sprintf("service=%s", m.serviceName), + fmt.Sprintf("response_type=%s", responseType), + fmt.Sprintf("method_name=%s", methodName), + } + + if extraTags != nil { + for key, value := range extraTags { + tags = append(tags, fmt.Sprintf("%s=%s", key, value)) + } + } + + tagString := strings.Join(tags, " ") + + // Format: put metric_name timestamp value tag1=value1 tag2=value2 ... 
+ metricLine := fmt.Sprintf("put %s %d 1 %s", metricName, timestamp, tagString) + + log.Infof("πŸ“Š Sending SLI metric: %s", metricLine) + + return m.sendMetric(metricLine) +} + +// SendErrorMetric sends an operational error metric +func (m *MetricsPublisher) SendErrorMetric(metricName string, extraTags map[string]string) bool { + if m == nil || !m.enabled { + return false + } + + // Get current epoch timestamp + timestamp := time.Now().Unix() + + // Build tag string + tags := []string{ + fmt.Sprintf("service=%s", m.serviceName), + } + + if extraTags != nil { + for key, value := range extraTags { + tags = append(tags, fmt.Sprintf("%s=%s", key, value)) + } + } + + tagString := strings.Join(tags, " ") + + // Format: put metric_name timestamp value tag1=value1 tag2=value2 ... + metricLine := fmt.Sprintf("put %s %d 1 %s", metricName, timestamp, tagString) + + log.Debugf("πŸ“Š Sending error metric: %s", metricLine) + + return m.sendMetric(metricLine) +} + +// sendMetric sends a metric line via TCP socket +func (m *MetricsPublisher) sendMetric(metricLine string) bool { + if m == nil || !m.enabled { + return false + } + + // Check if we should throttle error logging + m.mutex.Lock() + now := time.Now().Unix() + shouldLogError := now-m.lastErrorLogTime >= m.errorLogInterval + m.mutex.Unlock() + + // Ensure metric line ends with newline + if !strings.HasSuffix(metricLine, "\n") { + metricLine = metricLine + "\n" + } + + // Create TCP connection with timeout + address := net.JoinHostPort(m.host, m.port) + conn, err := net.DialTimeout("tcp", address, 1*time.Second) + if err != nil { + if shouldLogError { + log.Warnf("Failed to connect to metrics agent at %s: %v", address, err) + m.mutex.Lock() + m.lastErrorLogTime = now + m.connectionFailed = true + m.mutex.Unlock() + } + return false + } + defer conn.Close() + + // Set write timeout + conn.SetWriteDeadline(time.Now().Add(1 * time.Second)) + + // Send metric + _, err = conn.Write([]byte(metricLine)) + if err != nil { + if 
shouldLogError { + log.Warnf("Failed to send metric: %v", err) + m.mutex.Lock() + m.lastErrorLogTime = now + m.mutex.Unlock() + } + return false + } + + // Reset connection failed flag on success + m.mutex.Lock() + if m.connectionFailed { + log.Info("Successfully reconnected to metrics agent") + m.connectionFailed = false + } + m.mutex.Unlock() + + return true +} + +// FlushAndClose flushes any pending metrics and closes the publisher +func (m *MetricsPublisher) FlushAndClose() { + m.mutex.Lock() + defer m.mutex.Unlock() + + log.Info("MetricsPublisher closed") + m.enabled = false +} + diff --git a/src/gprofiler_indexer/queue.go b/src/gprofiler_indexer/queue.go index f0b054c1..83c2aa1f 100644 --- a/src/gprofiler_indexer/queue.go +++ b/src/gprofiler_indexer/queue.go @@ -65,6 +65,17 @@ func ListenSqs(ctx context.Context, args *CLIArgs, ch chan<- SQSMessage, wg *syn if err != nil { logger.Errorf("Got an error getting the queue URL: %v", err) + + // SLI Metric: SQS queue URL resolution failure (infrastructure error - counts against SLO) + // This tracks connectivity and configuration issues with SQS + GetMetricsPublisher().SendSLIMetric( + ResponseTypeFailure, + "event_processing", + map[string]string{ + "service": "N/A", + "error": "sqs_queue_url_failed", + }, + ) return } @@ -81,6 +92,17 @@ func ListenSqs(ctx context.Context, args *CLIArgs, ch chan<- SQSMessage, wg *syn }) if recvErr != nil { logger.Error(recvErr) + + // SLI Metric: SQS message receive failure (infrastructure error - counts against SLO) + // This tracks SQS polling failures and network issues + GetMetricsPublisher().SendSLIMetric( + ResponseTypeFailure, + "event_processing", + map[string]string{ + "service": "N/A", + "error": "sqs_receive_failed", + }, + ) continue } @@ -88,7 +110,42 @@ func ListenSqs(ctx context.Context, args *CLIArgs, ch chan<- SQSMessage, wg *syn var sqsMessage SQSMessage parseErr := json.Unmarshal([]byte(*message.Body), &sqsMessage) if parseErr != nil { - logger.Errorf("Error 
while parsing %v", parseErr) + logger.Errorf("Error while parsing SQS message body: %v", parseErr) + + // SLI Metric: SQS message parse failure (client error - malformed JSON) + // This tracks malformed JSON messages in the SQS queue + GetMetricsPublisher().SendSLIMetric( + ResponseTypeIgnoredFailure, // Client error - doesn't count against SLO + "event_processing", + map[string]string{ + "service": "N/A", + "error": "sqs_message_parse_failed", + }, + ) + + // Delete malformed messages to prevent infinite retry loop + // This is a permanent client error that won't be fixed by retrying + svc := sqs.New(sess) + _, deleteErr := svc.DeleteMessage(&sqs.DeleteMessageInput{ + QueueUrl: urlResult.QueueUrl, + ReceiptHandle: message.ReceiptHandle, + }) + if deleteErr != nil { + logger.Errorf("Failed to delete malformed message: %v", deleteErr) + + // SLI Metric: SQS message deletion failure (infrastructure error - counts against SLO) + // This tracks failures to delete malformed messages from the queue + GetMetricsPublisher().SendSLIMetric( + ResponseTypeFailure, + "event_processing", + map[string]string{ + "service": "N/A", + "error": "sqs_delete_malformed_message_failed", + }, + ) + } else { + logger.Warnf("Deleted malformed SQS message to prevent reprocessing") + } continue } sqsMessage.QueueURL = *urlResult.QueueUrl diff --git a/src/gprofiler_indexer/worker.go b/src/gprofiler_indexer/worker.go index 90c10573..fbdf069d 100644 --- a/src/gprofiler_indexer/worker.go +++ b/src/gprofiler_indexer/worker.go @@ -41,24 +41,43 @@ func Worker(workerIdx int, args *CLIArgs, tasks <-chan SQSMessage, pw *ProfilesW } if args.AWSEndpoint != "" { sessionOptions.Config = aws.Config{ - Region: aws.String(args.AWSRegion), - Endpoint: aws.String(args.AWSEndpoint), + Region: aws.String(args.AWSRegion), + Endpoint: aws.String(args.AWSEndpoint), + S3ForcePathStyle: aws.Bool(true), } } sess := session.Must(session.NewSessionWithOptions(sessionOptions)) for task := range tasks { useSQS := 
task.Service != "" - log.Debugf("got new file %s from %d", task.Filename, task.ServiceId) + serviceName := task.Service + if serviceName == "" { + serviceName = "local-file" + } + + log.Debugf("got new file %s from service %s (ID: %d)", task.Filename, serviceName, task.ServiceId) + if useSQS { fullPath := fmt.Sprintf("products/%s/stacks/%s", task.Service, task.Filename) buf, err = GetFileFromS3(sess, args.S3Bucket, fullPath) if err != nil { log.Errorf("Error while fetching file from S3: %v", err) - errDelete := deleteMessage(sess, task.QueueURL, task.MessageHandle) - if errDelete != nil { - log.Errorf("Unable to delete message from %s, err %v", task.QueueURL, errDelete) - } + + // SLI Metric: S3 fetch failure (server error - counts against SLO) + // Only tracks SQS events; SendSLIMetric handles nil/enabled checks internally + GetMetricsPublisher().SendSLIMetric( + ResponseTypeFailure, + "event_processing", + map[string]string{ + "service": serviceName, + "error": "s3_fetch_failed", + "filename": task.Filename, + }, + ) + + // Do NOT delete message - let SQS retry for transient S3 failures + // Message will retry automatically via SQS visibility timeout + log.Warnf("S3 fetch failed for %s, message will retry via SQS", task.Filename) continue } temp = strings.Split(task.Filename, "_")[0] @@ -69,6 +88,7 @@ func Worker(workerIdx int, args *CLIArgs, tasks <-chan SQSMessage, pw *ProfilesW temp = strings.Join(tokens[:3], ":") } } + layout := ISODateTimeFormat timestamp, tsErr := time.Parse(layout, temp) log.Debugf("parsed timestamp is: %v", timestamp) @@ -76,16 +96,65 @@ func Worker(workerIdx int, args *CLIArgs, tasks <-chan SQSMessage, pw *ProfilesW log.Debugf("Unable to fetch timestamp from filename %s, fallback to the current time", temp) timestamp = time.Now().UTC() } + + // Parse stack frame file and write to ClickHouse err := pw.ParseStackFrameFile(sess, task, args.S3Bucket, timestamp, buf) if err != nil { log.Errorf("Error while parsing stack frame file: %v", err) 
+ + // SLI Metric: Parse event failure or write profile to column DB failure (server error - counts against SLO) + // Only tracks SQS events; SendSLIMetric handles nil/enabled checks internally + if useSQS { + GetMetricsPublisher().SendSLIMetric( + ResponseTypeFailure, + "event_processing", + map[string]string{ + "service": serviceName, + "error": "parse_or_write_failed", + "filename": task.Filename, + }, + ) + } + + // Do NOT delete message - let SQS retry for transient ClickHouse/parsing failures + // Message will retry automatically via SQS visibility timeout + if useSQS { + log.Warnf("Parse/write failed for %s, message will retry via SQS", task.Filename) + } + continue } + // Delete message from SQS after successful processing if useSQS { errDelete := deleteMessage(sess, task.QueueURL, task.MessageHandle) if errDelete != nil { log.Errorf("Unable to delete message from %s, err %v", task.QueueURL, errDelete) + + // SLI Metric: SQS delete failure (server error - counts against SLO) + // The event was processed but we couldn't clean up + // SendSLIMetric handles nil/enabled checks internally + GetMetricsPublisher().SendSLIMetric( + ResponseTypeFailure, + "event_processing", + map[string]string{ + "service": serviceName, + "error": "sqs_delete_failed", + "filename": task.Filename, + }, + ) + continue } + + // SLI Metric: Success! Event processed completely + // SendSLIMetric handles nil/enabled checks internally + GetMetricsPublisher().SendSLIMetric( + ResponseTypeSuccess, + "event_processing", + map[string]string{ + "service": serviceName, + "filename": task.Filename, + }, + ) } } log.Debugf("Worker %d finished", workerIdx)