Skip to content

Commit f0e174b

Browse files
committed
Add maximum bound for attempted_by array
As described by #972, it may be possible for huge numbers of snoozes to blow out a job row's `attempted_by` array as a job is locked over and over again. Multiplied across many jobs, this can produce vast quantities of data that get sent over the network. Here, put a ratchet on `attempted_by` so that if the array becomes larger than 100 elements, we knock the oldest one off in favor of the most recent client and the freshest 99. Unfortunately the implementation isn't particularly clean in either Postgres or SQLite. In Postgres it would've been cleaner if we'd kept `attempted_by` in reverse order with the newest client in front, because the built-in array functions would be friendlier to that layout; but since it's not, we have to do something a little hackier involving a `CASE` statement instead. SQLite is even worse. SQLite has no array functions at all, which doesn't help, but moreover every strategy I tried ended up blocked by a sqlc SQLite bug, so after trying everything I could think of, I ended up having to extract the piece that does the array truncation into a SQL template string to get this over the line. This could be removed in the future if any one of a number of outstanding sqlc bugs is fixed (e.g. [1]). [1] sqlc-dev/sqlc#3610
1 parent 4d07c0d commit f0e174b

File tree

12 files changed

+239
-60
lines changed

12 files changed

+239
-60
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
2222
- The reindexer maintenance service now reindexes all `river_job` indexes, including its primary key. This is expected to help in situations where the jobs table has in the past expanded to a very large size (which makes most indexes larger), is now a much more modest size, but has left the indexes in their expanded state. [PR #963](https://github.com/riverqueue/river/pull/963).
2323
- The River CLI now accepts a `--target-version` of 0 with `river migrate-down` to run all down migrations and remove all River tables (previously, -1 was used for this; -1 still works, but now 0 also works). [PR #966](https://github.com/riverqueue/river/pull/966).
2424
- **Breaking change:** The `HookWorkEnd` interface's `WorkEnd` function now receives a `JobRow` parameter in addition to the `error` it received before. Having a `JobRow` to work with is fairly crucial to most functionality that a hook would implement, and its previous omission was entirely an error. [PR #970](https://github.com/riverqueue/river/pull/970).
25+
- Add a maximum bound to each job's `attempted_by` array so that in degenerate cases where a job is locked many, many times (say it's snoozed hundreds of times), it doesn't grow without bound. [PR #974](https://github.com/riverqueue/river/pull/974).
2526

2627
### Fixed
2728

producer.go

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -742,13 +742,18 @@ func (p *producer) dispatchWork(workCtx context.Context, count int, fetchResultC
742742
// back to the queue.
743743
ctx := context.WithoutCancel(workCtx)
744744

745+
// Maximum size of the `attempted_by` array on each job row. This maximum is
746+
// rarely hit, but exists to protect against degenerate cases.
747+
const maxAttemptedBy = 100
748+
745749
jobs, err := p.pilot.JobGetAvailable(ctx, p.exec, p.state, &riverdriver.JobGetAvailableParams{
746-
ClientID: p.config.ClientID,
747-
Max: count,
748-
Now: p.Time.NowUTCOrNil(),
749-
Queue: p.config.Queue,
750-
ProducerID: p.id.Load(),
751-
Schema: p.config.Schema,
750+
ClientID: p.config.ClientID,
751+
Max: count,
752+
MaxAttemptedBy: maxAttemptedBy,
753+
Now: p.Time.NowUTCOrNil(),
754+
Queue: p.config.Queue,
755+
ProducerID: p.id.Load(),
756+
Schema: p.config.Schema,
752757
})
753758
if err != nil {
754759
p.Logger.Error(p.Name+": Error fetching jobs", slog.String("err", err.Error()), slog.String("queue", p.config.Queue))

riverdriver/river_driver_interface.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -356,12 +356,13 @@ type JobDeleteBeforeParams struct {
356356
}
357357

358358
type JobGetAvailableParams struct {
359-
ClientID string
360-
Max int
361-
Now *time.Time
362-
ProducerID int64
363-
Queue string
364-
Schema string
359+
ClientID string
360+
Max int
361+
MaxAttemptedBy int
362+
Now *time.Time
363+
ProducerID int64
364+
Queue string
365+
Schema string
365366
}
366367

367368
type JobGetByIDParams struct {

riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go

Lines changed: 19 additions & 7 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

riverdriver/riverdatabasesql/river_database_sql_driver.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -228,10 +228,11 @@ func (e *Executor) JobDeleteBefore(ctx context.Context, params *riverdriver.JobD
228228

229229
func (e *Executor) JobGetAvailable(ctx context.Context, params *riverdriver.JobGetAvailableParams) ([]*rivertype.JobRow, error) {
230230
jobs, err := dbsqlc.New().JobGetAvailable(schemaTemplateParam(ctx, params.Schema), e.dbtx, &dbsqlc.JobGetAvailableParams{
231-
AttemptedBy: params.ClientID,
232-
Max: int32(min(params.Max, math.MaxInt32)), //nolint:gosec
233-
Now: params.Now,
234-
Queue: params.Queue,
231+
AttemptedBy: params.ClientID,
232+
Max: int32(min(params.Max, math.MaxInt32)), //nolint:gosec
233+
MaxAttemptedBy: int32(min(params.MaxAttemptedBy, math.MaxInt32)),
234+
Now: params.Now,
235+
Queue: params.Queue,
235236
})
236237
if err != nil {
237238
return nil, interpretError(err)

riverdriver/riverdrivertest/riverdrivertest.go

Lines changed: 101 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -862,6 +862,11 @@ func Exercise[TTx any](ctx context.Context, t *testing.T,
862862
t.Run("JobGetAvailable", func(t *testing.T) {
863863
t.Parallel()
864864

865+
const (
866+
max = 100
867+
maxAttemptedBy = 10
868+
)
869+
865870
t.Run("Success", func(t *testing.T) {
866871
t.Parallel()
867872

@@ -870,9 +875,10 @@ func Exercise[TTx any](ctx context.Context, t *testing.T,
870875
_ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{})
871876

872877
jobRows, err := exec.JobGetAvailable(ctx, &riverdriver.JobGetAvailableParams{
873-
ClientID: clientID,
874-
Max: 100,
875-
Queue: rivercommon.QueueDefault,
878+
ClientID: clientID,
879+
Max: max,
880+
MaxAttemptedBy: maxAttemptedBy,
881+
Queue: rivercommon.QueueDefault,
876882
})
877883
require.NoError(t, err)
878884
require.Len(t, jobRows, 1)
@@ -891,9 +897,10 @@ func Exercise[TTx any](ctx context.Context, t *testing.T,
891897

892898
// Two rows inserted but only one found because of the added limit.
893899
jobRows, err := exec.JobGetAvailable(ctx, &riverdriver.JobGetAvailableParams{
894-
ClientID: clientID,
895-
Max: 1,
896-
Queue: rivercommon.QueueDefault,
900+
ClientID: clientID,
901+
Max: 1,
902+
MaxAttemptedBy: maxAttemptedBy,
903+
Queue: rivercommon.QueueDefault,
897904
})
898905
require.NoError(t, err)
899906
require.Len(t, jobRows, 1)
@@ -910,9 +917,10 @@ func Exercise[TTx any](ctx context.Context, t *testing.T,
910917

911918
// Job is in a non-default queue so it's not found.
912919
jobRows, err := exec.JobGetAvailable(ctx, &riverdriver.JobGetAvailableParams{
913-
ClientID: clientID,
914-
Max: 100,
915-
Queue: rivercommon.QueueDefault,
920+
ClientID: clientID,
921+
Max: max,
922+
MaxAttemptedBy: maxAttemptedBy,
923+
Queue: rivercommon.QueueDefault,
916924
})
917925
require.NoError(t, err)
918926
require.Empty(t, jobRows)
@@ -931,10 +939,11 @@ func Exercise[TTx any](ctx context.Context, t *testing.T,
931939

932940
// Job is scheduled a while from now so it's not found.
933941
jobRows, err := exec.JobGetAvailable(ctx, &riverdriver.JobGetAvailableParams{
934-
ClientID: clientID,
935-
Max: 100,
936-
Now: &now,
937-
Queue: rivercommon.QueueDefault,
942+
ClientID: clientID,
943+
Max: max,
944+
MaxAttemptedBy: maxAttemptedBy,
945+
Now: &now,
946+
Queue: rivercommon.QueueDefault,
938947
})
939948
require.NoError(t, err)
940949
require.Empty(t, jobRows)
@@ -956,10 +965,11 @@ func Exercise[TTx any](ctx context.Context, t *testing.T,
956965
})
957966

958967
jobRows, err := exec.JobGetAvailable(ctx, &riverdriver.JobGetAvailableParams{
959-
ClientID: clientID,
960-
Max: 100,
961-
Now: ptrutil.Ptr(now),
962-
Queue: rivercommon.QueueDefault,
968+
ClientID: clientID,
969+
Max: max,
970+
MaxAttemptedBy: maxAttemptedBy,
971+
Now: ptrutil.Ptr(now),
972+
Queue: rivercommon.QueueDefault,
963973
})
964974
require.NoError(t, err)
965975
require.Len(t, jobRows, 1)
@@ -979,9 +989,10 @@ func Exercise[TTx any](ctx context.Context, t *testing.T,
979989
}
980990

981991
jobRows, err := exec.JobGetAvailable(ctx, &riverdriver.JobGetAvailableParams{
982-
ClientID: clientID,
983-
Max: 2,
984-
Queue: rivercommon.QueueDefault,
992+
ClientID: clientID,
993+
Max: 2,
994+
MaxAttemptedBy: maxAttemptedBy,
995+
Queue: rivercommon.QueueDefault,
985996
})
986997
require.NoError(t, err)
987998
require.Len(t, jobRows, 2, "expected to fetch exactly 2 jobs")
@@ -998,15 +1009,82 @@ func Exercise[TTx any](ctx context.Context, t *testing.T,
9981009

9991010
// Should fetch the one remaining job on the next attempt:
10001011
jobRows, err = exec.JobGetAvailable(ctx, &riverdriver.JobGetAvailableParams{
1001-
ClientID: clientID,
1002-
Max: 1,
1003-
Queue: rivercommon.QueueDefault,
1012+
ClientID: clientID,
1013+
Max: 1,
1014+
MaxAttemptedBy: maxAttemptedBy,
1015+
Queue: rivercommon.QueueDefault,
10041016
})
10051017
require.NoError(t, err)
10061018
require.NoError(t, err)
10071019
require.Len(t, jobRows, 1, "expected to fetch exactly 1 job")
10081020
require.Equal(t, 3, jobRows[0].Priority, "expected final job to have priority 3")
10091021
})
1022+
1023+
t.Run("AttemptedByAtMaxTruncated", func(t *testing.T) {
1024+
t.Parallel()
1025+
1026+
exec, _ := setup(ctx, t)
1027+
1028+
attemptedBy := make([]string, maxAttemptedBy)
1029+
for i := range maxAttemptedBy {
1030+
attemptedBy[i] = "attempt_" + strconv.Itoa(i)
1031+
}
1032+
1033+
_ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{
1034+
AttemptedBy: attemptedBy,
1035+
})
1036+
1037+
// Fetch the job and verify that its `attempted_by`, already at the
// maximum length, had its oldest element knocked off in favor of the
// fetching client.
1038+
jobRows, err := exec.JobGetAvailable(ctx, &riverdriver.JobGetAvailableParams{
1039+
ClientID: clientID,
1040+
Max: max,
1041+
MaxAttemptedBy: maxAttemptedBy,
1042+
Queue: rivercommon.QueueDefault,
1043+
})
1044+
require.NoError(t, err)
1045+
require.Len(t, jobRows, 1)
1046+
1047+
jobRow := jobRows[0]
1048+
require.Equal(t, append(
1049+
attemptedBy[1:],
1050+
clientID,
1051+
), jobRow.AttemptedBy)
1052+
})
1053+
1054+
// Almost identical to the above, but tests that there are more existing
1055+
// `attempted_by` elements than the maximum allowed. There's a fine bug
1056+
// around use of > versus >= in the query's conditional, so make sure to
1057+
// capture both cases to make sure they work.
1058+
t.Run("AttemptedByOverMaxTruncated", func(t *testing.T) {
1059+
t.Parallel()
1060+
1061+
exec, _ := setup(ctx, t)
1062+
1063+
attemptedBy := make([]string, maxAttemptedBy+1)
1064+
for i := range maxAttemptedBy + 1 {
1065+
attemptedBy[i] = "attempt_" + strconv.Itoa(i)
1066+
}
1067+
1068+
_ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{
1069+
AttemptedBy: attemptedBy,
1070+
})
1071+
1072+
// Fetch the job and verify that its `attempted_by`, which started over
// the maximum length, was truncated back down to the maximum.
1073+
jobRows, err := exec.JobGetAvailable(ctx, &riverdriver.JobGetAvailableParams{
1074+
ClientID: clientID,
1075+
Max: max,
1076+
MaxAttemptedBy: maxAttemptedBy,
1077+
Queue: rivercommon.QueueDefault,
1078+
})
1079+
require.NoError(t, err)
1080+
require.Len(t, jobRows, 1)
1081+
1082+
jobRow := jobRows[0]
1083+
require.Equal(t, append(
1084+
attemptedBy[2:], // drop the first 2: maxAttemptedBy+1 existing elements plus the new client is 2 over the maximum
1085+
clientID,
1086+
), jobRow.AttemptedBy)
1087+
})
10101088
})
10111089

10121090
t.Run("JobGetByID", func(t *testing.T) {

riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,19 @@ SET
146146
state = 'running',
147147
attempt = river_job.attempt + 1,
148148
attempted_at = coalesce(sqlc.narg('now')::timestamptz, now()),
149-
attempted_by = array_append(river_job.attempted_by, @attempted_by::text)
149+
attempted_by = CASE WHEN array_length(river_job.attempted_by, 1) >= @max_attempted_by::int
150+
THEN array_append(
151+
-- +2 instead of +1 because in one of those history
152+
-- making mistakes that's likely in aggregate cost
153+
-- humanity >$10B in bugs and lost productivity by
154+
-- now, like strings, Postgres array indexing starts
155+
-- at 1 instead of 0.
156+
river_job.attempted_by[array_length(river_job.attempted_by,
157+
1) + 2 - @max_attempted_by:],
158+
@attempted_by::text
159+
)
160+
ELSE array_append(river_job.attempted_by, @attempted_by::text)
161+
END
150162
FROM
151163
locked_jobs
152164
WHERE

riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go

Lines changed: 19 additions & 7 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)