Skip to content

Commit 94f2452

Browse files
feat: Add metrics publisher to indexer with S3 endpoint configuration
- Add SLI metrics tracking to gProfiler indexer for SLO monitoring
- Add optional S3 endpoint configuration for local development
- Include comprehensive testing and documentation

Changes:
- New: metrics_publisher.go - Singleton Graphite-based metrics publisher
- Modified: args.go, main.go, worker.go - Metrics integration
- Modified: config.py, s3_profile_dal.py - S3 endpoint support
- New: LOCAL_TESTING_GUIDE.md - E2E testing guide
- New: METRICS_PUBLISHER_INDEXER_DOCUMENTATION.md - API docs

Note: The indexer's environment variable was renamed from S3_ENDPOINT to AWS_ENDPOINT_URL. Standard AWS deployments are unaffected; this only impacts deployments that use a custom S3 endpoint.
1 parent 8aa8ca2 commit 94f2452

File tree

8 files changed

+1912
-6
lines changed

8 files changed

+1912
-6
lines changed

LOCAL_TESTING_GUIDE.md

Lines changed: 525 additions & 0 deletions
Large diffs are not rendered by default.

METRICS_PUBLISHER_INDEXER_DOCUMENTATION.md

Lines changed: 1035 additions & 0 deletions
Large diffs are not rendered by default.

src/gprofiler-dev/gprofiler_dev/config.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,3 +38,6 @@
3838

3939
BUCKET_NAME = os.getenv("BUCKET_NAME", "gprofiler")
4040
BASE_DIRECTORY = "products"
41+
# Optional: Custom S3 endpoint for local testing (e.g., LocalStack) or S3-compatible services
42+
# In production, leave unset to use default AWS S3 endpoints
43+
S3_ENDPOINT_URL = os.getenv("S3_ENDPOINT_URL")

src/gprofiler-dev/gprofiler_dev/s3_profile_dal.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,10 @@ def __init__(
4747
aws_secret_access_key=config.AWS_SECRET_ACCESS_KEY,
4848
aws_session_token=config.AWS_SESSION_TOKEN,
4949
)
50-
self._s3_client = session.client("s3", config=Config(max_pool_connections=50))
51-
self._s3_resource = session.resource("s3")
50+
# endpoint_url allows connecting to LocalStack or S3-compatible services for testing
51+
# When None (default), uses standard AWS S3 endpoints
52+
self._s3_client = session.client("s3", config=Config(max_pool_connections=50), endpoint_url=config.S3_ENDPOINT_URL)
53+
self._s3_resource = session.resource("s3", endpoint_url=config.S3_ENDPOINT_URL)
5254

5355
@staticmethod
5456
def join_path(*parts: str) -> str:

src/gprofiler_indexer/args.go

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,11 @@ type CLIArgs struct {
3636
FrameReplaceFileName string
3737
AWSEndpoint string
3838
AWSRegion string
39+
// Metrics Publisher Configuration
40+
MetricsEnabled bool
41+
MetricsAgentURL string
42+
MetricsServiceName string
43+
MetricsSLIUUID string
3944
}
4045

4146
func NewCliArgs() *CLIArgs {
@@ -50,14 +55,19 @@ func NewCliArgs() *CLIArgs {
5055
ClickHouseStacksBatchSize: 10000,
5156
ClickHouseMetricsBatchSize: 100,
5257
FrameReplaceFileName: ConfPrefix + "replace.yaml",
58+
// Metrics defaults
59+
MetricsEnabled: false,
60+
MetricsAgentURL: "tcp://localhost:18126",
61+
MetricsServiceName: "gprofiler-indexer",
62+
MetricsSLIUUID: "",
5363
}
5464
}
5565

5666
func (ca *CLIArgs) ParseArgs() {
5767
flag.StringVar(&ca.SQSQueue, "sqs-queue", LookupEnvOrString("SQS_QUEUE_URL", ca.SQSQueue),
5868
"SQS Queue name to listen")
5969
flag.StringVar(&ca.S3Bucket, "s3-bucket", LookupEnvOrString("S3_BUCKET", ca.S3Bucket), "S3 bucket name")
60-
flag.StringVar(&ca.AWSEndpoint, "aws-endpoint", LookupEnvOrString("S3_ENDPOINT", ca.AWSEndpoint), "AWS Endpoint URL")
70+
flag.StringVar(&ca.AWSEndpoint, "aws-endpoint", LookupEnvOrString("AWS_ENDPOINT_URL", ca.AWSEndpoint), "AWS Endpoint URL")
6171
flag.StringVar(&ca.AWSRegion, "aws-region", LookupEnvOrString("AWS_REGION", ca.AWSRegion), "AWS Region")
6272
flag.StringVar(&ca.ClickHouseAddr, "clickhouse-addr", LookupEnvOrString("CLICKHOUSE_ADDR", ca.ClickHouseAddr),
6373
"ClickHouse address like 127.0.0.1:9000")
@@ -83,6 +93,15 @@ func (ca *CLIArgs) ParseArgs() {
8393
flag.StringVar(&ca.FrameReplaceFileName, "replace-file", LookupEnvOrString("REPLACE_FILE",
8494
ca.FrameReplaceFileName),
8595
"replace.yaml")
96+
// Metrics Publisher Configuration
97+
flag.BoolVar(&ca.MetricsEnabled, "metrics-enabled", LookupEnvOrBool("METRICS_ENABLED", ca.MetricsEnabled),
98+
"Enable metrics publishing (default false)")
99+
flag.StringVar(&ca.MetricsAgentURL, "metrics-agent-url", LookupEnvOrString("METRICS_AGENT_URL", ca.MetricsAgentURL),
100+
"Metrics agent URL (default tcp://localhost:18126)")
101+
flag.StringVar(&ca.MetricsServiceName, "metrics-service-name", LookupEnvOrString("METRICS_SERVICE_NAME", ca.MetricsServiceName),
102+
"Service name for metrics (default gprofiler-indexer)")
103+
flag.StringVar(&ca.MetricsSLIUUID, "metrics-sli-uuid", LookupEnvOrString("METRICS_SLI_UUID", ca.MetricsSLIUUID),
104+
"SLI metric UUID")
86105
flag.Parse()
87106

88107
if ca.SQSQueue == "" && ca.InputFolder == "" {

src/gprofiler_indexer/main.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,15 @@ func main() {
4646
args.ParseArgs()
4747

4848
logger.Infof("Starting %s", AppName)
49+
50+
// Initialize metrics publisher
51+
metricsPublisher := NewMetricsPublisher(
52+
args.MetricsAgentURL,
53+
args.MetricsServiceName,
54+
args.MetricsSLIUUID,
55+
args.MetricsEnabled,
56+
)
57+
4958
tasks := make(chan SQSMessage, args.Concurrency)
5059
channels := RecordChannels{
5160
StacksRecords: make(chan StackRecord, args.ClickHouseStacksBatchSize),
@@ -107,5 +116,11 @@ func main() {
107116
}()
108117

109118
buffWriterWaitGroup.Wait()
119+
120+
// Cleanup metrics publisher
121+
if metricsPublisher != nil {
122+
metricsPublisher.FlushAndClose()
123+
}
124+
110125
logger.Info("Graceful shutdown")
111126
}
src/gprofiler_indexer/metrics_publisher.go

Lines changed: 234 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,234 @@
1+
//
2+
// Copyright (C) 2023 Intel Corporation
3+
//
4+
// Licensed under the Apache License, Version 2.0 (the "License");
5+
// you may not use this file except in compliance with the License.
6+
// You may obtain a copy of the License at
7+
//
8+
// http://www.apache.org/licenses/LICENSE-2.0
9+
//
10+
// Unless required by applicable law or agreed to in writing, software
11+
// distributed under the License is distributed on an "AS IS" BASIS,
12+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
// See the License for the specific language governing permissions and
14+
// limitations under the License.
15+
//
16+
17+
package main
18+
19+
import (
	"fmt"
	"net"
	"sort"
	"strings"
	"sync"
	"time"

	log "github.com/sirupsen/logrus"
)
28+
29+
// Values accepted as the responseType argument of SendSLIMetric; each is
// written verbatim into the metric's response_type tag.
const (
	// ResponseTypeSuccess marks an operation that completed successfully.
	ResponseTypeSuccess = "success"

	// ResponseTypeFailure marks an operation that failed.
	ResponseTypeFailure = "failure"

	// ResponseTypeIgnoredFailure marks a failure reported under its own
	// response_type value, separate from ResponseTypeFailure.
	ResponseTypeIgnoredFailure = "ignored_failure"
)
35+
36+
// MetricsPublisher sends metric lines to a metrics agent over TCP.
//
// It contains a sync.Mutex and must therefore always be handled through a
// pointer, never copied.
type MetricsPublisher struct {
	// Destination of the metrics agent.
	host string
	port string

	// Identity attached to emitted metrics.
	serviceName   string
	sliMetricUUID string

	// enabled gates every send; when false the publisher is a no-op.
	enabled bool

	// Failure bookkeeping, guarded by mutex.
	mutex            sync.Mutex
	connectionFailed bool  // set when a dial failure was logged, cleared on the next successful send
	lastErrorLogTime int64 // Unix seconds of the last logged failure
	errorLogInterval int64 // minimum number of seconds between logged failures
}
48+
49+
var (
50+
metricsInstance *MetricsPublisher
51+
metricsOnce sync.Once
52+
)
53+
54+
// NewMetricsPublisher creates or returns the singleton MetricsPublisher instance
55+
func NewMetricsPublisher(serverURL, serviceName, sliUUID string, enabled bool) *MetricsPublisher {
56+
metricsOnce.Do(func() {
57+
instance := &MetricsPublisher{
58+
serviceName: serviceName,
59+
sliMetricUUID: sliUUID,
60+
enabled: enabled,
61+
errorLogInterval: 300, // Log errors at most once every 5 minutes
62+
}
63+
64+
// Parse server URL (tcp://host:port)
65+
if strings.HasPrefix(serverURL, "tcp://") {
66+
urlParts := strings.Split(serverURL[6:], ":")
67+
instance.host = urlParts[0]
68+
if len(urlParts) > 1 {
69+
instance.port = urlParts[1]
70+
} else {
71+
instance.port = "18126"
72+
}
73+
} else {
74+
if enabled {
75+
log.Fatalf("Unsupported server URL format: %s. Expected tcp://host:port", serverURL)
76+
}
77+
instance.host = "localhost"
78+
instance.port = "18126"
79+
}
80+
81+
if enabled {
82+
log.Infof("MetricsPublisher initialized: service=%s, server=%s:%s, sli_enabled=%t",
83+
serviceName, instance.host, instance.port, sliUUID != "")
84+
} else {
85+
log.Info("MetricsPublisher disabled")
86+
}
87+
88+
metricsInstance = instance
89+
})
90+
91+
return metricsInstance
92+
}
93+
94+
// GetInstance returns the singleton MetricsPublisher instance
95+
// Returns nil if not initialized
96+
func GetMetricsPublisher() *MetricsPublisher {
97+
return metricsInstance
98+
}
99+
100+
// SendSLIMetric sends an SLI metric for tracking HTTP success rate
101+
// responseType: success, failure, or ignored_failure
102+
// methodName: The method/operation being tracked (e.g., "event_processing")
103+
// extraTags: Additional tags as key-value pairs
104+
func (m *MetricsPublisher) SendSLIMetric(responseType, methodName string, extraTags map[string]string) bool {
105+
if m == nil || !m.enabled || m.sliMetricUUID == "" {
106+
return false
107+
}
108+
109+
// Build metric name using configured SLI UUID
110+
metricName := fmt.Sprintf("error-budget.counters.%s", m.sliMetricUUID)
111+
112+
// Get current epoch timestamp
113+
timestamp := time.Now().Unix()
114+
115+
// Build tag string with required SLI tags (Graphite plaintext protocol format)
116+
tags := []string{
117+
fmt.Sprintf("service=%s", m.serviceName),
118+
fmt.Sprintf("response_type=%s", responseType),
119+
fmt.Sprintf("method_name=%s", methodName),
120+
}
121+
122+
if extraTags != nil {
123+
for key, value := range extraTags {
124+
tags = append(tags, fmt.Sprintf("%s=%s", key, value))
125+
}
126+
}
127+
128+
tagString := strings.Join(tags, " ")
129+
130+
// Format: put metric_name timestamp value tag1=value1 tag2=value2 ...
131+
metricLine := fmt.Sprintf("put %s %d 1 %s", metricName, timestamp, tagString)
132+
133+
log.Infof("📊 Sending SLI metric: %s", metricLine)
134+
135+
return m.sendMetric(metricLine)
136+
}
137+
138+
// SendErrorMetric sends an operational error metric
139+
func (m *MetricsPublisher) SendErrorMetric(metricName string, extraTags map[string]string) bool {
140+
if m == nil || !m.enabled {
141+
return false
142+
}
143+
144+
// Get current epoch timestamp
145+
timestamp := time.Now().Unix()
146+
147+
// Build tag string
148+
tags := []string{
149+
fmt.Sprintf("service=%s", m.serviceName),
150+
}
151+
152+
if extraTags != nil {
153+
for key, value := range extraTags {
154+
tags = append(tags, fmt.Sprintf("%s=%s", key, value))
155+
}
156+
}
157+
158+
tagString := strings.Join(tags, " ")
159+
160+
// Format: put metric_name timestamp value tag1=value1 tag2=value2 ...
161+
metricLine := fmt.Sprintf("put %s %d 1 %s", metricName, timestamp, tagString)
162+
163+
log.Debugf("📊 Sending error metric: %s", metricLine)
164+
165+
return m.sendMetric(metricLine)
166+
}
167+
168+
// sendMetric sends a metric line via TCP socket
169+
func (m *MetricsPublisher) sendMetric(metricLine string) bool {
170+
if m == nil || !m.enabled {
171+
return false
172+
}
173+
174+
// Check if we should throttle error logging
175+
m.mutex.Lock()
176+
now := time.Now().Unix()
177+
shouldLogError := now-m.lastErrorLogTime >= m.errorLogInterval
178+
m.mutex.Unlock()
179+
180+
// Ensure metric line ends with newline
181+
if !strings.HasSuffix(metricLine, "\n") {
182+
metricLine = metricLine + "\n"
183+
}
184+
185+
// Create TCP connection with timeout
186+
address := net.JoinHostPort(m.host, m.port)
187+
conn, err := net.DialTimeout("tcp", address, 1*time.Second)
188+
if err != nil {
189+
if shouldLogError {
190+
log.Warnf("Failed to connect to metrics agent at %s: %v", address, err)
191+
m.mutex.Lock()
192+
m.lastErrorLogTime = now
193+
m.connectionFailed = true
194+
m.mutex.Unlock()
195+
}
196+
return false
197+
}
198+
defer conn.Close()
199+
200+
// Set write timeout
201+
conn.SetWriteDeadline(time.Now().Add(1 * time.Second))
202+
203+
// Send metric
204+
_, err = conn.Write([]byte(metricLine))
205+
if err != nil {
206+
if shouldLogError {
207+
log.Warnf("Failed to send metric: %v", err)
208+
m.mutex.Lock()
209+
m.lastErrorLogTime = now
210+
m.mutex.Unlock()
211+
}
212+
return false
213+
}
214+
215+
// Reset connection failed flag on success
216+
m.mutex.Lock()
217+
if m.connectionFailed {
218+
log.Info("Successfully reconnected to metrics agent")
219+
m.connectionFailed = false
220+
}
221+
m.mutex.Unlock()
222+
223+
return true
224+
}
225+
226+
// FlushAndClose flushes any pending metrics and closes the publisher
227+
func (m *MetricsPublisher) FlushAndClose() {
228+
m.mutex.Lock()
229+
defer m.mutex.Unlock()
230+
231+
log.Info("MetricsPublisher closed")
232+
m.enabled = false
233+
}
234+

0 commit comments

Comments
 (0)