Skip to content

Commit 2c1f4f2

Browse files
committed
feat(metrics_ingestion): implement SendMetrics function to send metrics to BigQuery
1 parent 9d86b9a commit 2c1f4f2

File tree

6 files changed

+105
-36
lines changed

6 files changed

+105
-36
lines changed

models/metrics_collection.go

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package models
33
import (
44
"time"
55

6+
"cloud.google.com/go/bigquery"
67
"github.com/google/uuid"
78
)
89

@@ -34,6 +35,73 @@ type MetricsCollection struct {
3435
LicenseKey *string
3536
}
3637

38+
// Bigquery schema for metrics
39+
type MetricEventRow struct {
40+
StartTime time.Time `bigquery:"start_time"`
41+
EndTime time.Time `bigquery:"end_time"`
42+
DeploymentID uuid.UUID `bigquery:"deployment_id"`
43+
LicenseKey bigquery.NullString `bigquery:"license_key"`
44+
OrgID bigquery.NullString `bigquery:"org_id"`
45+
EventType string `bigquery:"event_type"`
46+
Counter bigquery.NullFloat64 `bigquery:"counter"`
47+
Gauge bigquery.NullFloat64 `bigquery:"gauge"`
48+
Text bigquery.NullString `bigquery:"text"`
49+
}
50+
51+
func AdaptMetricsCollection(metricsCollection MetricsCollection) []*MetricEventRow {
52+
metricEventRows := make([]*MetricEventRow, 0, len(metricsCollection.Metrics))
53+
54+
licenseKey := bigquery.NullString{}
55+
if metricsCollection.LicenseKey != nil {
56+
licenseKey.StringVal = *metricsCollection.LicenseKey
57+
licenseKey.Valid = true
58+
}
59+
60+
for _, metric := range metricsCollection.Metrics {
61+
orgID := bigquery.NullString{}
62+
if metric.OrganizationID != nil {
63+
orgID.StringVal = *metric.OrganizationID
64+
orgID.Valid = true
65+
}
66+
67+
counter := bigquery.NullFloat64{}
68+
if metric.Numeric != nil {
69+
counter.Float64 = *metric.Numeric
70+
counter.Valid = true
71+
}
72+
73+
text := bigquery.NullString{}
74+
if metric.Text != nil {
75+
text.StringVal = *metric.Text
76+
text.Valid = true
77+
}
78+
79+
startTime := metricsCollection.Timestamp
80+
if metric.From != nil {
81+
startTime = *metric.From
82+
}
83+
84+
endTime := metricsCollection.Timestamp
85+
if metric.To != nil {
86+
endTime = *metric.To
87+
}
88+
89+
metricEventRows = append(metricEventRows, &MetricEventRow{
90+
StartTime: startTime,
91+
EndTime: endTime,
92+
DeploymentID: metricsCollection.DeploymentID,
93+
LicenseKey: licenseKey,
94+
OrgID: orgID,
95+
EventType: metric.Name,
96+
Counter: counter,
97+
Gauge: bigquery.NullFloat64{},
98+
Text: text,
99+
})
100+
}
101+
102+
return metricEventRows
103+
}
104+
37105
func NewGlobalMetric(name string, numeric *float64, text *string, from, to *time.Time, frequency MetricCollectionFrequency) MetricData {
38106
return MetricData{
39107
Name: name,

repositories/metrics_ingestion_repository.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,3 +47,25 @@ func (repo MetricsIngestionRepository) TestConnection(ctx context.Context) error
4747
logger.InfoContext(ctx, "BigQuery connection test successful", "result", row[0])
4848
return nil
4949
}
50+
51+
func (repo MetricsIngestionRepository) SendMetrics(ctx context.Context, metrics models.MetricsCollection) error {
52+
logger := utils.LoggerFromContext(ctx)
53+
54+
logger.DebugContext(ctx, "Sending metrics to BigQuery",
55+
"collection_id", metrics.CollectionID,
56+
"metrics_count", len(metrics.Metrics),
57+
)
58+
59+
table := repo.bqClient.Client.Dataset("metrics").Table("events")
60+
inserter := table.Inserter()
61+
62+
metricEventRows := models.AdaptMetricsCollection(metrics)
63+
64+
err := inserter.Put(ctx, metricEventRows)
65+
if err != nil {
66+
logger.ErrorContext(ctx, "Failed to send metrics to BigQuery", "error", err)
67+
return fmt.Errorf("failed to send metrics to BigQuery: %w", err)
68+
}
69+
70+
return nil
71+
}

usecases/health_usecase.go

Lines changed: 2 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,8 @@ type HealthUsecase struct {
2020
executorFactory executor_factory.ExecutorFactory
2121
healthRepository healthRepository
2222

23-
openSanctionsRepository OpenSanctionsHealthRepository
24-
hasOpensanctionsSetup bool
25-
metricsIngestionRepository MetricsIngestionRepository
23+
openSanctionsRepository OpenSanctionsHealthRepository
24+
hasOpensanctionsSetup bool
2625
}
2726

2827
func (u *HealthUsecase) GetHealthStatus(ctx context.Context) models.HealthStatus {
@@ -43,15 +42,4 @@ func (u *HealthUsecase) GetHealthStatus(ctx context.Context) models.HealthStatus
4342
Status: ok && err == nil,
4443
})
4544
}
46-
47-
// Check BigQuery health
48-
err = u.metricsIngestionRepository.TestConnection(ctx)
49-
statuses = append(statuses, models.HealthItemStatus{
50-
Name: models.BigQueryHealthItemName,
51-
Status: err == nil,
52-
})
53-
54-
return models.HealthStatus{
55-
Statuses: statuses,
56-
}
5745
}

usecases/metrics_ingestion.go

Lines changed: 7 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,14 @@ package usecases
22

33
import (
44
"context"
5+
"fmt"
56

67
"github.com/checkmarble/marble-backend/models"
78
"github.com/checkmarble/marble-backend/utils"
89
)
910

1011
type MetricsIngestionRepository interface {
11-
TestConnection(ctx context.Context) error
12+
SendMetrics(ctx context.Context, collection models.MetricsCollection) error
1213
}
1314

1415
type MetricsIngestionUsecase struct {
@@ -21,24 +22,14 @@ func NewMetricsIngestionUsecase(repository MetricsIngestionRepository) MetricsIn
2122
}
2223
}
2324

24-
func (u *MetricsIngestionUsecase) IngestMetrics(ctx context.Context, metrics models.MetricsCollection) error {
25-
// Ingest the collection
26-
err := u.ingestCollection(ctx, metrics)
27-
if err != nil {
28-
return err
29-
}
30-
31-
return nil
32-
}
33-
34-
func (u *MetricsIngestionUsecase) ingestCollection(ctx context.Context, collection models.MetricsCollection) error {
25+
func (u *MetricsIngestionUsecase) IngestMetrics(ctx context.Context, collection models.MetricsCollection) error {
3526
logger := utils.LoggerFromContext(ctx)
36-
logger.DebugContext(ctx, "Ingesting collection", "collection", collection)
27+
logger.DebugContext(ctx, "Sending metrics to BigQuery", "collection", collection)
3728

38-
// TODO: Implement the ingestion logic
39-
err := u.repository.TestConnection(ctx)
29+
err := u.repository.SendMetrics(ctx, collection)
4030
if err != nil {
41-
return err
31+
logger.ErrorContext(ctx, "Failed to send metrics to BigQuery", "error", err)
32+
return fmt.Errorf("failed to send metrics to BigQuery: %w", err)
4233
}
4334

4435
return nil

usecases/scheduled_execution/metrics_collection_job.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,8 @@ func NewMetricsCollectionPeriodicJob(config infra.MetricCollectionConfig) *river
3030
},
3131
}
3232
},
33-
&river.PeriodicJobOpts{RunOnStart: false},
33+
// TODO: RunOnstart should be false for production
34+
&river.PeriodicJobOpts{RunOnStart: true},
3435
)
3536
}
3637

usecases/usecases.go

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -207,11 +207,10 @@ func (usecases *Usecases) NewLivenessUsecase() LivenessUsecase {
207207

208208
func (usecases *Usecases) NewHealthUsecase() HealthUsecase {
209209
return HealthUsecase{
210-
executorFactory: usecases.NewExecutorFactory(),
211-
healthRepository: &usecases.Repositories.MarbleDbRepository,
212-
hasOpensanctionsSetup: usecases.hasOpensanctionsSetup,
213-
openSanctionsRepository: &usecases.Repositories.OpenSanctionsRepository,
214-
metricsIngestionRepository: usecases.Repositories.MetricsIngestionRepository,
210+
executorFactory: usecases.NewExecutorFactory(),
211+
healthRepository: &usecases.Repositories.MarbleDbRepository,
212+
hasOpensanctionsSetup: usecases.hasOpensanctionsSetup,
213+
openSanctionsRepository: &usecases.Repositories.OpenSanctionsRepository,
215214
}
216215
}
217216

0 commit comments

Comments
 (0)