Skip to content

Commit 24a8c2f

Browse files
authored
Implement JobDeleteMany operations (#962)
Here, add functions for `Client.JobDeleteMany` which lets jobs be deleted in batches. The main impetus for this is to give us a way of implementing a "purge queue" function, which has been previously requested by users, and which is generally just a good feature. We copy the `JobList` API very closely and use almost all the same implementation, with the caveat that I've removed some of the more complex features that I don't think will be as necessary for deletion, with the sorting features being the biggest one, but also arbitrary SQL in `WHERE` queries. These can always be added in later because they're purely additive to the API, but I'm hoping that we won't need them.
1 parent 75ab666 commit 24a8c2f

File tree

20 files changed

+1201
-72
lines changed

20 files changed

+1201
-72
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1414
### Added
1515

1616
- The project now tests against [libSQL](https://github.com/tursodatabase/libsql), a popular SQLite fork. It's used through the same `riversqlite` driver that SQLite uses. [PR #957](https://github.com/riverqueue/river/pull/957)
17+
- Added `JobDeleteMany` operations that remove many jobs in a single operation according to input criteria. [PR #962](https://github.com/riverqueue/river/pull/962)
1718

1819
### Changed
1920

client.go

Lines changed: 79 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2052,6 +2052,71 @@ func validateQueueName(queueName string) error {
20522052
return nil
20532053
}
20542054

2055+
// JobDeleteManyResult is the result of a job list operation. It contains a list of
2056+
// jobs and a cursor for fetching the next page of results.
2057+
type JobDeleteManyResult struct {
2058+
// Jobs is a slice of job returned as part of the list operation.
2059+
Jobs []*rivertype.JobRow
2060+
}
2061+
2062+
// JobDeleteMany deletes many jobs at once based on the conditions defined by
2063+
// JobDeleteManyParams. Running jobs are always ignored.
2064+
//
2065+
// params := river.NewJobDeleteManyParams().First(10).State(rivertype.JobStateCompleted)
2066+
// jobRows, err := client.JobDeleteMany(ctx, params)
2067+
// if err != nil {
2068+
// // handle error
2069+
// }
2070+
func (c *Client[TTx]) JobDeleteMany(ctx context.Context, params *JobDeleteManyParams) (*JobDeleteManyResult, error) {
2071+
if !c.driver.PoolIsSet() {
2072+
return nil, errNoDriverDBPool
2073+
}
2074+
2075+
if params == nil {
2076+
params = NewJobDeleteManyParams()
2077+
}
2078+
params.schema = c.config.Schema
2079+
2080+
listParams, err := dblist.JobMakeDriverParams(ctx, params.toDBParams(), c.driver.SQLFragmentColumnIn)
2081+
if err != nil {
2082+
return nil, err
2083+
}
2084+
2085+
jobs, err := c.driver.GetExecutor().JobDeleteMany(ctx, (*riverdriver.JobDeleteManyParams)(listParams))
2086+
if err != nil {
2087+
return nil, err
2088+
}
2089+
2090+
return &JobDeleteManyResult{Jobs: jobs}, nil
2091+
}
2092+
2093+
// JobDeleteManyTx deletes many jobs at once based on the conditions defined by
2094+
// JobDeleteManyParams. Running jobs are always ignored.
2095+
//
2096+
// params := river.NewJobDeleteManyParams().First(10).States(river.JobStateCompleted)
2097+
// jobRows, err := client.JobDeleteManyTx(ctx, tx, params)
2098+
// if err != nil {
2099+
// // handle error
2100+
// }
2101+
func (c *Client[TTx]) JobDeleteManyTx(ctx context.Context, tx TTx, params *JobDeleteManyParams) (*JobDeleteManyResult, error) {
2102+
if params == nil {
2103+
params = NewJobDeleteManyParams()
2104+
}
2105+
params.schema = c.config.Schema
2106+
2107+
listParams, err := dblist.JobMakeDriverParams(ctx, params.toDBParams(), c.driver.SQLFragmentColumnIn)
2108+
if err != nil {
2109+
return nil, err
2110+
}
2111+
2112+
jobs, err := c.driver.UnwrapExecutor(tx).JobDeleteMany(ctx, (*riverdriver.JobDeleteManyParams)(listParams))
2113+
if err != nil {
2114+
return nil, err
2115+
}
2116+
2117+
return &JobDeleteManyResult{Jobs: jobs}, nil
2118+
}
2119+
20552120
// JobListResult is the result of a job list operation. It contains a list of
20562121
// jobs and a cursor for fetching the next page of results.
20572122
type JobListResult struct {
@@ -2094,10 +2159,16 @@ func (c *Client[TTx]) JobList(ctx context.Context, params *JobListParams) (*JobL
20942159
return nil, err
20952160
}
20962161

2097-
jobs, err := dblist.JobList(ctx, c.driver.GetExecutor(), dbParams, c.driver.SQLFragmentColumnIn)
2162+
listParams, err := dblist.JobMakeDriverParams(ctx, dbParams, c.driver.SQLFragmentColumnIn)
2163+
if err != nil {
2164+
return nil, err
2165+
}
2166+
2167+
jobs, err := c.driver.GetExecutor().JobList(ctx, listParams)
20982168
if err != nil {
20992169
return nil, err
21002170
}
2171+
21012172
res := &JobListResult{Jobs: jobs}
21022173
if len(jobs) > 0 {
21032174
res.LastCursor = jobListCursorFromJobAndParams(jobs[len(jobs)-1], params)
@@ -2129,10 +2200,16 @@ func (c *Client[TTx]) JobListTx(ctx context.Context, tx TTx, params *JobListPara
21292200
return nil, err
21302201
}
21312202

2132-
jobs, err := dblist.JobList(ctx, c.driver.UnwrapExecutor(tx), dbParams, c.driver.SQLFragmentColumnIn)
2203+
listParams, err := dblist.JobMakeDriverParams(ctx, dbParams, c.driver.SQLFragmentColumnIn)
2204+
if err != nil {
2205+
return nil, err
2206+
}
2207+
2208+
jobs, err := c.driver.UnwrapExecutor(tx).JobList(ctx, listParams)
21332209
if err != nil {
21342210
return nil, err
21352211
}
2212+
21362213
res := &JobListResult{Jobs: jobs}
21372214
if len(jobs) > 0 {
21382215
res.LastCursor = jobListCursorFromJobAndParams(jobs[len(jobs)-1], params)

0 commit comments

Comments
 (0)