Skip to content

Commit c2b81c4

Browse files
committed
refactor(metrics): rename MetricsPayload to MetricsCollection and enhance metrics collection structure
1 parent d35d517 commit c2b81c4

File tree

4 files changed

+107
-108
lines changed

4 files changed

+107
-108
lines changed

models/metrics_collection.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ type MetricData struct {
2424
Frequency MetricCollectionFrequency
2525
}
2626

27-
type MetricsPayload struct {
27+
type MetricsCollection struct {
2828
CollectionID uuid.UUID // Unique ID for this collection run, could be use as idempotency key
2929
Timestamp time.Time
3030
Metrics []MetricData

usecases/metrics_collection/collector.go

Lines changed: 92 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,20 @@ package metrics_collection
22

33
import (
44
"context"
5+
"slices"
56
"time"
67

78
"github.com/checkmarble/marble-backend/models"
9+
"github.com/checkmarble/marble-backend/repositories"
10+
"github.com/checkmarble/marble-backend/usecases/executor_factory"
11+
"github.com/checkmarble/marble-backend/utils"
12+
"github.com/google/uuid"
813
)
914

15+
type OrganizationRepository interface {
16+
AllOrganizations(ctx context.Context, exec repositories.Executor) ([]models.Organization, error)
17+
}
18+
1019
// GlobalCollector is a collector that is not specific to an organization.
1120
// It is used to collect metrics that are not specific to an organization.
1221
// For example, the app version, the number of users
@@ -23,23 +32,97 @@ type Collectors struct {
2332
version string
2433
globalCollectors []GlobalCollector
2534
collectors []Collector
35+
36+
organizationRepository OrganizationRepository
37+
executorFactory executor_factory.ExecutorFactory
2638
}
2739

28-
func (c Collectors) GetGlobalCollectors() []GlobalCollector {
29-
return c.globalCollectors
40+
func (c Collectors) CollectMetrics(ctx context.Context, from time.Time, to time.Time) (models.MetricsCollection, error) {
41+
metrics := make([]models.MetricData, 0)
42+
43+
// Collect global metrics
44+
globalMetrics, err := c.collectGlobalMetrics(ctx, from, to)
45+
if err != nil {
46+
return models.MetricsCollection{}, err
47+
}
48+
metrics = slices.Concat(metrics, globalMetrics)
49+
50+
// Collect organization-specific metrics
51+
orgMetrics, err := c.collectOrganizationMetrics(ctx, from, to)
52+
if err != nil {
53+
return models.MetricsCollection{}, err
54+
}
55+
metrics = slices.Concat(metrics, orgMetrics)
56+
57+
payload := models.MetricsCollection{
58+
CollectionID: uuid.New(),
59+
Timestamp: time.Now(),
60+
Metrics: metrics,
61+
Version: c.version,
62+
}
63+
64+
return payload, nil
3065
}
3166

32-
func (c Collectors) GetCollectors() []Collector {
33-
return c.collectors
67+
// Collects global metrics from all collectors
68+
// If a collector fails, it will log a warning and continue to the next collector (don't fail the whole function)
69+
func (c Collectors) collectGlobalMetrics(ctx context.Context, from time.Time, to time.Time) ([]models.MetricData, error) {
70+
metrics := make([]models.MetricData, 0)
71+
logger := utils.LoggerFromContext(ctx)
72+
73+
for _, collector := range c.globalCollectors {
74+
value, err := collector.Collect(ctx, from, to)
75+
if err != nil {
76+
logger.WarnContext(ctx, "Failed to collect global metrics", "error", err)
77+
continue
78+
}
79+
metrics = slices.Concat(metrics, value)
80+
}
81+
82+
return metrics, nil
83+
}
84+
85+
// Collects organization metrics from all collectors, fetching all organizations from the database first
86+
// If a collector fails, it will log a warning and continue to the next collector (don't fail the whole function)
87+
func (c Collectors) collectOrganizationMetrics(ctx context.Context, from time.Time, to time.Time) ([]models.MetricData, error) {
88+
metrics := make([]models.MetricData, 0)
89+
logger := utils.LoggerFromContext(ctx)
90+
91+
orgs, err := c.getListOfOrganizations(ctx)
92+
if err != nil {
93+
return []models.MetricData{}, err
94+
}
95+
96+
for _, org := range orgs {
97+
for _, collector := range c.collectors {
98+
value, err := collector.Collect(ctx, org.Id, from, to)
99+
if err != nil {
100+
logger.WarnContext(ctx, "Failed to collect organization metrics", "error", err)
101+
continue
102+
}
103+
metrics = slices.Concat(metrics, value)
104+
}
105+
}
106+
107+
return metrics, nil
34108
}
35109

36-
func (c Collectors) GetVersion() string {
37-
return c.version
110+
// Fetches all organizations from the database
111+
// NOTE: Add caching to avoid fetching the same organizations every time (but how can we invalidate the cache?)
112+
func (c Collectors) getListOfOrganizations(ctx context.Context) ([]models.Organization, error) {
113+
orgs, err := c.organizationRepository.AllOrganizations(ctx, c.executorFactory.NewExecutor())
114+
if err != nil {
115+
return []models.Organization{}, err
116+
}
117+
return orgs, nil
38118
}
39119

40120
// Use version to track the version of the collectors, could be used to track changes
41121
// and tell the server which collectors is used by the client
42-
func NewCollectorsV1() Collectors {
122+
func NewCollectorsV1(
123+
executorFactory executor_factory.ExecutorFactory,
124+
organizationRepository OrganizationRepository,
125+
) Collectors {
43126
return Collectors{
44127
version: "v1",
45128
collectors: []Collector{
@@ -48,5 +131,7 @@ func NewCollectorsV1() Collectors {
48131
globalCollectors: []GlobalCollector{
49132
NewStubGlobalCollector(),
50133
},
134+
executorFactory: executorFactory,
135+
organizationRepository: organizationRepository,
51136
}
52137
}

usecases/scheduled_execution/metrics_collection_job.go

Lines changed: 10 additions & 97 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,10 @@ package scheduled_execution
22

33
import (
44
"context"
5-
"slices"
65
"time"
76

87
"github.com/checkmarble/marble-backend/models"
9-
"github.com/checkmarble/marble-backend/repositories"
10-
"github.com/checkmarble/marble-backend/usecases/executor_factory"
11-
"github.com/checkmarble/marble-backend/usecases/metrics_collection"
128
"github.com/checkmarble/marble-backend/utils"
13-
"github.com/google/uuid"
149
"github.com/riverqueue/river"
1510
)
1611

@@ -33,23 +28,21 @@ func NewMetricsCollectionPeriodicJob() *river.PeriodicJob {
3328
)
3429
}
3530

31+
type MetricsCollectionUsecase interface {
32+
CollectMetrics(ctx context.Context, from time.Time, to time.Time) (models.MetricsCollection, error)
33+
}
34+
3635
type MetricCollectionWorker struct {
3736
river.WorkerDefaults[models.MetricsCollectionArgs]
3837

39-
executorFactory executor_factory.ExecutorFactory
40-
organizationRepository repositories.OrganizationRepository
41-
collectors metrics_collection.Collectors
38+
collectors MetricsCollectionUsecase
4239
}
4340

4441
func NewMetricCollectionWorker(
45-
executorFactory executor_factory.ExecutorFactory,
46-
organizationRepository repositories.OrganizationRepository,
47-
collectors metrics_collection.Collectors,
42+
collectors MetricsCollectionUsecase,
4843
) MetricCollectionWorker {
4944
return MetricCollectionWorker{
50-
executorFactory: executorFactory,
51-
organizationRepository: organizationRepository,
52-
collectors: collectors,
45+
collectors: collectors,
5346
}
5447
}
5548

@@ -66,100 +59,20 @@ func (w MetricCollectionWorker) Work(ctx context.Context, job *river.Job[models.
6659
from := time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC)
6760

6861
// Create the metric collection usecase
69-
metrics, err := w.collectMetrics(ctx, from, now)
62+
metricsCollection, err := w.collectors.CollectMetrics(ctx, from, now)
7063
if err != nil {
7164
logger.ErrorContext(ctx, "Failed to collect metrics", "error", err)
7265
return err
7366
}
7467

75-
logger.DebugContext(ctx, "Collected metrics", "metrics", metrics)
68+
logger.DebugContext(ctx, "Collected metrics", "metrics", metricsCollection)
7669

7770
// TODO: Update the watermarks with now value
7871
logger.DebugContext(ctx, "Updating watermarks", "new_timestamp", now)
7972

8073
// TODO: Store or send the metrics somewhere
8174
// For now, just log the number of metrics collected
82-
logger.DebugContext(ctx, "Metric collection completed", "metrics_count", len(metrics.Metrics))
75+
logger.DebugContext(ctx, "Metric collection completed", "metrics_count", len(metricsCollection.Metrics))
8376

8477
return nil
8578
}
86-
87-
func (w *MetricCollectionWorker) collectMetrics(ctx context.Context, from time.Time, to time.Time) (models.MetricsPayload, error) {
88-
metrics := make([]models.MetricData, 0)
89-
90-
// Collect global metrics
91-
globalMetrics, err := w.collectGlobalMetrics(ctx, from, to)
92-
if err != nil {
93-
return models.MetricsPayload{}, err
94-
}
95-
metrics = slices.Concat(metrics, globalMetrics)
96-
97-
// Collect organization-specific metrics
98-
orgMetrics, err := w.collectOrganizationMetrics(ctx, from, to)
99-
if err != nil {
100-
return models.MetricsPayload{}, err
101-
}
102-
metrics = slices.Concat(metrics, orgMetrics)
103-
104-
payload := models.MetricsPayload{
105-
CollectionID: uuid.New(),
106-
Timestamp: time.Now(),
107-
Metrics: metrics,
108-
Version: w.collectors.GetVersion(),
109-
}
110-
111-
return payload, nil
112-
}
113-
114-
// Collects global metrics from all collectors
115-
// If a collector fails, it will log a warning and continue to the next collector (don't fail the whole function)
116-
func (w *MetricCollectionWorker) collectGlobalMetrics(ctx context.Context, from time.Time, to time.Time) ([]models.MetricData, error) {
117-
metrics := make([]models.MetricData, 0)
118-
logger := utils.LoggerFromContext(ctx)
119-
120-
for _, collector := range w.collectors.GetGlobalCollectors() {
121-
value, err := collector.Collect(ctx, from, to)
122-
if err != nil {
123-
logger.WarnContext(ctx, "Failed to collect global metrics", "error", err)
124-
continue
125-
}
126-
metrics = slices.Concat(metrics, value)
127-
}
128-
129-
return metrics, nil
130-
}
131-
132-
// Collects organization metrics from all collectors, fetching all organizations from the database first
133-
// If a collector fails, it will log a warning and continue to the next collector (don't fail the whole function)
134-
func (w *MetricCollectionWorker) collectOrganizationMetrics(ctx context.Context, from time.Time, to time.Time) ([]models.MetricData, error) {
135-
metrics := make([]models.MetricData, 0)
136-
logger := utils.LoggerFromContext(ctx)
137-
138-
orgs, err := w.getListOfOrganizations(ctx)
139-
if err != nil {
140-
return []models.MetricData{}, err
141-
}
142-
143-
for _, org := range orgs {
144-
for _, collector := range w.collectors.GetCollectors() {
145-
value, err := collector.Collect(ctx, org.Id, from, to)
146-
if err != nil {
147-
logger.WarnContext(ctx, "Failed to collect organization metrics", "error", err)
148-
continue
149-
}
150-
metrics = slices.Concat(metrics, value)
151-
}
152-
}
153-
154-
return metrics, nil
155-
}
156-
157-
// Fetches all organizations from the database
158-
// NOTE: Add caching to avoid fetching the same organizations every time (but how can we invalidate the cache?)
159-
func (w *MetricCollectionWorker) getListOfOrganizations(ctx context.Context) ([]models.Organization, error) {
160-
orgs, err := w.organizationRepository.AllOrganizations(ctx, w.executorFactory.NewExecutor())
161-
if err != nil {
162-
return []models.Organization{}, err
163-
}
164-
return orgs, nil
165-
}

usecases/usecases.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -346,8 +346,9 @@ func (usecases *Usecases) NewTaskQueueWorker(riverClient *river.Client[pgx.Tx])
346346

347347
func (usecases *Usecases) NewMetricsCollectionWorker() scheduled_execution.MetricCollectionWorker {
348348
return scheduled_execution.NewMetricCollectionWorker(
349-
usecases.NewExecutorFactory(),
350-
&usecases.Repositories.MarbleDbRepository,
351-
metrics_collection.NewCollectorsV1(),
349+
metrics_collection.NewCollectorsV1(
350+
usecases.NewExecutorFactory(),
351+
&usecases.Repositories.MarbleDbRepository,
352+
),
352353
)
353354
}

0 commit comments

Comments
 (0)