Skip to content

Commit 4d07c0d

Browse files
authored
Have HookWorkEnd.WorkEnd receive a JobRow in addition to an error (#970)
Here, modify the `HookWorkEnd.WorkEnd` so that it receives a `JobRow` of the job that just finished working in addition to an error. It's not great having to change this, but having a `JobRow` to work with will be crucial for most functionality that people might like to use this hook to implement. It's a breaking change, but hopefully not a bad one because I think not having `JobRow` previously is a sign that not many people had yet tried to implement anything with `HookWorkEnd` yet, because they would've needed a `JobRow` for most of what they wanted to do.
1 parent 4976f72 commit 4d07c0d

File tree

7 files changed

+36
-17
lines changed

7 files changed

+36
-17
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
77

88
## [Unreleased]
99

10+
⚠️ Version 0.24.0 has a breaking change in `HookWorkEnd.WorkEnd` in that a new `JobRow` parameter has been added to the function's signature. Any intergration defining a custom `HookWorkEnd` hook should update its implementation so the hook continues to be called correctly.
11+
1012
⚠️ Internal APIs used for communication between River and River Pro have changed. If using River Pro, make sure to update River and River Pro to latest at the same time to get compatible versions. River v0.24.0 is compatible with River Pro v0.16.0.
1113

1214
### Added
@@ -19,6 +21,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1921
- Bring all driver tests into separate package so they don't leak dependencies. This removes dependencies from the top level `river` package that most River installations won't need, thereby reducing the transitive dependency load of most River installations. [PR #955](https://github.com/riverqueue/river/pull/955).
2022
- The reindexer maintenance service now reindexes all `river_job` indexes, including its primary key. This is expected to help in situations where the jobs table has in the past expanded to a very large size (which makes most indexes larger), is now a much more modest size, but has left the indexes in their expanded state. [PR #963](https://github.com/riverqueue/river/pull/963).
2123
- The River CLI now accepts a `--target-version` of 0 with `river migrate-down` to run all down migrations and remove all River tables (previously, -1 was used for this; -1 still works, but now 0 also works). [PR #966](https://github.com/riverqueue/river/pull/966).
24+
- **Breaking change:** The `HookWorkEnd` interface's `WorkEnd` function now receives a `JobRow` parameter in addition to the `error` it received before. Having a `JobRow` to work with is fairly crucial to most functionality that a hook would implement, and its previous omission was entirely an error. [PR #970](https://github.com/riverqueue/river/pull/970).
2225

2326
### Fixed
2427

client_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -813,7 +813,7 @@ func Test_Client(t *testing.T) {
813813
workEndHookCalled := false
814814

815815
bundle.config.Hooks = []rivertype.Hook{
816-
HookWorkEndFunc(func(ctx context.Context, err error) error {
816+
HookWorkEndFunc(func(ctx context.Context, job *rivertype.JobRow, err error) error {
817817
workEndHookCalled = true
818818
return err
819819
}),
@@ -1433,7 +1433,7 @@ func (metadataHookWorkBegin) WorkBegin(ctx context.Context, job *rivertype.JobRo
14331433

14341434
type metadataHookWorkEnd struct{ rivertype.Hook }
14351435

1436-
func (metadataHookWorkEnd) WorkEnd(ctx context.Context, err error) error {
1436+
func (metadataHookWorkEnd) WorkEnd(ctx context.Context, job *rivertype.JobRow, err error) error {
14371437
metadataUpdates, hasMetadataUpdates := jobexecutor.MetadataUpdatesFromWorkContext(ctx)
14381438
if !hasMetadataUpdates {
14391439
panic("expected to be called from within job executor")

hook_defaults_funcs.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,10 @@ func (f HookWorkBeginFunc) IsHook() bool { return true }
3535

3636
// HookWorkEndFunc is a convenience helper for implementing
3737
// rivertype.HookworkEnd using a simple function instead of a struct.
38-
type HookWorkEndFunc func(ctx context.Context, err error) error
38+
type HookWorkEndFunc func(ctx context.Context, job *rivertype.JobRow, err error) error
3939

40-
func (f HookWorkEndFunc) WorkEnd(ctx context.Context, err error) error {
41-
return f(ctx, err)
40+
func (f HookWorkEndFunc) WorkEnd(ctx context.Context, job *rivertype.JobRow, err error) error {
41+
return f(ctx, job, err)
4242
}
4343

4444
func (f HookWorkEndFunc) IsHook() bool { return true }

internal/hooklookup/hook_lookup_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -267,6 +267,6 @@ var _ rivertype.HookWorkEnd = &testHookWorkEnd{}
267267

268268
type testHookWorkEnd struct{ rivertype.Hook }
269269

270-
func (t *testHookWorkEnd) WorkEnd(ctx context.Context, err error) error {
270+
func (t *testHookWorkEnd) WorkEnd(ctx context.Context, job *rivertype.JobRow, err error) error {
271271
return nil
272272
}

internal/jobexecutor/job_executor.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -215,7 +215,7 @@ func (e *JobExecutor) execute(ctx context.Context) (res *jobExecutorResult) {
215215
e.HookLookupGlobal.ByHookKind(hooklookup.HookKindWorkEnd),
216216
e.WorkUnit.HookLookup(e.HookLookupByJob).ByHookKind(hooklookup.HookKindWorkEnd)...,
217217
) {
218-
err = hook.(rivertype.HookWorkEnd).WorkEnd(ctx, err) //nolint:forcetypeassert
218+
err = hook.(rivertype.HookWorkEnd).WorkEnd(ctx, e.JobRow, err) //nolint:forcetypeassert
219219
}
220220
}
221221

internal/jobexecutor/job_executor_test.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -819,7 +819,7 @@ func TestJobExecutor_Execute(t *testing.T) {
819819
workBeginCalled = true
820820
return nil
821821
}),
822-
HookWorkEndFunc(func(ctx context.Context, err error) error {
822+
HookWorkEndFunc(func(ctx context.Context, job *rivertype.JobRow, err error) error {
823823
workEndCalled = true
824824
return err
825825
}),
@@ -845,12 +845,12 @@ func TestJobExecutor_Execute(t *testing.T) {
845845
workEnd2Called bool
846846
)
847847
executor.HookLookupGlobal = hooklookup.NewHookLookup([]rivertype.Hook{
848-
HookWorkEndFunc(func(ctx context.Context, err error) error {
848+
HookWorkEndFunc(func(ctx context.Context, job *rivertype.JobRow, err error) error {
849849
workEnd1Called = true
850850
require.EqualError(t, err, "job error")
851851
return err
852852
}),
853-
HookWorkEndFunc(func(ctx context.Context, err error) error {
853+
HookWorkEndFunc(func(ctx context.Context, job *rivertype.JobRow, err error) error {
854854
workEnd2Called = true
855855
require.EqualError(t, err, "job error")
856856
return err
@@ -879,12 +879,12 @@ func TestJobExecutor_Execute(t *testing.T) {
879879
workEnd2Called bool
880880
)
881881
executor.HookLookupGlobal = hooklookup.NewHookLookup([]rivertype.Hook{
882-
HookWorkEndFunc(func(ctx context.Context, err error) error {
882+
HookWorkEndFunc(func(ctx context.Context, job *rivertype.JobRow, err error) error {
883883
workEnd1Called = true
884884
require.EqualError(t, err, "job error")
885885
return err
886886
}),
887-
HookWorkEndFunc(func(ctx context.Context, err error) error {
887+
HookWorkEndFunc(func(ctx context.Context, job *rivertype.JobRow, err error) error {
888888
workEnd2Called = true
889889
require.EqualError(t, err, "job error")
890890
return nil // second hook suppresses the error
@@ -917,10 +917,10 @@ func (f HookWorkBeginFunc) WorkBegin(ctx context.Context, job *rivertype.JobRow)
917917

918918
func (f HookWorkBeginFunc) IsHook() bool { return true }
919919

920-
type HookWorkEndFunc func(ctx context.Context, err error) error
920+
type HookWorkEndFunc func(ctx context.Context, job *rivertype.JobRow, err error) error
921921

922-
func (f HookWorkEndFunc) WorkEnd(ctx context.Context, err error) error {
923-
return f(ctx, err)
922+
func (f HookWorkEndFunc) WorkEnd(ctx context.Context, job *rivertype.JobRow, err error) error {
923+
return f(ctx, job, err)
924924
}
925925

926926
func (f HookWorkEndFunc) IsHook() bool { return true }

rivertype/river_type.go

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -300,6 +300,7 @@ type JobInsertParams struct {
300300
// List of hook interfaces that may be implemented:
301301
// - HookInsertBegin
302302
// - HookWorkBegin
303+
// - HookWorkEnd
303304
//
304305
// More operation-specific interfaces may be added in future versions.
305306
type Hook interface {
@@ -314,6 +315,7 @@ type Hook interface {
314315
type HookInsertBegin interface {
315316
Hook
316317

318+
// InsertBegin is invoked just before a job is inserted to the database.
317319
InsertBegin(ctx context.Context, params *JobInsertParams) error
318320
}
319321

@@ -322,6 +324,16 @@ type HookInsertBegin interface {
322324
type HookWorkBegin interface {
323325
Hook
324326

327+
// WorkBegin is invoked after a job has been locked and assigned to a
328+
// particular executor for work and just before the job is actually worked.
329+
//
330+
// Returning an error from any HookWorkBegin hook will abort the job early
331+
// such that it has an error set and doesn't work, with a retry scheduled
332+
// according to its retry policy.
333+
//
334+
// This function doesn't return a context so any context set in WorkBegin is
335+
// discarded after the function returns. If persistent context needs to be
336+
// set, middleware should be used instead.
325337
WorkBegin(ctx context.Context, job *JobRow) error
326338
}
327339

@@ -338,18 +350,22 @@ type HookWorkEnd interface {
338350
//
339351
// err := e.WorkUnit.Work(ctx)
340352
// for _, hook := range hooks {
341-
// err = hook.(rivertype.HookWorkEnd).WorkEnd(ctx, err)
353+
// err = hook.(rivertype.HookWorkEnd).WorkEnd(ctx, e.JobRow, err)
342354
// }
343355
// return err
344356
//
345357
// If a hook does not want to modify an error result, it should make sure to
346358
// return whatever error value it received as its argument whether that
347359
// error is nil or not.
348360
//
361+
// The JobRow received by WorkEnd is the same one passed to HookWorkBegin's
362+
// WorkBegin. Its state, errors, next scheduled at time, etc. have not yet
363+
// been updated based on the latest work result.
364+
//
349365
// Will not receive a common context related to HookWorkBegin because
350366
// WorkBegin doesn't return a context. Middleware should be used for this
351367
// sort of shared context instead.
352-
WorkEnd(ctx context.Context, err error) error
368+
WorkEnd(ctx context.Context, job *JobRow, err error) error
353369
}
354370

355371
// Middleware is an arbitrary interface for a struct which will execute some

0 commit comments

Comments
 (0)