Skip to content

Commit 55a1f73

Browse files
committed
Add maximum bound for attempted_by array
As described by #972, it may be possible for huge numbers of snoozes to blow out a job row's `attempted_by` array as a job is locked over and over again. Multiplied by many jobs, this can produce vast quantities of data that gets sent over the network. Here, put in a ratchet on `attempted_by` so that if the array becomes larger than 100 elements, we knock the oldest one off in favor of the most recent client and the 99 freshest. Unfortunately the implementation isn't particularly clean in either Postgres or SQLite. In Postgres it would've been cleaner if we'd had the `attempted_by` in reverse order so the new client was in front because the built-in array functions would be friendlier to that layout, but because it's not, we have to do something a little hackier involving a `CASE` statement instead. SQLite is even worse. SQLite has no array functions at all, which doesn't help, but moreover every strategy I tried ended up blocked by a sqlc SQLite bug, so after trying everything I could think of, I ended up having to extract the piece that does the array truncation into a SQL template string to get this over the line. This could be removed in the future if any one of a number of outstanding sqlc bugs is fixed (e.g. [1]). [1] sqlc-dev/sqlc#3610
1 parent 24a8c2f commit 55a1f73

File tree

15 files changed

+269
-90
lines changed

15 files changed

+269
-90
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
2323
- The reindexer maintenance service now reindexes all `river_job` indexes, including its primary key. This is expected to help in situations where the jobs table has in the past expanded to a very large size (which makes most indexes larger), is now a much more modest size, but has left the indexes in their expanded state. [PR #963](https://github.com/riverqueue/river/pull/963).
2424
- The River CLI now accepts a `--target-version` of 0 with `river migrate-down` to run all down migrations and remove all River tables (previously, -1 was used for this; -1 still works, but now 0 also works). [PR #966](https://github.com/riverqueue/river/pull/966).
2525
- **Breaking change:** The `HookWorkEnd` interface's `WorkEnd` function now receives a `JobRow` parameter in addition to the `error` it received before. Having a `JobRow` to work with is fairly crucial to most functionality that a hook would implement, and its previous omission was entirely an error. [PR #970](https://github.com/riverqueue/river/pull/970).
26+
- Add maximum bound to each job's `attempted_by` array so that in degenerate cases where a job is locked many, many times (say it's snoozed hundreds of times), it doesn't grow to unlimited bounds. [PR #974](https://github.com/riverqueue/river/pull/974).
2627

2728
### Fixed
2829

internal/jobexecutor/job_executor_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -159,9 +159,9 @@ func TestJobExecutor_Execute(t *testing.T) {
159159

160160
// Fetch the job to make sure it's marked as running:
161161
jobs, err := exec.JobGetAvailable(ctx, &riverdriver.JobGetAvailableParams{
162-
Max: 1,
163-
Now: ptrutil.Ptr(now),
164-
Queue: rivercommon.QueueDefault,
162+
MaxToLock: 1,
163+
Now: ptrutil.Ptr(now),
164+
Queue: rivercommon.QueueDefault,
165165
})
166166
require.NoError(t, err)
167167

producer.go

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -742,13 +742,18 @@ func (p *producer) dispatchWork(workCtx context.Context, count int, fetchResultC
742742
// back to the queue.
743743
ctx := context.WithoutCancel(workCtx)
744744

745+
// Maximum size of the `attempted_by` array on each job row. This maximum is
746+
// rarely hit, but exists to protect against degenerate cases.
747+
const maxAttemptedBy = 100
748+
745749
jobs, err := p.pilot.JobGetAvailable(ctx, p.exec, p.state, &riverdriver.JobGetAvailableParams{
746-
ClientID: p.config.ClientID,
747-
Max: count,
748-
Now: p.Time.NowUTCOrNil(),
749-
Queue: p.config.Queue,
750-
ProducerID: p.id.Load(),
751-
Schema: p.config.Schema,
750+
ClientID: p.config.ClientID,
751+
MaxAttemptedBy: maxAttemptedBy,
752+
MaxToLock: count,
753+
Now: p.Time.NowUTCOrNil(),
754+
Queue: p.config.Queue,
755+
ProducerID: p.id.Load(),
756+
Schema: p.config.Schema,
752757
})
753758
if err != nil {
754759
p.Logger.Error(p.Name+": Error fetching jobs", slog.String("err", err.Error()), slog.String("queue", p.config.Queue))

riverdriver/river_driver_interface.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -365,12 +365,13 @@ type JobDeleteManyParams struct {
365365
}
366366

367367
type JobGetAvailableParams struct {
368-
ClientID string
369-
Max int
370-
Now *time.Time
371-
ProducerID int64
372-
Queue string
373-
Schema string
368+
ClientID string
369+
MaxAttemptedBy int
370+
MaxToLock int
371+
Now *time.Time
372+
ProducerID int64
373+
Queue string
374+
Schema string
374375
}
375376

376377
type JobGetByIDParams struct {

riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go

Lines changed: 21 additions & 8 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

riverdriver/riverdatabasesql/river_database_sql_driver.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -241,10 +241,11 @@ func (e *Executor) JobDeleteMany(ctx context.Context, params *riverdriver.JobDel
241241

242242
func (e *Executor) JobGetAvailable(ctx context.Context, params *riverdriver.JobGetAvailableParams) ([]*rivertype.JobRow, error) {
243243
jobs, err := dbsqlc.New().JobGetAvailable(schemaTemplateParam(ctx, params.Schema), e.dbtx, &dbsqlc.JobGetAvailableParams{
244-
AttemptedBy: params.ClientID,
245-
Max: int32(min(params.Max, math.MaxInt32)), //nolint:gosec
246-
Now: params.Now,
247-
Queue: params.Queue,
244+
AttemptedBy: params.ClientID,
245+
MaxAttemptedBy: int32(min(params.MaxAttemptedBy, math.MaxInt32)), //nolint:gosec
246+
MaxToLock: int32(min(params.MaxToLock, math.MaxInt32)), //nolint:gosec
247+
Now: params.Now,
248+
Queue: params.Queue,
248249
})
249250
if err != nil {
250251
return nil, interpretError(err)

riverdriver/riverdrivertest/driver_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -399,9 +399,9 @@ func BenchmarkDriverRiverPgxV5_Executor(b *testing.B) {
399399

400400
for range b.N {
401401
if _, err := exec.JobGetAvailable(ctx, &riverdriver.JobGetAvailableParams{
402-
ClientID: clientID,
403-
Max: 100,
404-
Queue: river.QueueDefault,
402+
ClientID: clientID,
403+
MaxToLock: 100,
404+
Queue: river.QueueDefault,
405405
}); err != nil {
406406
b.Fatal(err)
407407
}
@@ -425,9 +425,9 @@ func BenchmarkDriverRiverPgxV5_Executor(b *testing.B) {
425425
b.RunParallel(func(pb *testing.PB) {
426426
for pb.Next() {
427427
if _, err := exec.JobGetAvailable(ctx, &riverdriver.JobGetAvailableParams{
428-
ClientID: clientID,
429-
Max: 100,
430-
Queue: river.QueueDefault,
428+
ClientID: clientID,
429+
MaxToLock: 100,
430+
Queue: river.QueueDefault,
431431
}); err != nil {
432432
b.Fatal(err)
433433
}

riverdriver/riverdrivertest/riverdrivertest.go

Lines changed: 103 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1004,6 +1004,11 @@ func Exercise[TTx any](ctx context.Context, t *testing.T,
10041004
t.Run("JobGetAvailable", func(t *testing.T) {
10051005
t.Parallel()
10061006

1007+
const (
1008+
maxAttemptedBy = 10
1009+
maxToLock = 100
1010+
)
1011+
10071012
t.Run("Success", func(t *testing.T) {
10081013
t.Parallel()
10091014

@@ -1012,9 +1017,10 @@ func Exercise[TTx any](ctx context.Context, t *testing.T,
10121017
_ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{})
10131018

10141019
jobRows, err := exec.JobGetAvailable(ctx, &riverdriver.JobGetAvailableParams{
1015-
ClientID: clientID,
1016-
Max: 100,
1017-
Queue: rivercommon.QueueDefault,
1020+
ClientID: clientID,
1021+
MaxAttemptedBy: maxAttemptedBy,
1022+
MaxToLock: maxToLock,
1023+
Queue: rivercommon.QueueDefault,
10181024
})
10191025
require.NoError(t, err)
10201026
require.Len(t, jobRows, 1)
@@ -1033,9 +1039,10 @@ func Exercise[TTx any](ctx context.Context, t *testing.T,
10331039

10341040
// Two rows inserted but only one found because of the added limit.
10351041
jobRows, err := exec.JobGetAvailable(ctx, &riverdriver.JobGetAvailableParams{
1036-
ClientID: clientID,
1037-
Max: 1,
1038-
Queue: rivercommon.QueueDefault,
1042+
ClientID: clientID,
1043+
MaxAttemptedBy: maxAttemptedBy,
1044+
MaxToLock: 1,
1045+
Queue: rivercommon.QueueDefault,
10391046
})
10401047
require.NoError(t, err)
10411048
require.Len(t, jobRows, 1)
@@ -1052,9 +1059,10 @@ func Exercise[TTx any](ctx context.Context, t *testing.T,
10521059

10531060
// Job is in a non-default queue so it's not found.
10541061
jobRows, err := exec.JobGetAvailable(ctx, &riverdriver.JobGetAvailableParams{
1055-
ClientID: clientID,
1056-
Max: 100,
1057-
Queue: rivercommon.QueueDefault,
1062+
ClientID: clientID,
1063+
MaxAttemptedBy: maxAttemptedBy,
1064+
MaxToLock: maxToLock,
1065+
Queue: rivercommon.QueueDefault,
10581066
})
10591067
require.NoError(t, err)
10601068
require.Empty(t, jobRows)
@@ -1073,10 +1081,11 @@ func Exercise[TTx any](ctx context.Context, t *testing.T,
10731081

10741082
// Job is scheduled a while from now so it's not found.
10751083
jobRows, err := exec.JobGetAvailable(ctx, &riverdriver.JobGetAvailableParams{
1076-
ClientID: clientID,
1077-
Max: 100,
1078-
Now: &now,
1079-
Queue: rivercommon.QueueDefault,
1084+
ClientID: clientID,
1085+
MaxAttemptedBy: maxAttemptedBy,
1086+
MaxToLock: maxToLock,
1087+
Now: &now,
1088+
Queue: rivercommon.QueueDefault,
10801089
})
10811090
require.NoError(t, err)
10821091
require.Empty(t, jobRows)
@@ -1098,10 +1107,11 @@ func Exercise[TTx any](ctx context.Context, t *testing.T,
10981107
})
10991108

11001109
jobRows, err := exec.JobGetAvailable(ctx, &riverdriver.JobGetAvailableParams{
1101-
ClientID: clientID,
1102-
Max: 100,
1103-
Now: ptrutil.Ptr(now),
1104-
Queue: rivercommon.QueueDefault,
1110+
ClientID: clientID,
1111+
MaxAttemptedBy: maxAttemptedBy,
1112+
MaxToLock: maxToLock,
1113+
Now: ptrutil.Ptr(now),
1114+
Queue: rivercommon.QueueDefault,
11051115
})
11061116
require.NoError(t, err)
11071117
require.Len(t, jobRows, 1)
@@ -1121,9 +1131,10 @@ func Exercise[TTx any](ctx context.Context, t *testing.T,
11211131
}
11221132

11231133
jobRows, err := exec.JobGetAvailable(ctx, &riverdriver.JobGetAvailableParams{
1124-
ClientID: clientID,
1125-
Max: 2,
1126-
Queue: rivercommon.QueueDefault,
1134+
ClientID: clientID,
1135+
MaxAttemptedBy: maxAttemptedBy,
1136+
MaxToLock: 2,
1137+
Queue: rivercommon.QueueDefault,
11271138
})
11281139
require.NoError(t, err)
11291140
require.Len(t, jobRows, 2, "expected to fetch exactly 2 jobs")
@@ -1140,15 +1151,84 @@ func Exercise[TTx any](ctx context.Context, t *testing.T,
11401151

11411152
// Should fetch the one remaining job on the next attempt:
11421153
jobRows, err = exec.JobGetAvailable(ctx, &riverdriver.JobGetAvailableParams{
1143-
ClientID: clientID,
1144-
Max: 1,
1145-
Queue: rivercommon.QueueDefault,
1154+
ClientID: clientID,
1155+
MaxAttemptedBy: maxAttemptedBy,
1156+
MaxToLock: 1,
1157+
Queue: rivercommon.QueueDefault,
11461158
})
11471159
require.NoError(t, err)
11481160
require.NoError(t, err)
11491161
require.Len(t, jobRows, 1, "expected to fetch exactly 1 job")
11501162
require.Equal(t, 3, jobRows[0].Priority, "expected final job to have priority 3")
11511163
})
1164+
1165+
t.Run("AttemptedByAtMaxTruncated", func(t *testing.T) {
1166+
t.Parallel()
1167+
1168+
exec, _ := setup(ctx, t)
1169+
1170+
attemptedBy := make([]string, maxAttemptedBy)
1171+
for i := range maxAttemptedBy {
1172+
attemptedBy[i] = "attempt_" + strconv.Itoa(i)
1173+
}
1174+
1175+
_ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{
1176+
AttemptedBy: attemptedBy,
1177+
})
1178+
1179+
// Fetch the job; its `attempted_by` is already at the maximum, so the
// oldest element should be truncated away when the new client is appended.
1180+
jobRows, err := exec.JobGetAvailable(ctx, &riverdriver.JobGetAvailableParams{
1181+
ClientID: clientID,
1182+
MaxAttemptedBy: maxAttemptedBy,
1183+
MaxToLock: maxToLock,
1184+
Queue: rivercommon.QueueDefault,
1185+
})
1186+
require.NoError(t, err)
1187+
require.Len(t, jobRows, 1)
1188+
1189+
jobRow := jobRows[0]
1190+
require.Equal(t, append(
1191+
attemptedBy[1:],
1192+
clientID,
1193+
), jobRow.AttemptedBy)
1194+
require.Len(t, jobRow.AttemptedBy, maxAttemptedBy)
1195+
})
1196+
1197+
// Almost identical to the above, but tests that there are more existing
1198+
// `attempted_by` elements than the maximum allowed. There's a fine bug
1199+
// around use of > versus >= in the query's conditional, so make sure to
1200+
// capture both cases to make sure they work.
1201+
t.Run("AttemptedByOverMaxTruncated", func(t *testing.T) {
1202+
t.Parallel()
1203+
1204+
exec, _ := setup(ctx, t)
1205+
1206+
attemptedBy := make([]string, maxAttemptedBy+1)
1207+
for i := range maxAttemptedBy + 1 {
1208+
attemptedBy[i] = "attempt_" + strconv.Itoa(i)
1209+
}
1210+
1211+
_ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{
1212+
AttemptedBy: attemptedBy,
1213+
})
1214+
1215+
// Fetch the job; its `attempted_by` is already over the maximum, so the
// oldest elements should be truncated away when the new client is appended.
1216+
jobRows, err := exec.JobGetAvailable(ctx, &riverdriver.JobGetAvailableParams{
1217+
ClientID: clientID,
1218+
MaxAttemptedBy: maxAttemptedBy,
1219+
MaxToLock: maxToLock,
1220+
Queue: rivercommon.QueueDefault,
1221+
})
1222+
require.NoError(t, err)
1223+
require.Len(t, jobRows, 1)
1224+
1225+
jobRow := jobRows[0]
1226+
require.Equal(t, append(
1227+
attemptedBy[2:], // start at 2 because the two oldest elements are dropped: the array was one over max, plus one more to make room for the new client
1228+
clientID,
1229+
), jobRow.AttemptedBy)
1230+
require.Len(t, jobRow.AttemptedBy, maxAttemptedBy)
1231+
})
11521232
})
11531233

11541234
t.Run("JobGetByID", func(t *testing.T) {

riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ WITH locked_jobs AS (
159159
priority ASC,
160160
scheduled_at ASC,
161161
id ASC
162-
LIMIT @max::integer
162+
LIMIT @max_to_lock::integer
163163
FOR UPDATE
164164
SKIP LOCKED
165165
)
@@ -169,7 +169,18 @@ SET
169169
state = 'running',
170170
attempt = river_job.attempt + 1,
171171
attempted_at = coalesce(sqlc.narg('now')::timestamptz, now()),
172-
attempted_by = array_append(river_job.attempted_by, @attempted_by::text)
172+
attempted_by = CASE WHEN array_length(river_job.attempted_by, 1) >= @max_attempted_by::int
173+
THEN array_append(
174+
-- +2 instead of +1 because in one of those history
175+
-- making mistakes that's likely in aggregate cost
176+
-- humanity >$10B in bugs and lost productivity by
177+
-- now, like strings, Postgres array indexing starts
178+
-- at 1 instead of 0.
179+
river_job.attempted_by[array_length(river_job.attempted_by, 1) + 2 - @max_attempted_by:],
180+
@attempted_by::text
181+
)
182+
ELSE array_append(river_job.attempted_by, @attempted_by::text)
183+
END
173184
FROM
174185
locked_jobs
175186
WHERE

0 commit comments

Comments
 (0)