Skip to content

Commit ad5096b

Browse files
committed
Move job queue cleanup to separate function
1 parent c9c6212 commit ad5096b

File tree

2 files changed

+39
-30
lines changed

2 files changed

+39
-30
lines changed

Sources/JobsPostgres/PostgresJobsQueue.swift

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -73,20 +73,11 @@ public final class PostgresJobQueue: JobQueueDriver {
7373

7474
/// Queue configuration
7575
public struct Configuration: Sendable {
76-
let pendingJobsInitialization: JobInitialization
77-
let failedJobsInitialization: JobInitialization
78-
let processingJobsInitialization: JobInitialization
7976
let pollTime: Duration
8077

8178
public init(
82-
pendingJobsInitialization: JobInitialization = .doNothing,
83-
failedJobsInitialization: JobInitialization = .rerun,
84-
processingJobsInitialization: JobInitialization = .rerun,
8579
pollTime: Duration = .milliseconds(100)
8680
) {
87-
self.pendingJobsInitialization = pendingJobsInitialization
88-
self.failedJobsInitialization = failedJobsInitialization
89-
self.processingJobsInitialization = processingJobsInitialization
9081
self.pollTime = pollTime
9182
}
9283
}
@@ -115,20 +106,29 @@ public final class PostgresJobQueue: JobQueueDriver {
115106
await migrations.add(UpdateJobDelay())
116107
}
117108

118-
/// Run on initialization of the job queue
119-
public func onInit() async throws {
109+
/// Cleanup job queues
110+
/// - Parameters:
111+
/// - failedJobs: What to do with jobs tagged as failed
112+
/// - processingJobs: What to do with jobs tagged as processing
113+
/// - pendingJobs: What to do with jobs tagged as pending
114+
/// - Throws:
115+
public func cleanup(
116+
failedJobs: JobInitialization = .doNothing,
117+
processingJobs: JobInitialization = .doNothing,
118+
pendingJobs: JobInitialization = .doNothing
119+
) async throws {
120120
do {
121121
self.logger.info("Waiting for JobQueue migrations")
122122
try await self.migrations.waitUntilCompleted()
123123
_ = try await self.client.withConnection { connection in
124-
self.logger.info("Update Jobs at initialization")
125-
try await self.updateJobsOnInit(withStatus: .pending, onInit: self.configuration.pendingJobsInitialization, connection: connection)
124+
self.logger.info("Update Jobs")
125+
try await self.updateJobsOnInit(withStatus: .pending, onInit: pendingJobs, connection: connection)
126126
try await self.updateJobsOnInit(
127127
withStatus: .processing,
128-
onInit: self.configuration.processingJobsInitialization,
128+
onInit: processingJobs,
129129
connection: connection
130130
)
131-
try await self.updateJobsOnInit(withStatus: .failed, onInit: self.configuration.failedJobsInitialization, connection: connection)
131+
try await self.updateJobsOnInit(withStatus: .failed, onInit: failedJobs, connection: connection)
132132
}
133133
} catch let error as PSQLError {
134134
logger.error(

Tests/JobsPostgresTests/JobsTests.swift

Lines changed: 24 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ final class JobsTests: XCTestCase {
5151

5252
func createJobQueue(
5353
numWorkers: Int,
54-
configuration: PostgresJobQueue.Configuration,
54+
configuration: PostgresJobQueue.Configuration = .init(),
5555
function: String = #function
5656
) async throws -> JobQueue<PostgresJobQueue> {
5757
let logger = {
@@ -87,6 +87,9 @@ final class JobsTests: XCTestCase {
8787
/// shutdown correctly
8888
@discardableResult public func testJobQueue<T>(
8989
jobQueue: JobQueue<PostgresJobQueue>,
90+
failedJobsInitialization: PostgresJobQueue.JobInitialization = .remove,
91+
processingJobsInitialization: PostgresJobQueue.JobInitialization = .remove,
92+
pendingJobsInitialization: PostgresJobQueue.JobInitialization = .doNothing,
9093
revertMigrations: Bool = false,
9194
test: (JobQueue<PostgresJobQueue>) async throws -> T
9295
) async throws -> T {
@@ -110,6 +113,7 @@ final class JobsTests: XCTestCase {
110113
try await migrations.revert(client: client, groups: [.jobQueue], logger: logger, dryRun: false)
111114
}
112115
try await migrations.apply(client: client, groups: [.jobQueue], logger: logger, dryRun: false)
116+
try await jobQueue.queue.cleanup(failedJobs: failedJobsInitialization, processingJobs: processingJobsInitialization)
113117
let value = try await test(jobQueue)
114118
await serviceGroup.triggerGracefulShutdown()
115119
return value
@@ -134,13 +138,20 @@ final class JobsTests: XCTestCase {
134138
/// shutdown correctly
135139
@discardableResult public func testJobQueue<T>(
136140
numWorkers: Int,
137-
configuration: PostgresJobQueue.Configuration = .init(failedJobsInitialization: .remove, processingJobsInitialization: .remove),
141+
failedJobsInitialization: PostgresJobQueue.JobInitialization = .remove,
142+
processingJobsInitialization: PostgresJobQueue.JobInitialization = .remove,
138143
revertMigrations: Bool = true,
139144
function: String = #function,
140145
test: (JobQueue<PostgresJobQueue>) async throws -> T
141146
) async throws -> T {
142-
let jobQueue = try await self.createJobQueue(numWorkers: numWorkers, configuration: configuration, function: function)
143-
return try await self.testJobQueue(jobQueue: jobQueue, revertMigrations: revertMigrations, test: test)
147+
let jobQueue = try await self.createJobQueue(numWorkers: numWorkers, configuration: .init(), function: function)
148+
return try await self.testJobQueue(
149+
jobQueue: jobQueue,
150+
failedJobsInitialization: failedJobsInitialization,
151+
processingJobsInitialization: processingJobsInitialization,
152+
revertMigrations: revertMigrations,
153+
test: test
154+
)
144155
}
145156

146157
func testBasic() async throws {
@@ -368,14 +379,13 @@ final class JobsTests: XCTestCase {
368379
finished.store(true, ordering: .relaxed)
369380
}
370381
let jobQueue = try await createJobQueue(
371-
numWorkers: 1,
372-
configuration: .init(pendingJobsInitialization: .remove, failedJobsInitialization: .rerun)
382+
numWorkers: 1
373383
)
374384
jobQueue.registerJob(job)
375-
try await self.testJobQueue(jobQueue: jobQueue, revertMigrations: true) { jobQueue in
376-
// stall to give onInit a chance to run, so it can remove any pendng jobs
377-
try await Task.sleep(for: .milliseconds(100))
378-
385+
try await self.testJobQueue(
386+
jobQueue: jobQueue,
387+
revertMigrations: true
388+
) { jobQueue in
379389
try await jobQueue.push(id: jobIdentifer, parameters: 0)
380390

381391
await self.wait(for: [failedExpectation], timeout: 10)
@@ -384,9 +394,9 @@ final class JobsTests: XCTestCase {
384394
XCTAssertFalse(finished.load(ordering: .relaxed))
385395
}
386396

387-
let jobQueue2 = try await createJobQueue(numWorkers: 1, configuration: .init(failedJobsInitialization: .rerun))
397+
let jobQueue2 = try await createJobQueue(numWorkers: 1)
388398
jobQueue2.registerJob(job)
389-
try await self.testJobQueue(jobQueue: jobQueue2) { _ in
399+
try await self.testJobQueue(jobQueue: jobQueue2, failedJobsInitialization: .rerun) { _ in
390400
await self.wait(for: [succeededExpectation], timeout: 10)
391401
XCTAssertTrue(finished.load(ordering: .relaxed))
392402
}
@@ -414,7 +424,6 @@ final class JobsTests: XCTestCase {
414424
.postgres(
415425
client: postgresClient,
416426
migrations: postgresMigrations,
417-
configuration: .init(failedJobsInitialization: .remove, processingJobsInitialization: .remove),
418427
logger: logger
419428
),
420429
numWorkers: 2,
@@ -425,7 +434,6 @@ final class JobsTests: XCTestCase {
425434
.postgres(
426435
client: postgresClient,
427436
migrations: postgresMigrations2,
428-
configuration: .init(failedJobsInitialization: .remove, processingJobsInitialization: .remove),
429437
logger: logger
430438
),
431439
numWorkers: 2,
@@ -447,6 +455,8 @@ final class JobsTests: XCTestCase {
447455
}
448456
try await postgresMigrations.apply(client: postgresClient, groups: [.jobQueue], logger: logger, dryRun: false)
449457
try await postgresMigrations2.apply(client: postgresClient, groups: [.jobQueue], logger: logger, dryRun: false)
458+
try await jobQueue.queue.cleanup(failedJobs: .remove, processingJobs: .remove)
459+
try await jobQueue2.queue.cleanup(failedJobs: .remove, processingJobs: .remove)
450460
do {
451461
for i in 0..<200 {
452462
try await jobQueue.push(id: jobIdentifer, parameters: i)
@@ -475,7 +485,6 @@ final class JobsTests: XCTestCase {
475485
let jobQueue = await PostgresJobQueue(
476486
client: postgresClient,
477487
migrations: postgresMigrations,
478-
configuration: .init(failedJobsInitialization: .remove, processingJobsInitialization: .remove),
479488
logger: logger
480489
)
481490
try await postgresMigrations.apply(client: postgresClient, groups: [.jobQueue], logger: logger, dryRun: false)

0 commit comments

Comments
 (0)