feat(metrics): add metrics collection mechanism #1071


Merged · 60 commits · Jul 17, 2025

Commits
d2771f8
feat(metrics): add metrics collection mechanism
leo-marble Jul 1, 2025
a948c87
refactor(task_queue): integrate global queue configuration
leo-marble Jul 2, 2025
66f5c83
refactor(metrics): rename NewCollectorV1 to NewCollectorsV1 and clean…
leo-marble Jul 2, 2025
cbe80a8
feat(metrics): enhance metrics collection with frequency support
leo-marble Jul 2, 2025
8ba5830
refactor(metrics): rename MetricsPayload to MetricsCollection and enh…
leo-marble Jul 2, 2025
ebec6b2
fix(time_ranges): Edge case on monthly frequency when last day of 31 …
leo-marble Jul 3, 2025
ee95279
fix(metrics_collection): use UTC when using time.Date
leo-marble Jul 3, 2025
af150ea
fix(metrics_collection): use UTC when using time.Date
leo-marble Jul 3, 2025
b4b73f9
feat(metrics_collection): add api version and license collector
leo-marble Jul 3, 2025
45258cd
refactor(metrics_collection): replace slices.ContainsFunc with assert…
leo-marble Jul 3, 2025
497dcdf
feat(task_queue): implement queue whitelist for non-org metrics and u…
leo-marble Jul 3, 2025
74fa39d
fix(time_ranges_test): Expect an error in case from > to
leo-marble Jul 3, 2025
9c4f7b5
refactor(watermark): rename OffloadingWatermark to Watermark and upda…
leo-marble Jul 3, 2025
69a4af0
feat(metrics_collection): add watermark to fetch data from watermark …
leo-marble Jul 3, 2025
9880327
refacto(watermark): delete offloading watermark and change *json.RawM…
leo-marble Jul 4, 2025
806262c
feat(metrics_collection): add configuration for metrics collection
leo-marble Jul 4, 2025
a6e59f0
feat(metrics_collection): implement metrics ingestion API and update …
leo-marble Jul 4, 2025
266f61e
fix(metrics_ingestion): defer after sending metrics
leo-marble Jul 4, 2025
e675f15
fix(metrics_ingestion): handle body parsing error
leo-marble Jul 7, 2025
f33b4d5
refacto(worker): use for range map instead of slice
leo-marble Jul 7, 2025
42c83de
refacto(metrics_collection_job): use NewEncoder instead of jsonMarsha…
leo-marble Jul 7, 2025
7de85d4
refacto(metrics_ingestion): remove ingection checking, let storage to…
leo-marble Jul 7, 2025
af2cfae
refacto(metrics): refactor MetricData structure to use separate Numer…
leo-marble Jul 7, 2025
3eac549
feat(metrics_collection): enhance metrics ingestion with deployment I…
leo-marble Jul 7, 2025
70e526e
refacto(watermark_repository): simplify SaveWatermark logic by using …
leo-marble Jul 7, 2025
26e9e9c
feat(bigquery): integrate BigQuery client for metrics ingestion and h…
leo-marble Jul 7, 2025
9a45a96
feat(metrics_ingestion): implement SendMetrics function to send metri…
leo-marble Jul 8, 2025
55bb29b
refactor(metrics_collection): simplify NullString and NullFloat64 han…
leo-marble Jul 8, 2025
70e9c3d
fix(metrics_collection): update Collect method to include time range …
leo-marble Jul 8, 2025
bc53b9f
feat(metrics_collection): add DecisionCollector to count decisions pe…
leo-marble Jul 8, 2025
4cfd7d0
feat(metrics_ingestion): add license validation if license key is giv…
leo-marble Jul 9, 2025
c773278
fix(metrics_ingestion): Do not retry if the request send 401 error
leo-marble Jul 9, 2025
a78267e
fix(metrics_collection): update getFromTime to collect metrics from t…
leo-marble Jul 9, 2025
4945c82
feat(metrics_collection): add CaseCollector to count cases per organi…
leo-marble Jul 9, 2025
85f2938
refactor(metrics_collection): remove MetricCollectionFrequency from M…
leo-marble Jul 9, 2025
e3a8377
chore(routes): reorder validate-license route for improved clarity
leo-marble Jul 9, 2025
7e30563
refactor(metrics_ingestion): simplify success response by removing JS…
leo-marble Jul 9, 2025
607e048
feat(gcp): add GCP project ID retrieval and integrate with metrics co…
leo-marble Jul 9, 2025
1ed3639
refactor(repositories): streamline case and decision counting logic t…
leo-marble Jul 9, 2025
8e4d1ef
refactor(metrics_collection): update utility functions for BigQuery c…
leo-marble Jul 10, 2025
1ee2922
feat(bigquery): enhance BigQuery configuration with metrics dataset a…
leo-marble Jul 10, 2025
317be38
rename(migration): Change the migration name to change the migration …
leo-marble Jul 10, 2025
b809a04
refactor(metrics_collection): from and to date are mandatory and stor…
leo-marble Jul 10, 2025
e2692e4
Update usecases/metrics_ingestion.go
leo-marble Jul 10, 2025
5184729
refacto(metrics): metrics URL configuration
leo-marble Jul 15, 2025
212fff5
fix(server): update BigQuery ProjectID retrieval to use GCP configura…
leo-marble Jul 15, 2025
d254812
refactor(metrics): remove BigQuery references and simplify app versio…
leo-marble Jul 15, 2025
d360e15
refactor(metrics): improve license validation handling and update HTT…
leo-marble Jul 15, 2025
7a461f8
refactor(metrics_collection): centralize metric names
leo-marble Jul 15, 2025
0a2afdb
refactor(bigquery): replace BigQueryClient with BigQueryInfra for imp…
leo-marble Jul 15, 2025
67352d8
refactor(worker): streamline global queue management by introducing e…
leo-marble Jul 15, 2025
177ad09
refactor(metrics_collection): rename NewCollectorsTestV1 to NewCollec…
leo-marble Jul 15, 2025
128a6c2
feat(metrics_collection): add fallback duration configuration for met…
leo-marble Jul 16, 2025
b0bfd33
feat(metadata): introduce metadata management with database integrati…
leo-marble Jul 16, 2025
708bb06
refactor(metrics): rename OrganizationID to OrgID in MetricDataDto an…
leo-marble Jul 16, 2025
dac1ebb
refactor(metrics): update error handling in metrics ingestion and str…
leo-marble Jul 17, 2025
db3a193
feat(metrics_collection): add Timeout method to MetricCollectionWorke…
leo-marble Jul 17, 2025
b8af99f
refactor(metrics): update telemetry configuration to use DISABLED_TEL…
leo-marble Jul 17, 2025
9658147
refactor(bigquery): update MetricsDataset constant to use 'marble_met…
leo-marble Jul 17, 2025
f245195
refactor(license): simplify license verification logic by removing un…
leo-marble Jul 17, 2025
7 changes: 7 additions & 0 deletions .env.example
@@ -142,5 +142,12 @@ SENTRY_DSN=
# OFFLOADING_SAVE_POINTS=1000
# OFFLOADING_WRITES_PER_SEC=200

# Configure metrics collection.
# By default, metrics collection is enabled.
# DISABLED_TELEMETRY=false
# METRICS_COLLECTION_JOB_INTERVAL=1h

# Abort and exit if license cannot be validated.
KILL_IF_READ_LICENSE_ERROR=false

# BIGQUERY_PROJECT_ID=
34 changes: 34 additions & 0 deletions api/handle_metrics_ingestion.go
@@ -0,0 +1,34 @@
package api

import (
"net/http"

"github.com/checkmarble/marble-backend/dto"
"github.com/checkmarble/marble-backend/usecases"
"github.com/checkmarble/marble-backend/utils"
"github.com/gin-gonic/gin"
)

// Handle metrics ingestion from the metrics collection worker.
func handleMetricsIngestion(uc usecases.Usecases) func(c *gin.Context) {
return func(c *gin.Context) {
logger := utils.LoggerFromContext(c.Request.Context())
var metricsCollectionDto dto.MetricsCollectionDto
if err := c.ShouldBindJSON(&metricsCollectionDto); err != nil {
c.Status(http.StatusBadRequest)
logger.WarnContext(c.Request.Context(), "Failed to bind metrics collection", "error", err.Error())
return
}

metricsCollection := dto.AdaptMetricsCollection(metricsCollectionDto)

usecase := uc.NewMetricsIngestionUsecase()
err := usecase.IngestMetrics(c.Request.Context(), metricsCollection)
if presentError(c.Request.Context(), c, err) {
return
}

// Success response
c.Status(http.StatusOK)
}
}
7 changes: 6 additions & 1 deletion api/routes.go
@@ -6,6 +6,7 @@ import (
"net/url"
"time"

"github.com/checkmarble/marble-backend/infra"
"github.com/checkmarble/marble-backend/pubapi"
pubapiv1 "github.com/checkmarble/marble-backend/pubapi/v1"
"github.com/checkmarble/marble-backend/utils"
@@ -38,11 +39,15 @@ func addRoutes(r *gin.Engine, conf Configuration, uc usecases.Usecases, auth uti
r.GET("/health", tom, handleHealth(uc))
r.GET("/version", tom, handleVersion(uc))
r.POST("/token", tom, tokenHandler.GenerateToken)
r.GET("/validate-license/*license_key", tom, handleValidateLicense(uc))
r.GET("/config", tom, handleGetConfig(uc, conf))
r.GET("/is-sso-available", tom, handleIsSSOEnabled(uc))
r.GET("/signup-status", tom, handleSignupStatus(uc))

if infra.IsMarbleSaasProject() {
r.GET("/validate-license/*license_key", tom, handleValidateLicense(uc))
r.POST("/metrics", tom, handleMetricsIngestion(uc))
}

// Public API initialization
{
cfg := pubapi.Config{
20 changes: 19 additions & 1 deletion cmd/server.go
@@ -110,6 +110,12 @@ func RunServer(config CompiledConfig) error {
LicenseKey: utils.GetEnv("LICENSE_KEY", ""),
KillIfReadLicenseError: utils.GetEnv("KILL_IF_READ_LICENSE_ERROR", false),
}
bigQueryConfig := infra.BigQueryConfig{
ProjectID: utils.GetEnv("BIGQUERY_PROJECT_ID", gcpConfig.ProjectId),
MetricsDataset: utils.GetEnv("BIGQUERY_METRICS_DATASET", infra.MetricsDataset),
MetricsTable: utils.GetEnv("BIGQUERY_METRICS_TABLE", infra.MetricsTable),
}

serverConfig := struct {
batchIngestionMaxSize int
caseManagerBucket string
@@ -193,6 +199,16 @@ func RunServer(config CompiledConfig) error {
return err
}

var bigQueryInfra *infra.BigQueryInfra
if infra.IsMarbleSaasProject() {
bigQueryInfra, err = infra.InitializeBigQueryInfra(ctx, bigQueryConfig)
if err != nil {
utils.LogAndReportSentryError(ctx, err)
return err
}
defer bigQueryInfra.Close()
}

repositories := repositories.NewRepositories(
pool,
gcpConfig,
@@ -206,6 +222,7 @@
repositories.WithClientDbConfig(clientDbConfig),
repositories.WithTracerProvider(telemetryRessources.TracerProvider),
repositories.WithRiverClient(riverClient),
repositories.WithBigQueryInfra(bigQueryInfra),
)

uc := usecases.NewUsecases(repositories,
@@ -245,7 +262,8 @@

deps := api.InitDependencies(ctx, apiConfig, pool, marbleJwtSigningKey)

router := api.InitRouterMiddlewares(ctx, apiConfig, apiConfig.DisableSegment, deps.SegmentClient, telemetryRessources)
router := api.InitRouterMiddlewares(ctx, apiConfig, apiConfig.DisableSegment,
deps.SegmentClient, telemetryRessources)
server := api.NewServer(router, apiConfig, uc, deps.Authentication, deps.TokenHandler, logger)

notify, stop := signal.NotifyContext(ctx, syscall.SIGINT, syscall.SIGTERM)
74 changes: 70 additions & 4 deletions cmd/worker.go
@@ -3,9 +3,11 @@ package cmd
import (
"context"
"log/slog"
"maps"
"net/http"
"os"
"os/signal"
"slices"
"syscall"
"time"

@@ -14,6 +16,7 @@ import (
"github.com/checkmarble/marble-backend/models"
"github.com/checkmarble/marble-backend/repositories"
"github.com/checkmarble/marble-backend/usecases"
"github.com/checkmarble/marble-backend/usecases/scheduled_execution"
"github.com/checkmarble/marble-backend/utils"

"github.com/cockroachdb/errors"
@@ -97,6 +100,14 @@ func RunTaskQueue(apiVersion string) error {

offloadingConfig.ValidateAndFix(ctx)

metricCollectionConfig := infra.MetricCollectionConfig{
Disabled: utils.GetEnv("DISABLED_TELEMETRY", false),
JobInterval: utils.GetEnvDuration("METRICS_COLLECTION_JOB_INTERVAL", 1*time.Hour),
MetricsIngestionURL: utils.GetEnv("METRICS_INGESTION_URL", ""),
FallbackDuration: utils.GetEnvDuration("METRICS_FALLBACK_DURATION", 30*24*time.Hour),
}
metricCollectionConfig.Configure()

infra.SetupSentry(workerConfig.sentryDsn, workerConfig.env, apiVersion)
defer sentry.Flush(3 * time.Second)

@@ -154,12 +165,23 @@
return err
}

// Queues and periodic jobs that are not scoped to a single organization.
nonOrgQueues := make(map[string]river.QueueConfig)
globalPeriodics := []*river.PeriodicJob{}

if !metricCollectionConfig.Disabled {
metricQueue := usecases.QueueMetrics()
maps.Copy(nonOrgQueues, metricQueue)
globalPeriodics = append(globalPeriodics,
scheduled_execution.NewMetricsCollectionPeriodicJob(metricCollectionConfig))
}

maps.Copy(queues, nonOrgQueues)

// Periodics always contain the per-org tasks retrieved above. Add other, non-organization-scoped periodics below
periodics := append(
orgPeriodics,
[]*river.PeriodicJob{
// Add periodic jobs here
}...,
globalPeriodics...,
)

riverClient, err = river.NewClient(riverpgxv5.New(pool), &river.Config{
@@ -183,13 +205,21 @@
return err
}

// Ensure that all global queues are active.
if err := ensureGlobalQueuesAreActive(ctx, riverClient, nonOrgQueues); err != nil {
utils.LogAndReportSentryError(ctx, err)
return err
}

uc := usecases.NewUsecases(repositories,
usecases.WithIngestionBucketUrl(workerConfig.ingestionBucketUrl),
usecases.WithOffloading(offloadingConfig),
usecases.WithFailedWebhooksRetryPageSize(workerConfig.failedWebhooksRetryPageSize),
usecases.WithLicense(license),
usecases.WithConvoyServer(convoyConfiguration.APIUrl),
usecases.WithOpensanctions(openSanctionsConfig.IsSet()),
usecases.WithApiVersion(apiVersion),
usecases.WithMetricsCollectionConfig(metricCollectionConfig),
)
adminUc := jobs.GenerateUsecaseWithCredForMarbleAdmin(ctx, uc)
river.AddWorker(workers, adminUc.NewAsyncDecisionWorker())
@@ -204,6 +234,9 @@ func RunTaskQueue(apiVersion string) error {
if offloadingConfig.Enabled {
river.AddWorker(workers, adminUc.NewOffloadingWorker())
}
if !metricCollectionConfig.Disabled {
river.AddWorker(workers, uc.NewMetricsCollectionWorker(licenseConfig))
}

if err := riverClient.Start(ctx); err != nil {
utils.LogAndReportSentryError(ctx, err)
@@ -226,7 +259,9 @@ func RunTaskQueue(apiVersion string) error {
logger.InfoContext(ctx, "starting worker", slog.String("version", apiVersion))

// Asynchronously keep the task queue workers up to date with the orgs in the database
taskQueueWorker := uc.NewTaskQueueWorker(riverClient)
taskQueueWorker := uc.NewTaskQueueWorker(riverClient,
slices.Collect(maps.Keys(nonOrgQueues)),
)
go taskQueueWorker.RefreshQueuesFromOrgIds(ctx)

// Start the cron jobs using the old entrypoint.
@@ -295,3 +330,34 @@
}
// hard stop succeeded
}

// Ensure that all global queues are active (i.e. not paused) in the River DB. This function is mostly a safety net:
// River persists queue state in the DB, and if a queue exists but is paused, the River client will not resume it on
// its own. Since some code paths can pause queues, we make sure at startup that all global queues are active.
// cf. riverClient.QueueResume and riverClient.QueuePause
func ensureGlobalQueuesAreActive(ctx context.Context, riverClient *river.Client[pgx.Tx],
nonOrgQueues map[string]river.QueueConfig,
) error {
logger := utils.LoggerFromContext(ctx)

for queueName := range nonOrgQueues {
queueState, err := riverClient.QueueGet(ctx, queueName)
if err != nil {
if errors.Is(err, river.ErrNotFound) {
// Queue will be created when River starts, skip
continue
}
return err
}

// If the queue exists and is paused, resume it
if queueState.PausedAt != nil {
logger.InfoContext(ctx, "Resuming global queue at startup", "queue", queueName)
if err := riverClient.QueueResume(ctx, queueName, &river.QueuePauseOpts{}); err != nil {
return err
}
}
}

return nil
}
76 changes: 76 additions & 0 deletions dto/metrics_dto.go
@@ -0,0 +1,76 @@
package dto

import (
"time"

"github.com/checkmarble/marble-backend/models"
"github.com/checkmarble/marble-backend/pure_utils"
"github.com/google/uuid"
)

// Be careful when changing this struct, it is used as input and output in the API.
type MetricDataDto struct {
Name string `json:"name" binding:"required"`
Numeric *float64 `json:"numeric,omitempty"`
Text *string `json:"text,omitempty"`
Timestamp time.Time `json:"timestamp" binding:"required"`
OrgID *string `json:"org_id,omitempty"`
From time.Time `json:"from" binding:"required"`
To time.Time `json:"to" binding:"required"`
}

// Be careful when changing this struct, it is used as input and output in the API.
type MetricsCollectionDto struct {
CollectionID uuid.UUID `json:"collection_id" binding:"required"`
Timestamp time.Time `json:"timestamp" binding:"required"`
Metrics []MetricDataDto `json:"metrics" binding:"required"`
Version string `json:"version" binding:"required"`
DeploymentID uuid.UUID `json:"deployment_id" binding:"required"`
LicenseKey *string `json:"license_key,omitempty"`
}

func AdaptMetricDataDto(metricData models.MetricData) MetricDataDto {
return MetricDataDto{
Name: metricData.Name,
Numeric: metricData.Numeric,
Text: metricData.Text,
Timestamp: metricData.Timestamp,
OrgID: metricData.OrgID,
From: metricData.From,
To: metricData.To,
}
}

func AdaptMetricsCollectionDto(metricsCollection models.MetricsCollection) MetricsCollectionDto {
return MetricsCollectionDto{
CollectionID: metricsCollection.CollectionID,
Timestamp: metricsCollection.Timestamp,
Metrics: pure_utils.Map(metricsCollection.Metrics, AdaptMetricDataDto),
Version: metricsCollection.Version,
DeploymentID: metricsCollection.DeploymentID,
LicenseKey: metricsCollection.LicenseKey,
}
}

func AdaptMetricData(metricDataDto MetricDataDto) models.MetricData {
return models.MetricData{
Name: metricDataDto.Name,
Numeric: metricDataDto.Numeric,
Text: metricDataDto.Text,
Timestamp: metricDataDto.Timestamp,
OrgID: metricDataDto.OrgID,
From: metricDataDto.From,
To: metricDataDto.To,
}
}

func AdaptMetricsCollection(metricsCollectionDto MetricsCollectionDto) models.MetricsCollection {
return models.MetricsCollection{
CollectionID: metricsCollectionDto.CollectionID,
Timestamp: metricsCollectionDto.Timestamp,
Metrics: pure_utils.Map(metricsCollectionDto.Metrics, AdaptMetricData),
Version: metricsCollectionDto.Version,
DeploymentID: metricsCollectionDto.DeploymentID,
LicenseKey: metricsCollectionDto.LicenseKey,
}
}