Skip to content

Commit d4eb4d5

Browse files
committed
Add maximum bound for attempted_by array
As described by #972, it may be possible for huge numbers of snoozes to blow out a job row's `attempted_by` array as a job is locked over and over again. Multiplied by many jobs, this can produce vast quantities of data that gets sent over the network. Here, put in a ratchet on `attempted_by` so that if the array becomes larger than 100 elements, we knock the oldest one off in favor of the most recent client and the most fresh 99. Unfortunately the implementation isn't particularly clean in either Postgres or SQLite. In Postgres it would've been cleaner if we'd had the `attempted_by` in reverse order so the new client was on front because the built-in array functions would be friendlier to that layout, but because it's not, we have to do something a little hackier involving a `CASE` statement instead. SQLite is even worse. SQLite has no array functions at all, which doesn't help, but moreover every strategy I tried ended up blocked by a sqlc SQLite bug, so after trying everything I could think of, I ended up having to extract the piece that does the array truncation into a SQL template string to get this over the line. This could be removed in the future if any one of a number of outstanding sqlc bugs are fixed (e.g. [1]). [1] sqlc-dev/sqlc#3610
1 parent 24a8c2f commit d4eb4d5

File tree

18 files changed

+272
-92
lines changed

18 files changed

+272
-92
lines changed

.golangci.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,9 @@ linters:
4444
- desc: Don't use `riverinternaltest` package outside of test environments.
4545
pkg: github.com/riverqueue/river/internal/riverinternaltest
4646

47+
exhaustive:
48+
default-signifies-exhaustive: true
49+
4750
forbidigo:
4851
forbid:
4952
- msg: Use `require` variants instead.

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
2323
- The reindexer maintenance service now reindexes all `river_job` indexes, including its primary key. This is expected to help in situations where the jobs table has in the past expanded to a very large size (which makes most indexes larger), is now a much more modest size, but has left the indexes in their expanded state. [PR #963](https://github.com/riverqueue/river/pull/963).
2424
- The River CLI now accepts a `--target-version` of 0 with `river migrate-down` to run all down migrations and remove all River tables (previously, -1 was used for this; -1 still works, but now 0 also works). [PR #966](https://github.com/riverqueue/river/pull/966).
2525
- **Breaking change:** The `HookWorkEnd` interface's `WorkEnd` function now receives a `JobRow` parameter in addition to the `error` it received before. Having a `JobRow` to work with is fairly crucial to most functionality that a hook would implement, and its previous omission was entirely an error. [PR #970](https://github.com/riverqueue/river/pull/970).
26+
- Add maximum bound to each job's `attempted_by` array so that in degenerate cases where a job is locked many, many times (say it's snoozed hundreds of times), it doesn't grow to unlimited bounds. [PR #974](https://github.com/riverqueue/river/pull/974).
2627

2728
### Fixed
2829

internal/jobexecutor/job_executor_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -159,9 +159,9 @@ func TestJobExecutor_Execute(t *testing.T) {
159159

160160
// Fetch the job to make sure it's marked as running:
161161
jobs, err := exec.JobGetAvailable(ctx, &riverdriver.JobGetAvailableParams{
162-
Max: 1,
163-
Now: ptrutil.Ptr(now),
164-
Queue: rivercommon.QueueDefault,
162+
MaxToLock: 1,
163+
Now: ptrutil.Ptr(now),
164+
Queue: rivercommon.QueueDefault,
165165
})
166166
require.NoError(t, err)
167167

internal/maintenance/job_scheduler_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -329,9 +329,10 @@ func TestJobScheduler(t *testing.T) {
329329
addJob := func(queue string, fromNow time.Duration, state rivertype.JobState) {
330330
t.Helper()
331331
var finalizedAt *time.Time
332-
switch state { //nolint:exhaustive
332+
switch state {
333333
case rivertype.JobStateCompleted, rivertype.JobStateCancelled, rivertype.JobStateDiscarded:
334334
finalizedAt = ptrutil.Ptr(now.Add(fromNow))
335+
default:
335336
}
336337
testfactory.Job(ctx, t, exec, &testfactory.JobOpts{
337338
FinalizedAt: finalizedAt,

job_list_params.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -234,7 +234,6 @@ func (p *JobListParams) toDBParams() (*dblist.JobListParams, error) {
234234
if p.sortField == JobListOrderByFinalizedAt {
235235
currentNonFinalizedStates := make([]rivertype.JobState, 0, len(p.states))
236236
for _, state := range p.states {
237-
//nolint:exhaustive
238237
switch state {
239238
case rivertype.JobStateCancelled, rivertype.JobStateCompleted, rivertype.JobStateDiscarded:
240239
default:

producer.go

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -742,13 +742,18 @@ func (p *producer) dispatchWork(workCtx context.Context, count int, fetchResultC
742742
// back to the queue.
743743
ctx := context.WithoutCancel(workCtx)
744744

745+
// Maximum size of the `attempted_by` array on each job row. This maximum is
746+
// rarely hit, but exists to protect against degenerate cases.
747+
const maxAttemptedBy = 100
748+
745749
jobs, err := p.pilot.JobGetAvailable(ctx, p.exec, p.state, &riverdriver.JobGetAvailableParams{
746-
ClientID: p.config.ClientID,
747-
Max: count,
748-
Now: p.Time.NowUTCOrNil(),
749-
Queue: p.config.Queue,
750-
ProducerID: p.id.Load(),
751-
Schema: p.config.Schema,
750+
ClientID: p.config.ClientID,
751+
MaxAttemptedBy: maxAttemptedBy,
752+
MaxToLock: count,
753+
Now: p.Time.NowUTCOrNil(),
754+
Queue: p.config.Queue,
755+
ProducerID: p.id.Load(),
756+
Schema: p.config.Schema,
752757
})
753758
if err != nil {
754759
p.Logger.Error(p.Name+": Error fetching jobs", slog.String("err", err.Error()), slog.String("queue", p.config.Queue))

riverdriver/river_driver_interface.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -365,12 +365,13 @@ type JobDeleteManyParams struct {
365365
}
366366

367367
type JobGetAvailableParams struct {
368-
ClientID string
369-
Max int
370-
Now *time.Time
371-
ProducerID int64
372-
Queue string
373-
Schema string
368+
ClientID string
369+
MaxAttemptedBy int
370+
MaxToLock int
371+
Now *time.Time
372+
ProducerID int64
373+
Queue string
374+
Schema string
374375
}
375376

376377
type JobGetByIDParams struct {

riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go

Lines changed: 21 additions & 8 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

riverdriver/riverdatabasesql/river_database_sql_driver.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -241,10 +241,11 @@ func (e *Executor) JobDeleteMany(ctx context.Context, params *riverdriver.JobDel
241241

242242
func (e *Executor) JobGetAvailable(ctx context.Context, params *riverdriver.JobGetAvailableParams) ([]*rivertype.JobRow, error) {
243243
jobs, err := dbsqlc.New().JobGetAvailable(schemaTemplateParam(ctx, params.Schema), e.dbtx, &dbsqlc.JobGetAvailableParams{
244-
AttemptedBy: params.ClientID,
245-
Max: int32(min(params.Max, math.MaxInt32)), //nolint:gosec
246-
Now: params.Now,
247-
Queue: params.Queue,
244+
AttemptedBy: params.ClientID,
245+
MaxAttemptedBy: int32(min(params.MaxAttemptedBy, math.MaxInt32)), //nolint:gosec
246+
MaxToLock: int32(min(params.MaxToLock, math.MaxInt32)), //nolint:gosec
247+
Now: params.Now,
248+
Queue: params.Queue,
248249
})
249250
if err != nil {
250251
return nil, interpretError(err)

riverdriver/riverdrivertest/driver_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -399,9 +399,9 @@ func BenchmarkDriverRiverPgxV5_Executor(b *testing.B) {
399399

400400
for range b.N {
401401
if _, err := exec.JobGetAvailable(ctx, &riverdriver.JobGetAvailableParams{
402-
ClientID: clientID,
403-
Max: 100,
404-
Queue: river.QueueDefault,
402+
ClientID: clientID,
403+
MaxToLock: 100,
404+
Queue: river.QueueDefault,
405405
}); err != nil {
406406
b.Fatal(err)
407407
}
@@ -425,9 +425,9 @@ func BenchmarkDriverRiverPgxV5_Executor(b *testing.B) {
425425
b.RunParallel(func(pb *testing.PB) {
426426
for pb.Next() {
427427
if _, err := exec.JobGetAvailable(ctx, &riverdriver.JobGetAvailableParams{
428-
ClientID: clientID,
429-
Max: 100,
430-
Queue: river.QueueDefault,
428+
ClientID: clientID,
429+
MaxToLock: 100,
430+
Queue: river.QueueDefault,
431431
}); err != nil {
432432
b.Fatal(err)
433433
}

0 commit comments

Comments
 (0)