Skip to content

Commit 0650834

Browse files
committed
feat(task_queue): implement queue whitelist for non-org metrics and update task queue worker initialization
1 parent 2b1c4c4 commit 0650834

File tree

4 files changed

+22
-13
lines changed

4 files changed

+22
-13
lines changed

cmd/worker.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"net/http"
88
"os"
99
"os/signal"
10+
"slices"
1011
"syscall"
1112
"time"
1213

@@ -156,8 +157,13 @@ func RunTaskQueue(apiVersion string) error {
156157
return err
157158
}
158159

159-
// Add the global queue
160-
maps.Copy(queues, usecases.QueuesGlobal())
160+
// For non-org queues
161+
nonOrgQueues := make(map[string]river.QueueConfig)
162+
maps.Copy(nonOrgQueues, usecases.QueueMetrics())
163+
queueWhitelist := slices.Collect(maps.Keys(nonOrgQueues))
164+
165+
// Add the metrics queue
166+
maps.Copy(queues, nonOrgQueues)
161167

162168
// Periodics always contain the per-org tasks retrieved above. Add other, non-organization-scoped periodics below
163169
periodics := append(
@@ -233,7 +239,7 @@ func RunTaskQueue(apiVersion string) error {
233239
logger.InfoContext(ctx, "starting worker", slog.String("version", apiVersion))
234240

235241
// Asynchronously keep the task queue workers up to date with the orgs in the database
236-
taskQueueWorker := uc.NewTaskQueueWorker(riverClient)
242+
taskQueueWorker := uc.NewTaskQueueWorker(riverClient, queueWhitelist)
237243
go taskQueueWorker.RefreshQueuesFromOrgIds(ctx)
238244

239245
// Start the cron jobs using the old entrypoint.

usecases/scheduled_execution/metrics_collection_job.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ func NewMetricsCollectionPeriodicJob() *river.PeriodicJob {
1717
river.PeriodicInterval(METRICS_COLLECTION_WORKER_INTERVAL),
1818
func() (river.JobArgs, *river.InsertOpts) {
1919
return models.MetricsCollectionArgs{}, &river.InsertOpts{
20-
Queue: "global",
20+
Queue: "metrics",
2121
UniqueOpts: river.UniqueOpts{
2222
ByQueue: true,
2323
ByPeriod: METRICS_COLLECTION_WORKER_INTERVAL,

usecases/task_queue.go

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package usecases
33
import (
44
"context"
55
"fmt"
6+
"slices"
67
"sync"
78
"time"
89

@@ -19,26 +20,27 @@ import (
1920
)
2021

2122
const (
22-
numberWorkersPerQueue = 5
23-
globalQueueName = "global"
24-
numberWorkersForGlobalQueue = 1
23+
numberWorkersPerQueue = 5
2524
)
2625

2726
type TaskQueueWorker struct {
2827
executorFactory executor_factory.ExecutorFactory
2928
orgRepository repositories.OrganizationRepository
29+
queueWhitelist []string
3030
riverClient *river.Client[pgx.Tx]
3131
mu *sync.Mutex
3232
}
3333

3434
func NewTaskQueueWorker(
3535
executorFactory executor_factory.ExecutorFactory,
3636
orgRepository repositories.OrganizationRepository,
37+
queueWhitelist []string,
3738
riverClient *river.Client[pgx.Tx],
3839
) *TaskQueueWorker {
3940
return &TaskQueueWorker{
4041
executorFactory: executorFactory,
4142
orgRepository: orgRepository,
43+
queueWhitelist: queueWhitelist,
4244
riverClient: riverClient,
4345
mu: &sync.Mutex{},
4446
}
@@ -138,8 +140,8 @@ func (w *TaskQueueWorker) removeQueuesFromMissingOrgs(ctx context.Context,
138140
continue
139141
}
140142

141-
// Ignore global queue
142-
if q.Name == globalQueueName {
143+
// Ignore whitelisted queues
144+
if slices.Contains(w.queueWhitelist, q.Name) {
143145
continue
144146
}
145147

@@ -186,10 +188,10 @@ func QueuesFromOrgs(ctx context.Context, orgsRepo repositories.OrganizationRepos
186188
return queues, periodics, nil
187189
}
188190

189-
func QueuesGlobal() map[string]river.QueueConfig {
191+
func QueueMetrics() map[string]river.QueueConfig {
190192
queues := make(map[string]river.QueueConfig, 1)
191-
queues[globalQueueName] = river.QueueConfig{
192-
MaxWorkers: numberWorkersForGlobalQueue,
193+
queues["metrics"] = river.QueueConfig{
194+
MaxWorkers: 1,
193195
}
194196
return queues
195197
}

usecases/usecases.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -336,10 +336,11 @@ func (usecases *Usecases) NewLicenseUsecase() PublicLicenseUseCase {
336336
)
337337
}
338338

339-
func (usecases *Usecases) NewTaskQueueWorker(riverClient *river.Client[pgx.Tx]) *TaskQueueWorker {
339+
func (usecases *Usecases) NewTaskQueueWorker(riverClient *river.Client[pgx.Tx], queueWhitelist []string) *TaskQueueWorker {
340340
return NewTaskQueueWorker(
341341
usecases.NewExecutorFactory(),
342342
&usecases.Repositories.MarbleDbRepository,
343+
queueWhitelist,
343344
riverClient,
344345
)
345346
}

0 commit comments

Comments
 (0)