Skip to content

Commit 4976f72

Browse files
authored
Pass time generator to more database operations (#968)
Here, go through our various database operations in the driver, find `Now` parameters that weren't being used, and start passing them from callers. This'll have no functional effect on deployed environments, but means that more times are stubbed as expected when they're invoked from tests.
1 parent 8292c74 commit 4976f72

File tree

1 file changed

+7
-0
lines changed

1 file changed

+7
-0
lines changed

client.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1356,6 +1356,7 @@ func (c *Client[TTx]) jobCancel(ctx context.Context, exec riverdriver.Executor,
13561356
ID: jobID,
13571357
CancelAttemptedAt: c.baseService.Time.NowUTC(),
13581358
ControlTopic: string(notifier.NotificationTopicControl),
1359+
Now: c.baseService.Time.NowUTCOrNil(),
13591360
Schema: c.config.Schema,
13601361
})
13611362
}
@@ -1413,6 +1414,7 @@ func (c *Client[TTx]) JobGetTx(ctx context.Context, tx TTx, id int64) (*rivertyp
14131414
func (c *Client[TTx]) JobRetry(ctx context.Context, id int64) (*rivertype.JobRow, error) {
14141415
return c.driver.GetExecutor().JobRetry(ctx, &riverdriver.JobRetryParams{
14151416
ID: id,
1417+
Now: c.baseService.Time.NowUTCOrNil(),
14161418
Schema: c.config.Schema,
14171419
})
14181420
}
@@ -1433,6 +1435,7 @@ func (c *Client[TTx]) JobRetry(ctx context.Context, id int64) (*rivertype.JobRow
14331435
func (c *Client[TTx]) JobRetryTx(ctx context.Context, tx TTx, id int64) (*rivertype.JobRow, error) {
14341436
return c.driver.UnwrapExecutor(tx).JobRetry(ctx, &riverdriver.JobRetryParams{
14351437
ID: id,
1438+
Now: c.baseService.Time.NowUTCOrNil(),
14361439
Schema: c.config.Schema,
14371440
})
14381441
}
@@ -2269,6 +2272,7 @@ func (c *Client[TTx]) QueuePause(ctx context.Context, name string, opts *QueuePa
22692272

22702273
if err := tx.QueuePause(ctx, &riverdriver.QueuePauseParams{
22712274
Name: name,
2275+
Now: c.baseService.Time.NowUTCOrNil(),
22722276
Schema: c.config.Schema,
22732277
}); err != nil {
22742278
return err
@@ -2304,6 +2308,7 @@ func (c *Client[TTx]) QueuePauseTx(ctx context.Context, tx TTx, name string, opt
23042308

23052309
if err := executorTx.QueuePause(ctx, &riverdriver.QueuePauseParams{
23062310
Name: name,
2311+
Now: c.baseService.Time.NowUTCOrNil(),
23072312
Schema: c.config.Schema,
23082313
}); err != nil {
23092314
return err
@@ -2337,6 +2342,7 @@ func (c *Client[TTx]) QueueResume(ctx context.Context, name string, opts *QueueP
23372342

23382343
if err := tx.QueueResume(ctx, &riverdriver.QueueResumeParams{
23392344
Name: name,
2345+
Now: c.baseService.Time.NowUTCOrNil(),
23402346
Schema: c.config.Schema,
23412347
}); err != nil {
23422348
return err
@@ -2373,6 +2379,7 @@ func (c *Client[TTx]) QueueResumeTx(ctx context.Context, tx TTx, name string, op
23732379

23742380
if err := executorTx.QueueResume(ctx, &riverdriver.QueueResumeParams{
23752381
Name: name,
2382+
Now: c.baseService.Time.NowUTCOrNil(),
23762383
Schema: c.config.Schema,
23772384
}); err != nil {
23782385
return err

0 commit comments

Comments
 (0)