Skip to content

Commit 7fe2dbd

Browse files
committed
feat(metrics_collection): add configuration for metrics collection
1 parent 64a4925 commit 7fe2dbd

File tree

4 files changed

+35
-14
lines changed

4 files changed

+35
-14
lines changed

.env.example

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,5 +142,9 @@ SENTRY_DSN=
142142
# OFFLOADING_SAVE_POINTS=1000
143143
# OFFLOADING_WRITES_PER_SEC=200
144144

145+
# Configure metrics collection.
146+
# METRICS_COLLECTION_ENABLED=true
147+
# METRICS_COLLECTION_JOB_INTERVAL=1h
148+
145149
# Abort and exit if license cannot be validated.
146150
KILL_IF_READ_LICENSE_ERROR=false

cmd/worker.go

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,11 @@ func RunTaskQueue(apiVersion string) error {
100100

101101
offloadingConfig.ValidateAndFix(ctx)
102102

103+
metricCollectionConfig := infra.MetricCollectionConfig{
104+
Enabled: utils.GetEnv("METRICS_COLLECTION_ENABLED", true),
105+
JobInterval: utils.GetEnvDuration("METRICS_COLLECTION_JOB_INTERVAL", 1*time.Hour),
106+
}
107+
103108
infra.SetupSentry(workerConfig.sentryDsn, workerConfig.env, apiVersion)
104109
defer sentry.Flush(3 * time.Second)
105110

@@ -157,21 +162,25 @@ func RunTaskQueue(apiVersion string) error {
157162
return err
158163
}
159164

160-
// For non-org queues
165+
// For non-org
161166
nonOrgQueues := make(map[string]river.QueueConfig)
162-
maps.Copy(nonOrgQueues, usecases.QueueMetrics())
163-
queueWhitelist := slices.Collect(maps.Keys(nonOrgQueues))
167+
queueWhitelist := []string{}
168+
globalPeriodics := []*river.PeriodicJob{}
169+
170+
if metricCollectionConfig.Enabled {
171+
maps.Copy(nonOrgQueues, usecases.QueueMetrics())
172+
queueWhitelist = append(queueWhitelist, slices.Collect(maps.Keys(nonOrgQueues))...)
173+
globalPeriodics = append(globalPeriodics,
174+
scheduled_execution.NewMetricsCollectionPeriodicJob(metricCollectionConfig))
175+
}
164176

165177
// Add the metrics queue
166178
maps.Copy(queues, nonOrgQueues)
167179

168180
// Periodics always contain the per-org tasks retrieved above. Add other, non-organization-scoped periodics below
169181
periodics := append(
170182
orgPeriodics,
171-
[]*river.PeriodicJob{
172-
// Add periodic jobs here
173-
scheduled_execution.NewMetricsCollectionPeriodicJob(),
174-
}...,
183+
globalPeriodics...,
175184
)
176185

177186
riverClient, err = river.NewClient(riverpgxv5.New(pool), &river.Config{
@@ -216,7 +225,9 @@ func RunTaskQueue(apiVersion string) error {
216225
if offloadingConfig.Enabled {
217226
river.AddWorker(workers, adminUc.NewOffloadingWorker())
218227
}
219-
river.AddWorker(workers, uc.NewMetricsCollectionWorker())
228+
if metricCollectionConfig.Enabled {
229+
river.AddWorker(workers, uc.NewMetricsCollectionWorker())
230+
}
220231

221232
if err := riverClient.Start(ctx); err != nil {
222233
utils.LogAndReportSentryError(ctx, err)

infra/metrics_collection.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
package infra
2+
3+
import "time"
4+
5+
type MetricCollectionConfig struct {
6+
Enabled bool
7+
JobInterval time.Duration
8+
}

usecases/scheduled_execution/metrics_collection_job.go

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,25 +5,23 @@ import (
55
"encoding/json"
66
"time"
77

8+
"github.com/checkmarble/marble-backend/infra"
89
"github.com/checkmarble/marble-backend/models"
910
"github.com/checkmarble/marble-backend/repositories"
1011
"github.com/checkmarble/marble-backend/usecases/executor_factory"
1112
"github.com/checkmarble/marble-backend/utils"
1213
"github.com/riverqueue/river"
1314
)
1415

15-
// const METRIC_COLLECTION_WORKER_INTERVAL = 24 * time.Hour // Run daily
16-
const METRICS_COLLECTION_WORKER_INTERVAL = 10 * time.Second // Run every minute for testing
17-
18-
func NewMetricsCollectionPeriodicJob() *river.PeriodicJob {
16+
func NewMetricsCollectionPeriodicJob(config infra.MetricCollectionConfig) *river.PeriodicJob {
1917
return river.NewPeriodicJob(
20-
river.PeriodicInterval(METRICS_COLLECTION_WORKER_INTERVAL),
18+
river.PeriodicInterval(config.JobInterval),
2119
func() (river.JobArgs, *river.InsertOpts) {
2220
return models.MetricsCollectionArgs{}, &river.InsertOpts{
2321
Queue: "metrics",
2422
UniqueOpts: river.UniqueOpts{
2523
ByQueue: true,
26-
ByPeriod: METRICS_COLLECTION_WORKER_INTERVAL,
24+
ByPeriod: config.JobInterval,
2725
},
2826
}
2927
},

0 commit comments

Comments
 (0)