Skip to content

Commit 3c4b16c

Browse files
authored
Move job queue cleanup to separate function (#12)
* Move job queue cleanup to separate function * Add comments, more test cleanup * Re-instate onInit so migrations are completed before processing the job queue
1 parent a570ffc commit 3c4b16c

File tree

3 files changed

+64
-39
lines changed

3 files changed

+64
-39
lines changed

Sources/JobsPostgres/Migrations/UpdateJobDelay.swift

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,3 @@
1-
//
2-
// UpdateJobDelay.swift
3-
// swift-jobs-postgres
4-
//
5-
// Created by Stevenson Michel on 2/17/25.
6-
//
7-
81
//===----------------------------------------------------------------------===//
92
//
103
// This source file is part of the Hummingbird server framework project

Sources/JobsPostgres/PostgresJobsQueue.swift

Lines changed: 38 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ public final class PostgresJobQueue: JobQueueDriver {
4646
public typealias JobID = UUID
4747

4848
/// what to do with failed/processing jobs from last time queue was handled
49-
public enum JobInitialization: Sendable {
49+
public enum JobCleanup: Sendable {
5050
case doNothing
5151
case rerun
5252
case remove
@@ -73,20 +73,14 @@ public final class PostgresJobQueue: JobQueueDriver {
7373

7474
/// Queue configuration
7575
public struct Configuration: Sendable {
76-
let pendingJobsInitialization: JobInitialization
77-
let failedJobsInitialization: JobInitialization
78-
let processingJobsInitialization: JobInitialization
76+
/// Queue poll time to wait if queue empties
7977
let pollTime: Duration
8078

79+
/// Initialize configuration
80+
/// - Parameter pollTime: Queue poll time to wait if queue empties
8181
public init(
82-
pendingJobsInitialization: JobInitialization = .doNothing,
83-
failedJobsInitialization: JobInitialization = .rerun,
84-
processingJobsInitialization: JobInitialization = .rerun,
8582
pollTime: Duration = .milliseconds(100)
8683
) {
87-
self.pendingJobsInitialization = pendingJobsInitialization
88-
self.failedJobsInitialization = failedJobsInitialization
89-
self.processingJobsInitialization = processingJobsInitialization
9084
self.pollTime = pollTime
9185
}
9286
}
@@ -115,20 +109,47 @@ public final class PostgresJobQueue: JobQueueDriver {
115109
await migrations.add(UpdateJobDelay())
116110
}
117111

118-
/// Run on initialization of the job queue
119112
public func onInit() async throws {
113+
self.logger.info("Waiting for JobQueue migrations")
114+
/// Need migrations to have completed before job queue processing can start
115+
try await self.migrations.waitUntilCompleted()
116+
}
117+
118+
/// Cleanup job queues
119+
///
120+
/// This function is used to re-run or delete jobs in a certain state. Failed jobs can be
121+
/// pushed back into the pending queue to be re-run or removed. When called at startup in
122+
/// theory no job should be set to processing, or set to pending but not in the queue. but if
123+
/// your job server crashes these states are possible, so we also provide options to re-queue
124+
/// these jobs so they are run again.
125+
///
126+
/// The job queue needs to be running when you call cleanup. You can call `cleanup` with
127+
/// `failedJobs`` set to whatever you like at any point to re-queue failed jobs. Moving processing
128+
/// or pending jobs should only be done if you are certain there is nothing else processing
129+
/// the job queue.
130+
///
131+
/// - Parameters:
132+
/// - failedJobs: What to do with jobs tagged as failed
133+
/// - processingJobs: What to do with jobs tagged as processing
134+
/// - pendingJobs: What to do with jobs tagged as pending
135+
/// - Throws:
136+
public func cleanup(
137+
failedJobs: JobCleanup = .doNothing,
138+
processingJobs: JobCleanup = .doNothing,
139+
pendingJobs: JobCleanup = .doNothing
140+
) async throws {
120141
do {
121-
self.logger.info("Waiting for JobQueue migrations")
142+
/// wait for migrations to complete before running job queue cleanup
122143
try await self.migrations.waitUntilCompleted()
123144
_ = try await self.client.withConnection { connection in
124-
self.logger.info("Update Jobs at initialization")
125-
try await self.updateJobsOnInit(withStatus: .pending, onInit: self.configuration.pendingJobsInitialization, connection: connection)
145+
self.logger.info("Update Jobs")
146+
try await self.updateJobsOnInit(withStatus: .pending, onInit: pendingJobs, connection: connection)
126147
try await self.updateJobsOnInit(
127148
withStatus: .processing,
128-
onInit: self.configuration.processingJobsInitialization,
149+
onInit: processingJobs,
129150
connection: connection
130151
)
131-
try await self.updateJobsOnInit(withStatus: .failed, onInit: self.configuration.failedJobsInitialization, connection: connection)
152+
try await self.updateJobsOnInit(withStatus: .failed, onInit: failedJobs, connection: connection)
132153
}
133154
} catch let error as PSQLError {
134155
logger.error(
@@ -321,7 +342,7 @@ public final class PostgresJobQueue: JobQueueDriver {
321342
return jobs
322343
}
323344

324-
func updateJobsOnInit(withStatus status: Status, onInit: JobInitialization, connection: PostgresConnection) async throws {
345+
func updateJobsOnInit(withStatus status: Status, onInit: JobCleanup, connection: PostgresConnection) async throws {
325346
switch onInit {
326347
case .remove:
327348
try await connection.query(

Tests/JobsPostgresTests/JobsTests.swift

Lines changed: 26 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ final class JobsTests: XCTestCase {
5151

5252
func createJobQueue(
5353
numWorkers: Int,
54-
configuration: PostgresJobQueue.Configuration,
54+
configuration: PostgresJobQueue.Configuration = .init(),
5555
function: String = #function
5656
) async throws -> JobQueue<PostgresJobQueue> {
5757
let logger = {
@@ -87,6 +87,9 @@ final class JobsTests: XCTestCase {
8787
/// shutdown correctly
8888
@discardableResult public func testJobQueue<T>(
8989
jobQueue: JobQueue<PostgresJobQueue>,
90+
failedJobsInitialization: PostgresJobQueue.JobCleanup = .remove,
91+
processingJobsInitialization: PostgresJobQueue.JobCleanup = .remove,
92+
pendingJobsInitialization: PostgresJobQueue.JobCleanup = .remove,
9093
revertMigrations: Bool = false,
9194
test: (JobQueue<PostgresJobQueue>) async throws -> T
9295
) async throws -> T {
@@ -110,6 +113,7 @@ final class JobsTests: XCTestCase {
110113
try await migrations.revert(client: client, groups: [.jobQueue], logger: logger, dryRun: false)
111114
}
112115
try await migrations.apply(client: client, groups: [.jobQueue], logger: logger, dryRun: false)
116+
try await jobQueue.queue.cleanup(failedJobs: failedJobsInitialization, processingJobs: processingJobsInitialization)
113117
let value = try await test(jobQueue)
114118
await serviceGroup.triggerGracefulShutdown()
115119
return value
@@ -134,13 +138,22 @@ final class JobsTests: XCTestCase {
134138
/// shutdown correctly
135139
@discardableResult public func testJobQueue<T>(
136140
numWorkers: Int,
137-
configuration: PostgresJobQueue.Configuration = .init(failedJobsInitialization: .remove, processingJobsInitialization: .remove),
141+
failedJobsInitialization: PostgresJobQueue.JobCleanup = .remove,
142+
processingJobsInitialization: PostgresJobQueue.JobCleanup = .remove,
143+
pendingJobsInitialization: PostgresJobQueue.JobCleanup = .remove,
138144
revertMigrations: Bool = true,
139145
function: String = #function,
140146
test: (JobQueue<PostgresJobQueue>) async throws -> T
141147
) async throws -> T {
142-
let jobQueue = try await self.createJobQueue(numWorkers: numWorkers, configuration: configuration, function: function)
143-
return try await self.testJobQueue(jobQueue: jobQueue, revertMigrations: revertMigrations, test: test)
148+
let jobQueue = try await self.createJobQueue(numWorkers: numWorkers, configuration: .init(), function: function)
149+
return try await self.testJobQueue(
150+
jobQueue: jobQueue,
151+
failedJobsInitialization: failedJobsInitialization,
152+
processingJobsInitialization: processingJobsInitialization,
153+
pendingJobsInitialization: pendingJobsInitialization,
154+
revertMigrations: revertMigrations,
155+
test: test
156+
)
144157
}
145158

146159
func testBasic() async throws {
@@ -368,14 +381,13 @@ final class JobsTests: XCTestCase {
368381
finished.store(true, ordering: .relaxed)
369382
}
370383
let jobQueue = try await createJobQueue(
371-
numWorkers: 1,
372-
configuration: .init(pendingJobsInitialization: .remove, failedJobsInitialization: .rerun)
384+
numWorkers: 1
373385
)
374386
jobQueue.registerJob(job)
375-
try await self.testJobQueue(jobQueue: jobQueue, revertMigrations: true) { jobQueue in
376-
// stall to give onInit a chance to run, so it can remove any pendng jobs
377-
try await Task.sleep(for: .milliseconds(100))
378-
387+
try await self.testJobQueue(
388+
jobQueue: jobQueue,
389+
revertMigrations: true
390+
) { jobQueue in
379391
try await jobQueue.push(id: jobIdentifer, parameters: 0)
380392

381393
await self.wait(for: [failedExpectation], timeout: 10)
@@ -384,9 +396,9 @@ final class JobsTests: XCTestCase {
384396
XCTAssertFalse(finished.load(ordering: .relaxed))
385397
}
386398

387-
let jobQueue2 = try await createJobQueue(numWorkers: 1, configuration: .init(failedJobsInitialization: .rerun))
399+
let jobQueue2 = try await createJobQueue(numWorkers: 1)
388400
jobQueue2.registerJob(job)
389-
try await self.testJobQueue(jobQueue: jobQueue2) { _ in
401+
try await self.testJobQueue(jobQueue: jobQueue2, failedJobsInitialization: .rerun) { _ in
390402
await self.wait(for: [succeededExpectation], timeout: 10)
391403
XCTAssertTrue(finished.load(ordering: .relaxed))
392404
}
@@ -414,7 +426,6 @@ final class JobsTests: XCTestCase {
414426
.postgres(
415427
client: postgresClient,
416428
migrations: postgresMigrations,
417-
configuration: .init(failedJobsInitialization: .remove, processingJobsInitialization: .remove),
418429
logger: logger
419430
),
420431
numWorkers: 2,
@@ -425,7 +436,6 @@ final class JobsTests: XCTestCase {
425436
.postgres(
426437
client: postgresClient,
427438
migrations: postgresMigrations2,
428-
configuration: .init(failedJobsInitialization: .remove, processingJobsInitialization: .remove),
429439
logger: logger
430440
),
431441
numWorkers: 2,
@@ -447,6 +457,8 @@ final class JobsTests: XCTestCase {
447457
}
448458
try await postgresMigrations.apply(client: postgresClient, groups: [.jobQueue], logger: logger, dryRun: false)
449459
try await postgresMigrations2.apply(client: postgresClient, groups: [.jobQueue], logger: logger, dryRun: false)
460+
try await jobQueue.queue.cleanup(failedJobs: .remove, processingJobs: .remove)
461+
try await jobQueue2.queue.cleanup(failedJobs: .remove, processingJobs: .remove)
450462
do {
451463
for i in 0..<200 {
452464
try await jobQueue.push(id: jobIdentifer, parameters: i)
@@ -475,7 +487,6 @@ final class JobsTests: XCTestCase {
475487
let jobQueue = await PostgresJobQueue(
476488
client: postgresClient,
477489
migrations: postgresMigrations,
478-
configuration: .init(failedJobsInitialization: .remove, processingJobsInitialization: .remove),
479490
logger: logger
480491
)
481492
try await postgresMigrations.apply(client: postgresClient, groups: [.jobQueue], logger: logger, dryRun: false)

0 commit comments

Comments
 (0)