Skip to content

Commit c9041de

Browse files
committed
Improve indexer SLI metrics publishing strategy and SQS event handling
1 parent 94f2452 commit c9041de

File tree

3 files changed

+307
-26
lines changed

3 files changed

+307
-26
lines changed

METRICS_PUBLISHER_INDEXER_DOCUMENTATION.md

Lines changed: 242 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -334,7 +334,7 @@ GetMetricsPublisher().SendSLIMetric(
334334
#### 4. Error Budget Tracking
335335

336336
```go
337-
// S3 fetch failed (counts against SLO)
337+
// S3 fetch failed (counts against SLO) - NO MESSAGE DELETION
338338
if err != nil {
339339
GetMetricsPublisher().SendSLIMetric(
340340
ResponseTypeFailure, // ← Counts against error budget
@@ -345,19 +345,37 @@ if err != nil {
345345
"filename": task.Filename,
346346
},
347347
)
348+
// Do NOT delete message - let SQS retry for transient failures
349+
log.Warnf("S3 fetch failed for %s, message will retry via SQS", task.Filename)
350+
return err
348351
}
349352

350-
// SQS delete failed (still counts as failure)
351-
if errDelete != nil {
353+
// SQS infrastructure failures (counts against SLO)
354+
if recvErr != nil {
352355
GetMetricsPublisher().SendSLIMetric(
353-
ResponseTypeFailure, //Also counts against SLO
356+
ResponseTypeFailure, //Infrastructure failure
354357
"event_processing",
355358
map[string]string{
356-
"service": serviceName,
357-
"error": "sqs_delete_failed",
358-
"filename": task.Filename,
359+
"service": "sqs_infrastructure",
360+
"error": "sqs_receive_failed",
361+
},
362+
)
363+
continue // Keep polling
364+
}
365+
366+
// Malformed messages (client error - doesn't count against SLO)
367+
if parseErr != nil {
368+
GetMetricsPublisher().SendSLIMetric(
369+
ResponseTypeIgnoredFailure, // ← Client error, doesn't impact SLO
370+
"event_processing",
371+
map[string]string{
372+
"service": "sqs_infrastructure",
373+
"error": "sqs_message_parse_failed",
359374
},
360375
)
376+
// Delete malformed messages to prevent infinite retry
377+
deleteMessage(sess, queueURL, messageHandle)
378+
continue
361379
}
362380
```
363381

@@ -525,7 +543,7 @@ func ProcessEvent(event SQSMessage) error {
525543
}
526544
```
527545

528-
### Example 2: Multi-Stage Processing with Detailed Tracking
546+
### Example 2: Multi-Stage Processing with Proper SQS Retry Strategy
529547

530548
```go
531549
func ProcessProfile(task SQSMessage) error {
@@ -541,25 +559,33 @@ func ProcessProfile(task SQSMessage) error {
541559
"filename": task.Filename,
542560
},
543561
)
562+
563+
// Do NOT delete message - let SQS retry for transient S3 failures
564+
// Message will retry automatically via SQS visibility timeout
565+
log.Warnf("S3 fetch failed for %s, message will retry via SQS", task.Filename)
544566
return err
545567
}
546568

547-
// Stage 2: Parse and insert
569+
// Stage 2: Parse and insert into ClickHouse
548570
err = ParseAndInsert(buf)
549571
if err != nil {
550572
GetMetricsPublisher().SendSLIMetric(
551573
ResponseTypeFailure,
552574
"event_processing",
553575
map[string]string{
554576
"service": task.Service,
555-
"error": "parse_or_insert_failed",
577+
"error": "parse_or_write_failed",
556578
"filename": task.Filename,
557579
},
558580
)
581+
582+
// Do NOT delete message - let SQS retry for transient ClickHouse/parsing failures
583+
// Message will retry automatically via SQS visibility timeout
584+
log.Warnf("Parse/write failed for %s, message will retry via SQS", task.Filename)
559585
return err
560586
}
561587

562-
// Stage 3: Cleanup
588+
// Stage 3: Cleanup (only on success)
563589
err = deleteMessage(sess, task.QueueURL, task.MessageHandle)
564590
if err != nil {
565591
GetMetricsPublisher().SendSLIMetric(
@@ -571,7 +597,9 @@ func ProcessProfile(task SQSMessage) error {
571597
"filename": task.Filename,
572598
},
573599
)
574-
return err
600+
// Continue anyway - message was processed successfully
601+
// SQS will eventually make message visible again, but that's better than data loss
602+
log.Errorf("Failed to delete processed message: %v", err)
575603
}
576604

577605
// Success!
@@ -587,7 +615,74 @@ func ProcessProfile(task SQSMessage) error {
587615
}
588616
```
589617

590-
### Example 3: Conditional Metrics (Only for Production Traffic)
618+
### Example 3: SQS Infrastructure Monitoring
619+
620+
```go
621+
// In queue.go - SQS polling and connection monitoring
622+
func ListenSqs(ctx context.Context, args *CLIArgs, ch chan<- SQSMessage, wg *sync.WaitGroup) {
623+
// ... SQS setup ...
624+
625+
urlResult, err := getQueueURL(sess, args.SQSQueue)
626+
if err != nil {
627+
logger.Errorf("Got an error getting the queue URL: %v", err)
628+
629+
// SLI Metric: SQS queue URL resolution failure (infrastructure error)
630+
GetMetricsPublisher().SendSLIMetric(
631+
ResponseTypeFailure,
632+
"event_processing",
633+
map[string]string{
634+
"service": "sqs_infrastructure",
635+
"error": "sqs_queue_url_failed",
636+
},
637+
)
638+
return
639+
}
640+
641+
for {
642+
output, recvErr := svc.ReceiveMessage(&sqs.ReceiveMessageInput{...})
643+
if recvErr != nil {
644+
logger.Error(recvErr)
645+
646+
// SLI Metric: SQS message receive failure (infrastructure error)
647+
GetMetricsPublisher().SendSLIMetric(
648+
ResponseTypeFailure,
649+
"event_processing",
650+
map[string]string{
651+
"service": "sqs_infrastructure",
652+
"error": "sqs_receive_failed",
653+
},
654+
)
655+
continue
656+
}
657+
658+
for _, message := range output.Messages {
659+
var sqsMessage SQSMessage
660+
parseErr := json.Unmarshal([]byte(*message.Body), &sqsMessage)
661+
if parseErr != nil {
662+
// SLI Metric: Malformed message (client error - doesn't count against SLO)
663+
GetMetricsPublisher().SendSLIMetric(
664+
ResponseTypeIgnoredFailure,
665+
"event_processing",
666+
map[string]string{
667+
"service": "sqs_infrastructure",
668+
"error": "sqs_message_parse_failed",
669+
},
670+
)
671+
672+
// Delete malformed messages to prevent infinite retry loop
673+
// This is a permanent client error that won't be fixed by retrying
674+
deleteMessage(sess, *urlResult.QueueUrl, *message.ReceiptHandle)
675+
continue
676+
}
677+
678+
// Forward valid message to workers
679+
ch <- sqsMessage
680+
}
681+
}
682+
}
683+
```
684+
685+
### Example 4: Conditional Metrics (Only for Production Traffic)
591686

592687
```go
593688
func Worker(tasks <-chan SQSMessage) {
@@ -815,6 +910,65 @@ extra_hosts:
815910

816911
---
817912

913+
## SQS Message Handling Strategy
914+
915+
### Improved Retry Logic
916+
917+
The indexer now implements a robust SQS message handling strategy that prevents data loss from transient failures:
918+
919+
#### Message Deletion Policy
920+
921+
```
922+
┌─────────────────────────────────────────────────────────────────┐
923+
│ Message Handling Strategy │
924+
├─────────────────────┬─────────────────┬─────────────────────────┤
925+
│ Scenario │ Action │ Reasoning │
926+
├─────────────────────┼─────────────────┼─────────────────────────┤
927+
│ ✅ Success │ Delete │ Processing complete │
928+
│ 🔥 S3 fetch failure │ Retry (no del) │ May be transient issue │
929+
│ 🔥 Parse/DB failure │ Retry (no del) │ May be transient issue │
930+
│ 💀 Malformed JSON │ Delete │ Won't fix with retry │
931+
│ 🏁 SQS delete fail │ Continue │ Already processed │
932+
└─────────────────────┴─────────────────┴─────────────────────────┘
933+
```
934+
935+
#### Key Benefits
936+
937+
1. **Data Durability**: Transient failures no longer cause permanent data loss
938+
2. **Automatic Recovery**: SQS visibility timeout handles retries automatically
939+
3. **Cost Efficiency**: No manual retry logic needed
940+
4. **Poison Message Protection**: Malformed messages still get removed to prevent infinite loops
941+
5. **Operational Resilience**: System recovers from temporary S3/ClickHouse outages
942+
943+
#### Implementation Details
944+
945+
**Infrastructure Layer (queue.go):**
946+
- **SQS Queue URL Resolution** - Tracks connection establishment failures
947+
- **SQS Message Polling** - Monitors network/service availability issues
948+
- **Message Format Validation** - Handles malformed JSON (deletes only these)
949+
950+
**Processing Layer (worker.go):**
951+
- **S3 File Retrieval** - No deletion on failure (allows retry for network issues)
952+
- **Data Processing** - No deletion on failure (allows retry for ClickHouse downtime)
953+
- **Cleanup Operations** - Tracks delete failures but continues (data already processed)
954+
- **End-to-End Success** - Only successful path triggers message deletion
955+
956+
### SLI Metric Coverage
957+
958+
The complete event processing pipeline now tracks:
959+
960+
```
961+
SQS Queue URL Resolution → SQS Message Polling → Message Parsing →
962+
S3 File Download → ClickHouse Processing → SQS Message Cleanup → Success
963+
```
964+
965+
**Error Categories:**
966+
- `sqs_infrastructure` - SQS connectivity and polling issues
967+
- `{service_name}` - Business logic processing failures
968+
- Response types properly classify server vs client errors
969+
970+
---
971+
818972
## Best Practices
819973

820974
### DO ✅
@@ -846,6 +1000,26 @@ extra_hosts:
8461000
GetMetricsPublisher().SendSLIMetric(...)
8471001
```
8481002

1003+
5. **Follow SQS Retry Strategy**
1004+
```go
1005+
// ✅ Only delete on success or permanent client errors
1006+
if err != nil {
1007+
GetMetricsPublisher().SendSLIMetric(ResponseTypeFailure, ...)
1008+
// Do NOT delete - let SQS retry for transient failures
1009+
log.Warnf("Processing failed, message will retry via SQS")
1010+
return err
1011+
}
1012+
```
1013+
1014+
6. **Use Infrastructure Service Tags**
1015+
```go
1016+
// For SQS polling/connection issues
1017+
map[string]string{
1018+
"service": "sqs_infrastructure",
1019+
"error": "sqs_receive_failed",
1020+
}
1021+
```
1022+
8491023
### DON'T ❌
8501024

8511025
1. **Don't Create Multiple Instances**
@@ -895,6 +1069,34 @@ extra_hosts:
8951069
GetMetricsPublisher().SendSLIMetric(...) // Single attempt
8961070
```
8971071

1072+
5. **Don't Delete Messages on Transient Failures**
1073+
```go
1074+
// ❌ Wrong
1075+
if err := fetchFromS3(); err != nil {
1076+
deleteMessage(sess, queueURL, messageHandle) // Data loss!
1077+
}
1078+
1079+
// ✅ Correct
1080+
if err := fetchFromS3(); err != nil {
1081+
log.Warnf("S3 fetch failed, message will retry via SQS")
1082+
return err // Let SQS handle retry
1083+
}
1084+
```
1085+
1086+
6. **Don't Mix Service Tags Inconsistently**
1087+
```go
1088+
// ❌ Wrong
1089+
map[string]string{
1090+
"service": "sqs", // Inconsistent naming
1091+
"service": "infrastructure", // Inconsistent naming
1092+
}
1093+
1094+
// ✅ Correct
1095+
map[string]string{
1096+
"service": "sqs_infrastructure", // Consistent naming
1097+
}
1098+
```
1099+
8981100
---
8991101

9001102
## Metrics Dashboard Queries
@@ -922,10 +1124,27 @@ sum(increase(error-budget.counters.prod-sli-uuid{response_type="failure"}[1h]))
9221124
### Top Failure Reasons
9231125

9241126
```sql
925-
-- Group by error type
1127+
-- Group by error type (includes new SQS infrastructure errors)
9261128
sum by (error) (
9271129
increase(error-budget.counters.prod-sli-uuid{response_type="failure"}[1h])
9281130
)
1131+
1132+
-- Expected error types:
1133+
-- s3_fetch_failed: S3 network/access issues
1134+
-- parse_or_write_failed: ClickHouse/parsing issues
1135+
-- sqs_delete_failed: SQS cleanup failures
1136+
-- sqs_queue_url_failed: SQS connection setup failures
1137+
-- sqs_receive_failed: SQS polling failures
1138+
```
1139+
1140+
### SQS Infrastructure Health
1141+
1142+
```sql
1143+
-- SQS infrastructure error rate
1144+
sum(rate(error-budget.counters.prod-sli-uuid{service="sqs_infrastructure",response_type="failure"}[5m]))
1145+
1146+
-- Client errors (malformed messages) - don't count against SLO
1147+
sum(rate(error-budget.counters.prod-sli-uuid{service="sqs_infrastructure",response_type="ignored_failure"}[5m]))
9291148
```
9301149

9311150
---
@@ -1000,6 +1219,15 @@ func processEvent(event Event) error {
10001219

10011220
## Changelog
10021221

1222+
### Version 1.1 (2025-11-05)
1223+
- **Major Improvement**: Enhanced SQS message handling strategy
1224+
- **Added**: SQS infrastructure monitoring (queue URL resolution, message polling)
1225+
- **Added**: Proper retry logic - no message deletion on transient failures
1226+
- **Added**: Malformed message handling with deletion for poison message protection
1227+
- **Added**: Infrastructure service tagging (`sqs_infrastructure`)
1228+
- **Improved**: Data durability through SQS visibility timeout retry mechanism
1229+
- **Added**: Comprehensive error type tracking across the entire pipeline
1230+
10031231
### Version 1.0 (2025-10-30)
10041232
- Initial implementation
10051233
- Singleton pattern with thread safety

0 commit comments

Comments
 (0)