Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
525 changes: 525 additions & 0 deletions LOCAL_TESTING_GUIDE.md

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can remove this file. Not really needed.

Large diffs are not rendered by default.

1,035 changes: 1,035 additions & 0 deletions METRICS_PUBLISHER_INDEXER_DOCUMENTATION.md

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions src/gprofiler-dev/gprofiler_dev/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,6 @@

BUCKET_NAME = os.getenv("BUCKET_NAME", "gprofiler")
BASE_DIRECTORY = "products"
# Optional: Custom S3 endpoint for local testing (e.g., LocalStack) or S3-compatible services
# In production, leave unset to use default AWS S3 endpoints
S3_ENDPOINT_URL = os.getenv("S3_ENDPOINT_URL")
6 changes: 4 additions & 2 deletions src/gprofiler-dev/gprofiler_dev/s3_profile_dal.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,10 @@ def __init__(
aws_secret_access_key=config.AWS_SECRET_ACCESS_KEY,
aws_session_token=config.AWS_SESSION_TOKEN,
)
self._s3_client = session.client("s3", config=Config(max_pool_connections=50))
self._s3_resource = session.resource("s3")
# endpoint_url allows connecting to LocalStack or S3-compatible services for testing
# When None (default), uses standard AWS S3 endpoints
self._s3_client = session.client("s3", config=Config(max_pool_connections=50), endpoint_url=config.S3_ENDPOINT_URL)
self._s3_resource = session.resource("s3", endpoint_url=config.S3_ENDPOINT_URL)

@staticmethod
def join_path(*parts: str) -> str:
Expand Down
21 changes: 20 additions & 1 deletion src/gprofiler_indexer/args.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ type CLIArgs struct {
FrameReplaceFileName string
AWSEndpoint string
AWSRegion string
// Metrics Publisher Configuration
MetricsEnabled bool
MetricsAgentURL string
MetricsServiceName string
MetricsSLIUUID string
}

func NewCliArgs() *CLIArgs {
Expand All @@ -50,14 +55,19 @@ func NewCliArgs() *CLIArgs {
ClickHouseStacksBatchSize: 10000,
ClickHouseMetricsBatchSize: 100,
FrameReplaceFileName: ConfPrefix + "replace.yaml",
// Metrics defaults
MetricsEnabled: false,
MetricsAgentURL: "tcp://localhost:18126",
MetricsServiceName: "gprofiler-indexer",
MetricsSLIUUID: "",
}
}

func (ca *CLIArgs) ParseArgs() {
flag.StringVar(&ca.SQSQueue, "sqs-queue", LookupEnvOrString("SQS_QUEUE_URL", ca.SQSQueue),
"SQS Queue name to listen")
flag.StringVar(&ca.S3Bucket, "s3-bucket", LookupEnvOrString("S3_BUCKET", ca.S3Bucket), "S3 bucket name")
flag.StringVar(&ca.AWSEndpoint, "aws-endpoint", LookupEnvOrString("S3_ENDPOINT", ca.AWSEndpoint), "AWS Endpoint URL")
flag.StringVar(&ca.AWSEndpoint, "aws-endpoint", LookupEnvOrString("AWS_ENDPOINT_URL", ca.AWSEndpoint), "AWS Endpoint URL")
flag.StringVar(&ca.AWSRegion, "aws-region", LookupEnvOrString("AWS_REGION", ca.AWSRegion), "AWS Region")
flag.StringVar(&ca.ClickHouseAddr, "clickhouse-addr", LookupEnvOrString("CLICKHOUSE_ADDR", ca.ClickHouseAddr),
"ClickHouse address like 127.0.0.1:9000")
Expand All @@ -83,6 +93,15 @@ func (ca *CLIArgs) ParseArgs() {
flag.StringVar(&ca.FrameReplaceFileName, "replace-file", LookupEnvOrString("REPLACE_FILE",
ca.FrameReplaceFileName),
"replace.yaml")
// Metrics Publisher Configuration
flag.BoolVar(&ca.MetricsEnabled, "metrics-enabled", LookupEnvOrBool("METRICS_ENABLED", ca.MetricsEnabled),
"Enable metrics publishing (default false)")
flag.StringVar(&ca.MetricsAgentURL, "metrics-agent-url", LookupEnvOrString("METRICS_AGENT_URL", ca.MetricsAgentURL),
"Metrics agent URL (default tcp://localhost:18126)")
flag.StringVar(&ca.MetricsServiceName, "metrics-service-name", LookupEnvOrString("METRICS_SERVICE_NAME", ca.MetricsServiceName),
"Service name for metrics (default gprofiler-indexer)")
flag.StringVar(&ca.MetricsSLIUUID, "metrics-sli-uuid", LookupEnvOrString("METRICS_SLI_UUID", ca.MetricsSLIUUID),
"SLI metric UUID")
flag.Parse()

if ca.SQSQueue == "" && ca.InputFolder == "" {
Expand Down
15 changes: 15 additions & 0 deletions src/gprofiler_indexer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,15 @@ func main() {
args.ParseArgs()

logger.Infof("Starting %s", AppName)

// Initialize metrics publisher
metricsPublisher := NewMetricsPublisher(
args.MetricsAgentURL,
args.MetricsServiceName,
args.MetricsSLIUUID,
args.MetricsEnabled,
)

tasks := make(chan SQSMessage, args.Concurrency)
channels := RecordChannels{
StacksRecords: make(chan StackRecord, args.ClickHouseStacksBatchSize),
Expand Down Expand Up @@ -107,5 +116,11 @@ func main() {
}()

buffWriterWaitGroup.Wait()

// Cleanup metrics publisher
if metricsPublisher != nil {
metricsPublisher.FlushAndClose()
}

logger.Info("Graceful shutdown")
}
234 changes: 234 additions & 0 deletions src/gprofiler_indexer/metrics_publisher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,234 @@
//
// Copyright (C) 2023 Intel Corporation
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

package main

import (
	"fmt"
	"net"
	"sort"
	"strings"
	"sync"
	"time"

	log "github.com/sirupsen/logrus"
)

// Response type constants for SLI metrics.
// One of these values is passed as the responseType argument of SendSLIMetric,
// which emits it verbatim in the response_type tag of the metric line.
const (
	// ResponseTypeSuccess is the response_type value "success".
	ResponseTypeSuccess = "success"
	// ResponseTypeFailure is the response_type value "failure".
	ResponseTypeFailure = "failure"
	// ResponseTypeIgnoredFailure is the response_type value "ignored_failure".
	// NOTE(review): presumably a failure that should not count against the SLI — confirm with the metrics backend.
	ResponseTypeIgnoredFailure = "ignored_failure"
)

// MetricsPublisher handles sending metrics to metrics agent via TCP.
// Instances are created by NewMetricsPublisher; the struct embeds a mutex and
// must therefore be used through a pointer.
type MetricsPublisher struct {
	// host and port are the metrics agent address; sendMetric joins them with
	// net.JoinHostPort before dialing.
	host string
	port string
	// serviceName is emitted as the "service" tag on every metric line.
	serviceName string
	// sliMetricUUID is appended to "error-budget.counters." to form the SLI
	// metric name; SendSLIMetric does nothing while it is empty.
	sliMetricUUID string
	// enabled gates every send method: when false they return false immediately.
	enabled bool
	// connectionFailed is true after a failed connection attempt has been
	// recorded and is cleared (with a log line) by the next successful send.
	connectionFailed bool
	// lastErrorLogTime is the Unix time, in seconds, of the last logged send error.
	lastErrorLogTime int64
	// errorLogInterval is the minimum number of seconds between two logged
	// send errors; NewMetricsPublisher sets it to 300.
	errorLogInterval int64
	// mutex is held by sendMetric while touching connectionFailed and
	// lastErrorLogTime, and by FlushAndClose while clearing enabled.
	mutex sync.Mutex
}

var (
	// metricsInstance is the package-wide publisher. It is nil until
	// NewMetricsPublisher has run and is what GetMetricsPublisher returns.
	metricsInstance *MetricsPublisher
	// metricsOnce makes NewMetricsPublisher build metricsInstance only once.
	metricsOnce sync.Once
)

// NewMetricsPublisher creates the singleton MetricsPublisher on the first call
// and returns that same instance on every later call. Arguments passed to later
// calls are ignored because construction is guarded by a sync.Once.
//
// serverURL must have the form tcp://host[:port]; IPv6 literals are written in
// brackets (tcp://[::1]:18126). When the port is missing or empty, 18126 is
// used. Any other scheme terminates the process via log.Fatalf when enabled is
// true; when enabled is false the address falls back to localhost:18126.
//
// serviceName is sent as the "service" tag, sliUUID selects the SLI metric name
// (SLI metrics are skipped while it is empty), and enabled switches publishing
// on or off.
func NewMetricsPublisher(serverURL, serviceName, sliUUID string, enabled bool) *MetricsPublisher {
	metricsOnce.Do(func() {
		instance := &MetricsPublisher{
			serviceName:      serviceName,
			sliMetricUUID:    sliUUID,
			enabled:          enabled,
			errorLogInterval: 300, // Log errors at most once every 5 minutes
		}

		// Parse server URL (tcp://host:port)
		if strings.HasPrefix(serverURL, "tcp://") {
			instance.host, instance.port = splitMetricsAgentAddr(strings.TrimPrefix(serverURL, "tcp://"))
		} else {
			if enabled {
				log.Fatalf("Unsupported server URL format: %s. Expected tcp://host:port", serverURL)
			}
			// Publishing is off, so the address is never dialed; keep sane defaults.
			instance.host = "localhost"
			instance.port = "18126"
		}

		if enabled {
			log.Infof("MetricsPublisher initialized: service=%s, server=%s:%s, sli_enabled=%t",
				serviceName, instance.host, instance.port, sliUUID != "")
		} else {
			log.Info("MetricsPublisher disabled")
		}

		metricsInstance = instance
	})

	return metricsInstance
}

// splitMetricsAgentAddr splits the host[:port] part of a metrics agent URL and
// substitutes the default port 18126 when none is given.
func splitMetricsAgentAddr(addr string) (string, string) {
	const defaultPort = "18126"

	host, port, err := net.SplitHostPort(addr)
	if err != nil {
		// SplitHostPort rejects addresses without a port; treat the whole
		// string as the host and drop the brackets of a bare IPv6 literal,
		// since net.JoinHostPort adds them back when dialing.
		host = addr
		if strings.HasPrefix(host, "[") && strings.HasSuffix(host, "]") {
			host = host[1 : len(host)-1]
		}
		return host, defaultPort
	}
	if port == "" {
		port = defaultPort
	}
	return host, port
}

// GetMetricsPublisher returns the singleton MetricsPublisher instance.
// It returns nil if NewMetricsPublisher has not been called yet.
// NOTE(review): the package variable is read here without going through
// metricsOnce — confirm callers only use this after initialization has finished.
func GetMetricsPublisher() *MetricsPublisher {
	return metricsInstance
}

// SendSLIMetric sends one SLI counter sample (value 1) to the metrics agent.
//
// responseType is the outcome being counted: success, failure, or
// ignored_failure (see the ResponseType* constants).
// methodName is the method/operation being tracked (e.g., "event_processing").
// extraTags are additional key/value tags; nil or empty is allowed. They are
// appended after the required tags in sorted key order so that the emitted
// line is deterministic.
//
// It returns true when the line was written to the agent, and false when the
// receiver is nil, publishing is disabled, no SLI UUID is configured, or the
// send failed.
func (m *MetricsPublisher) SendSLIMetric(responseType, methodName string, extraTags map[string]string) bool {
	if m == nil || !m.enabled || m.sliMetricUUID == "" {
		return false
	}

	// Build metric name using configured SLI UUID
	metricName := fmt.Sprintf("error-budget.counters.%s", m.sliMetricUUID)

	// Get current epoch timestamp
	timestamp := time.Now().Unix()

	// Required SLI tags first, then caller-supplied tags.
	tags := make([]string, 0, 3+len(extraTags))
	tags = append(tags,
		fmt.Sprintf("service=%s", m.serviceName),
		fmt.Sprintf("response_type=%s", responseType),
		fmt.Sprintf("method_name=%s", methodName),
	)

	// Map iteration order is random; sort the keys for stable output.
	extraKeys := make([]string, 0, len(extraTags))
	for key := range extraTags {
		extraKeys = append(extraKeys, key)
	}
	sort.Strings(extraKeys)
	for _, key := range extraKeys {
		tags = append(tags, fmt.Sprintf("%s=%s", key, extraTags[key]))
	}

	// Format: put metric_name timestamp value tag1=value1 tag2=value2 ...
	metricLine := fmt.Sprintf("put %s %d 1 %s", metricName, timestamp, strings.Join(tags, " "))

	log.Infof("📊 Sending SLI metric: %s", metricLine)

	return m.sendMetric(metricLine)
}

// SendErrorMetric sends one operational error counter sample (value 1) under
// the given metricName.
//
// The line always carries the "service" tag; extraTags (nil or empty allowed)
// are appended after it in sorted key order so that the emitted line is
// deterministic.
//
// It returns true when the line was written to the agent, and false when the
// receiver is nil, publishing is disabled, or the send failed.
func (m *MetricsPublisher) SendErrorMetric(metricName string, extraTags map[string]string) bool {
	if m == nil || !m.enabled {
		return false
	}

	// Get current epoch timestamp
	timestamp := time.Now().Unix()

	tags := make([]string, 0, 1+len(extraTags))
	tags = append(tags, fmt.Sprintf("service=%s", m.serviceName))

	// Map iteration order is random; sort the keys for stable output.
	extraKeys := make([]string, 0, len(extraTags))
	for key := range extraTags {
		extraKeys = append(extraKeys, key)
	}
	sort.Strings(extraKeys)
	for _, key := range extraKeys {
		tags = append(tags, fmt.Sprintf("%s=%s", key, extraTags[key]))
	}

	// Format: put metric_name timestamp value tag1=value1 tag2=value2 ...
	metricLine := fmt.Sprintf("put %s %d 1 %s", metricName, timestamp, strings.Join(tags, " "))

	log.Debugf("📊 Sending error metric: %s", metricLine)

	return m.sendMetric(metricLine)
}

// sendMetric writes one metric line to the metrics agent over a fresh TCP
// connection and closes the connection again. A trailing newline is appended
// when metricLine lacks one. Dialing and writing are each limited to one second.
//
// It returns true when the whole line was written, and false when the receiver
// is nil, publishing is disabled, or connecting/writing failed. Failures are
// logged at most once per errorLogInterval seconds; the first successful send
// after a failed connection attempt logs a reconnection message.
//
// NOTE(review): m.enabled is read here without holding m.mutex while
// FlushAndClose writes it under the lock — confirm no sends run concurrently
// with shutdown.
func (m *MetricsPublisher) sendMetric(metricLine string) bool {
	if m == nil || !m.enabled {
		return false
	}

	// Ensure metric line ends with newline
	if !strings.HasSuffix(metricLine, "\n") {
		metricLine += "\n"
	}

	// Create TCP connection with timeout
	address := net.JoinHostPort(m.host, m.port)
	conn, err := net.DialTimeout("tcp", address, 1*time.Second)
	if err != nil {
		if m.recordSendFailure(true) {
			log.Warnf("Failed to connect to metrics agent at %s: %v", address, err)
		}
		return false
	}
	defer conn.Close()

	// Bound the write so a stalled agent cannot block the caller.
	if err := conn.SetWriteDeadline(time.Now().Add(1 * time.Second)); err != nil {
		if m.recordSendFailure(false) {
			log.Warnf("Failed to set write deadline for metrics agent: %v", err)
		}
		return false
	}

	// Send metric
	if _, err := conn.Write([]byte(metricLine)); err != nil {
		if m.recordSendFailure(false) {
			log.Warnf("Failed to send metric: %v", err)
		}
		return false
	}

	// Reset connection failed flag on success
	m.mutex.Lock()
	if m.connectionFailed {
		log.Info("Successfully reconnected to metrics agent")
		m.connectionFailed = false
	}
	m.mutex.Unlock()

	return true
}

// recordSendFailure notes a failed send and reports whether the caller may log
// it. connectFailure marks the agent as unreachable so that the next success
// is reported as a reconnect. The throttle check and the timestamp update
// happen under one lock so concurrent failures produce a single log line per
// errorLogInterval.
func (m *MetricsPublisher) recordSendFailure(connectFailure bool) bool {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	if connectFailure {
		m.connectionFailed = true
	}

	now := time.Now().Unix()
	if now-m.lastErrorLogTime < m.errorLogInterval {
		return false
	}
	m.lastErrorLogTime = now
	return true
}

// FlushAndClose disables the publisher so that later send calls return false.
// Metrics are written synchronously by sendMetric, one connection per line, so
// there is nothing buffered to flush and no connection to close here.
// Like the send methods, it is a no-op on a nil receiver.
func (m *MetricsPublisher) FlushAndClose() {
	if m == nil {
		return
	}

	m.mutex.Lock()
	defer m.mutex.Unlock()

	log.Info("MetricsPublisher closed")
	m.enabled = false
}

Loading
Loading