Skip to content

Commit 0704131

Browse files
committed
Add maximum bound for attempted_by array
As described by #972, it may be possible for huge numbers of snoozes to blow out a job row's `attempted_by` array as a job is locked over and over again. Multiplied by many jobs, this can produce vast quantities of data that get sent over the network. Here, put in a ratchet on `attempted_by` so that if the array becomes larger than 100 elements, we knock the oldest one off in favor of the most recent client and the most fresh 99. Unfortunately the implementation isn't particularly clean in either Postgres or SQLite. In Postgres it would've been cleaner if we'd had the `attempted_by` in reverse order so the new client was at the front because the built-in array functions would be friendlier to that layout, but because it's not, we have to do something a little hackier involving a `CASE` statement instead. SQLite is even worse. SQLite has no array functions at all, which doesn't help, but moreover every strategy I tried ended up blocked by a sqlc SQLite bug, so after trying everything I could think of, I ended up having to extract the piece that does the array truncation into a SQL template string to get this over the line. This could be removed in the future if any one of a number of outstanding sqlc bugs is fixed (e.g. [1]). [1] sqlc-dev/sqlc#3610
1 parent 4d07c0d commit 0704131

File tree

11 files changed

+356
-60
lines changed

11 files changed

+356
-60
lines changed

producer.go

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -742,13 +742,18 @@ func (p *producer) dispatchWork(workCtx context.Context, count int, fetchResultC
742742
// back to the queue.
743743
ctx := context.WithoutCancel(workCtx)
744744

745+
// Maximum size of the `attempted_by` array on each job row. This maximum is
746+
// rarely hit, but exists to protect against degenerate cases.
747+
const maxAttemptedBy = 100
748+
745749
jobs, err := p.pilot.JobGetAvailable(ctx, p.exec, p.state, &riverdriver.JobGetAvailableParams{
746-
ClientID: p.config.ClientID,
747-
Max: count,
748-
Now: p.Time.NowUTCOrNil(),
749-
Queue: p.config.Queue,
750-
ProducerID: p.id.Load(),
751-
Schema: p.config.Schema,
750+
ClientID: p.config.ClientID,
751+
Max: count,
752+
MaxAttemptedBy: maxAttemptedBy,
753+
Now: p.Time.NowUTCOrNil(),
754+
Queue: p.config.Queue,
755+
ProducerID: p.id.Load(),
756+
Schema: p.config.Schema,
752757
})
753758
if err != nil {
754759
p.Logger.Error(p.Name+": Error fetching jobs", slog.String("err", err.Error()), slog.String("queue", p.config.Queue))

riverdriver/river_driver_interface.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -356,12 +356,13 @@ type JobDeleteBeforeParams struct {
356356
}
357357

358358
type JobGetAvailableParams struct {
359-
ClientID string
360-
Max int
361-
Now *time.Time
362-
ProducerID int64
363-
Queue string
364-
Schema string
359+
ClientID string
360+
Max int
361+
MaxAttemptedBy int
362+
Now *time.Time
363+
ProducerID int64
364+
Queue string
365+
Schema string
365366
}
366367

367368
type JobGetByIDParams struct {

riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go

Lines changed: 12 additions & 7 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

riverdriver/riverdatabasesql/river_database_sql_driver.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -228,10 +228,11 @@ func (e *Executor) JobDeleteBefore(ctx context.Context, params *riverdriver.JobD
228228

229229
func (e *Executor) JobGetAvailable(ctx context.Context, params *riverdriver.JobGetAvailableParams) ([]*rivertype.JobRow, error) {
230230
jobs, err := dbsqlc.New().JobGetAvailable(schemaTemplateParam(ctx, params.Schema), e.dbtx, &dbsqlc.JobGetAvailableParams{
231-
AttemptedBy: params.ClientID,
232-
Max: int32(min(params.Max, math.MaxInt32)), //nolint:gosec
233-
Now: params.Now,
234-
Queue: params.Queue,
231+
AttemptedBy: params.ClientID,
232+
Max: int32(min(params.Max, math.MaxInt32)), //nolint:gosec
233+
MaxAttemptedBy: int32(min(params.MaxAttemptedBy, math.MaxInt32)),
234+
Now: params.Now,
235+
Queue: params.Queue,
235236
})
236237
if err != nil {
237238
return nil, interpretError(err)

riverdriver/riverdrivertest/riverdrivertest.go

Lines changed: 63 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -862,6 +862,8 @@ func Exercise[TTx any](ctx context.Context, t *testing.T,
862862
t.Run("JobGetAvailable", func(t *testing.T) {
863863
t.Parallel()
864864

865+
const maxAttemptedBy = 10
866+
865867
t.Run("Success", func(t *testing.T) {
866868
t.Parallel()
867869

@@ -870,9 +872,10 @@ func Exercise[TTx any](ctx context.Context, t *testing.T,
870872
_ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{})
871873

872874
jobRows, err := exec.JobGetAvailable(ctx, &riverdriver.JobGetAvailableParams{
873-
ClientID: clientID,
874-
Max: 100,
875-
Queue: rivercommon.QueueDefault,
875+
ClientID: clientID,
876+
Max: 100,
877+
MaxAttemptedBy: maxAttemptedBy,
878+
Queue: rivercommon.QueueDefault,
876879
})
877880
require.NoError(t, err)
878881
require.Len(t, jobRows, 1)
@@ -891,9 +894,10 @@ func Exercise[TTx any](ctx context.Context, t *testing.T,
891894

892895
// Two rows inserted but only one found because of the added limit.
893896
jobRows, err := exec.JobGetAvailable(ctx, &riverdriver.JobGetAvailableParams{
894-
ClientID: clientID,
895-
Max: 1,
896-
Queue: rivercommon.QueueDefault,
897+
ClientID: clientID,
898+
Max: 1,
899+
MaxAttemptedBy: maxAttemptedBy,
900+
Queue: rivercommon.QueueDefault,
897901
})
898902
require.NoError(t, err)
899903
require.Len(t, jobRows, 1)
@@ -910,9 +914,10 @@ func Exercise[TTx any](ctx context.Context, t *testing.T,
910914

911915
// Job is in a non-default queue so it's not found.
912916
jobRows, err := exec.JobGetAvailable(ctx, &riverdriver.JobGetAvailableParams{
913-
ClientID: clientID,
914-
Max: 100,
915-
Queue: rivercommon.QueueDefault,
917+
ClientID: clientID,
918+
Max: 100,
919+
MaxAttemptedBy: maxAttemptedBy,
920+
Queue: rivercommon.QueueDefault,
916921
})
917922
require.NoError(t, err)
918923
require.Empty(t, jobRows)
@@ -931,10 +936,11 @@ func Exercise[TTx any](ctx context.Context, t *testing.T,
931936

932937
// Job is scheduled a while from now so it's not found.
933938
jobRows, err := exec.JobGetAvailable(ctx, &riverdriver.JobGetAvailableParams{
934-
ClientID: clientID,
935-
Max: 100,
936-
Now: &now,
937-
Queue: rivercommon.QueueDefault,
939+
ClientID: clientID,
940+
Max: 100,
941+
MaxAttemptedBy: maxAttemptedBy,
942+
Now: &now,
943+
Queue: rivercommon.QueueDefault,
938944
})
939945
require.NoError(t, err)
940946
require.Empty(t, jobRows)
@@ -956,10 +962,11 @@ func Exercise[TTx any](ctx context.Context, t *testing.T,
956962
})
957963

958964
jobRows, err := exec.JobGetAvailable(ctx, &riverdriver.JobGetAvailableParams{
959-
ClientID: clientID,
960-
Max: 100,
961-
Now: ptrutil.Ptr(now),
962-
Queue: rivercommon.QueueDefault,
965+
ClientID: clientID,
966+
Max: 100,
967+
MaxAttemptedBy: maxAttemptedBy,
968+
Now: ptrutil.Ptr(now),
969+
Queue: rivercommon.QueueDefault,
963970
})
964971
require.NoError(t, err)
965972
require.Len(t, jobRows, 1)
@@ -979,9 +986,10 @@ func Exercise[TTx any](ctx context.Context, t *testing.T,
979986
}
980987

981988
jobRows, err := exec.JobGetAvailable(ctx, &riverdriver.JobGetAvailableParams{
982-
ClientID: clientID,
983-
Max: 2,
984-
Queue: rivercommon.QueueDefault,
989+
ClientID: clientID,
990+
Max: 2,
991+
MaxAttemptedBy: maxAttemptedBy,
992+
Queue: rivercommon.QueueDefault,
985993
})
986994
require.NoError(t, err)
987995
require.Len(t, jobRows, 2, "expected to fetch exactly 2 jobs")
@@ -998,15 +1006,47 @@ func Exercise[TTx any](ctx context.Context, t *testing.T,
9981006

9991007
// Should fetch the one remaining job on the next attempt:
10001008
jobRows, err = exec.JobGetAvailable(ctx, &riverdriver.JobGetAvailableParams{
1001-
ClientID: clientID,
1002-
Max: 1,
1003-
Queue: rivercommon.QueueDefault,
1009+
ClientID: clientID,
1010+
Max: 1,
1011+
MaxAttemptedBy: maxAttemptedBy,
1012+
Queue: rivercommon.QueueDefault,
10041013
})
10051014
require.NoError(t, err)
10061015
require.NoError(t, err)
10071016
require.Len(t, jobRows, 1, "expected to fetch exactly 1 job")
10081017
require.Equal(t, 3, jobRows[0].Priority, "expected final job to have priority 3")
10091018
})
1019+
1020+
t.Run("AttemptedByTruncatedAtMax", func(t *testing.T) {
1021+
t.Parallel()
1022+
1023+
exec, _ := setup(ctx, t)
1024+
1025+
attemptedBy := make([]string, maxAttemptedBy)
1026+
for i := range maxAttemptedBy {
1027+
attemptedBy[i] = "attempt_" + strconv.Itoa(i)
1028+
}
1029+
1030+
_ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{
1031+
AttemptedBy: attemptedBy,
1032+
})
1033+
1034+
// Job already has the maximum number of `attempted_by` entries, so the oldest one is dropped to make room for the new client.
1035+
jobRows, err := exec.JobGetAvailable(ctx, &riverdriver.JobGetAvailableParams{
1036+
ClientID: clientID,
1037+
Max: 100,
1038+
MaxAttemptedBy: maxAttemptedBy,
1039+
Queue: rivercommon.QueueDefault,
1040+
})
1041+
require.NoError(t, err)
1042+
require.Len(t, jobRows, 1)
1043+
1044+
jobRow := jobRows[0]
1045+
require.Equal(t, append(
1046+
attemptedBy[1:],
1047+
clientID,
1048+
), jobRow.AttemptedBy)
1049+
})
10101050
})
10111051

10121052
t.Run("JobGetByID", func(t *testing.T) {

riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,10 @@ SET
146146
state = 'running',
147147
attempt = river_job.attempt + 1,
148148
attempted_at = coalesce(sqlc.narg('now')::timestamptz, now()),
149-
attempted_by = array_append(river_job.attempted_by, @attempted_by::text)
149+
-- Cap `attempted_by` at @max_attempted_by elements. Once the array has
-- reached the maximum, keep only its newest (max - 1) entries and then
-- append the new client, so the result never exceeds the maximum. Postgres
-- arrays are 1-indexed, so the newest (max - 1) entries start at index
-- `length - (max - 1) + 1`, i.e. `length + 2 - max`.
attempted_by = CASE WHEN array_length(river_job.attempted_by, 1) >= @max_attempted_by::int
150+
THEN array_append(river_job.attempted_by[array_length(river_job.attempted_by, 1) + 2 - @max_attempted_by:], @attempted_by::text)
151+
ELSE array_append(river_job.attempted_by, @attempted_by::text)
152+
END
150153
FROM
151154
locked_jobs
152155
WHERE

riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go

Lines changed: 12 additions & 7 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

riverdriver/riverpgxv5/river_pgx_v5_driver.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -232,10 +232,11 @@ func (e *Executor) JobDeleteBefore(ctx context.Context, params *riverdriver.JobD
232232

233233
func (e *Executor) JobGetAvailable(ctx context.Context, params *riverdriver.JobGetAvailableParams) ([]*rivertype.JobRow, error) {
234234
jobs, err := dbsqlc.New().JobGetAvailable(schemaTemplateParam(ctx, params.Schema), e.dbtx, &dbsqlc.JobGetAvailableParams{
235-
AttemptedBy: params.ClientID,
236-
Max: int32(min(params.Max, math.MaxInt32)), //nolint:gosec
237-
Now: params.Now,
238-
Queue: params.Queue,
235+
AttemptedBy: params.ClientID,
236+
Max: int32(min(params.Max, math.MaxInt32)), //nolint:gosec
237+
MaxAttemptedBy: int32(min(params.MaxAttemptedBy, math.MaxInt32)),
238+
Now: params.Now,
239+
Queue: params.Queue,
239240
})
240241
if err != nil {
241242
return nil, interpretError(err)

0 commit comments

Comments
 (0)