-
Hi, how would you make a worker that always runs in the background exactly once and always retries with minimum delay?
-
Hi, we'll need more information on this one. If you just want a job worked once, insert it once and it'll be worked one time before being set as completed and ignored thereafter. If you mean "exactly once" in the distributed systems sense, then it's not possible. All semantics in River are at-least-once [1] because guaranteeing anything better is nearly impossible without other serious tradeoffs. See the docs here [2] for writing your own custom retry policy. One that retries immediately would look something like:

func (policy *UltraAggressivePolicy) NextRetry(job *rivertype.JobRow) time.Time {
	return time.Now()
}

[1] https://riverqueue.com/docs/reliable-workers
-
Hi again, here is my attempt at this using unique jobs. It contains some "helpful" comments, which is why I'm not happy with this solution yet.

type DispatchTestAnswersToJobsJobArgs struct {
	UniqueID int32 `json:"unique_id"` // FIXME: possibly a bug where you can't have an empty args struct for unique jobs
}

func (DispatchTestAnswersToJobsJobArgs) Kind() string {
	return "dispatch_test_answers_to_jobs"
}

func (DispatchTestAnswersToJobsJobArgs) InsertOpts() river.InsertOpts {
	return river.InsertOpts{
		// FIXME: MaxAttempts is actually int16 in the database and this is a very bad bug
		// FIXME: JobSnooze overflows the int16, try putting in 32767
		MaxAttempts: 1_000_000_000, // NOTE: as many as possible ... this is actually negative in this case because of overflow, which is very confusing
		Priority:    1,
		UniqueOpts: river.UniqueOpts{
			ByPeriod: 99 * 365 * 24 * time.Hour, // for 99 years, yes.
		},
	}
}

type DispatchTestAnswersToJobsJobWorker struct {
	river.WorkerDefaults[DispatchTestAnswersToJobsJobArgs]
}

func (w *DispatchTestAnswersToJobsJobWorker) Work(ctx context.Context, job *river.Job[DispatchTestAnswersToJobsJobArgs]) error {
	slog.Debug("dispatching test answers to jobs")
	return river.JobSnooze(100 * time.Millisecond)
}

func (w *DispatchTestAnswersToJobsJobWorker) Timeout(*river.Job[DispatchTestAnswersToJobsJobArgs]) time.Duration {
	return 100 * time.Millisecond
}

Multiple test takers take tests and fill in answers, which are then batched by this job and passed to another job that verifies them. The verification API is rate limited, so I need throughput control and batching. It also has to avoid redundant calls and desync issues, so there needs to be a single central worker for the job in the snippet.

The obvious thing preventing me from shipping this code is that JobSnooze overflows the MaxAttempts counter.
-
@brandur this is a good improvement for me, but I would like this to be scoped to the worker instead of the client. Otherwise I'd have to start managing multiple queues and client types with different policies, which would needlessly complicate my software design. Are there any drawbacks to using this super aggressive scheduling for all workers?
-
What you could do is combine periodic jobs with unique opts. Configure a periodic job that inserts every five minutes or so, and set it to RunOnStart so that it always gets an insert when a River client starts up. Then, configure unique options by state [2], where JobStateCompleted is omitted so that if the job was ever finished, a new one gets inserted:

[]rivertype.JobState{
	rivertype.JobStateAvailable,
	rivertype.JobStateRunning,
	rivertype.JobStateRetryable,
	rivertype.JobStateScheduled,
}

The client will start up, insert an initial job, and that job will start work. The periodic job enqueuer will try to insert new versions of it every five minutes, but since the job is already running, those inserts will be no-ops because of the unique opts. I assume the …

[1] https://riverqueue.com/docs/periodic-jobs#basic-usage
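Why omitting the completed state works can be modeled in a few lines: an insert is skipped only if an existing job with the same unique key is in one of the listed states. This is a simplified stdlib-only model; the `JobState` strings and `shouldInsert` are hypothetical stand-ins, not River's implementation. In River itself the list would go in `river.UniqueOpts{ByState: ...}` on the job's insert options, and the periodic job would be created with `&river.PeriodicJobOpts{RunOnStart: true}`.

```go
package main

// JobState is a hypothetical stand-in for rivertype.JobState.
type JobState string

const (
	JobStateAvailable JobState = "available"
	JobStateCompleted JobState = "completed"
	JobStateRetryable JobState = "retryable"
	JobStateRunning   JobState = "running"
	JobStateScheduled JobState = "scheduled"
)

// uniqueByState mirrors the list above: completed is deliberately left out.
var uniqueByState = []JobState{
	JobStateAvailable,
	JobStateRunning,
	JobStateRetryable,
	JobStateScheduled,
}

// shouldInsert reports whether a new insert goes through, given the states of
// existing jobs that share its unique key. Any existing job in a listed state
// turns the insert into a no-op.
func shouldInsert(existing []JobState) bool {
	for _, have := range existing {
		for _, blocking := range uniqueByState {
			if have == blocking {
				return false
			}
		}
	}
	return true
}
```

In this model, a running or retryable job blocks the periodic insert, while a completed one (or any state absent from the list) lets the next periodic insert start a fresh job.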
-
@brandur that's quite clever. The downside is that when the job occasionally fails, I might have to wait up to 5 minutes for a new one to be inserted.
-
Personally I wouldn't go crazy with that, but it won't be that big of a deal resource-wise.
Don't bother changing …
Don't do this, if only to avoid overcomplicating things. Use a for loop.
-
I'm still curious why you can't snooze for 100 ms, but besides that, your responses were massively helpful.
-
Why is it not allowed to have an empty JobArgs struct?