Skip to content

Commit 2933e23

Browse files
Job retention and cleanup (#35)
* Job retention * Job retention * Swift format * Addressing PR comments * Update PostgresJobsQueue.swift * fixing the failing test * Adding job pruner and instruction on how to register and run the job on a schedule * increase time amount for test job * Update JobsTests.swift * Update JobsTests.swift * up concurrency to 3 to match number of job. There seems to be a timing issue * Wait for 200 milliseconds before checking the job count. * Merge retention policy and cleanup * Register cleanup job This currently assumes there is only one postgres job queue. Need the changes splitting job id from parameters to implement for every queue * Create separate cleanup job for each queue * Use swift-jobs main branch * .doNotRetain * Move JobCleanup between files * Add testCancelledJobRetention * Move Retention policy inside PostgresJobQueue * Add tests, and new version of getJobs() --------- Co-authored-by: Stevenson Michel <130018170+thoven87@users.noreply.github.com>
1 parent 857e17b commit 2933e23

File tree

4 files changed

+427
-88
lines changed

4 files changed

+427
-88
lines changed
Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the Hummingbird server framework project
4+
//
5+
// Copyright (c) 2024-2025 the Hummingbird authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
import Foundation
16+
import Jobs
17+
import Logging
18+
import NIOConcurrencyHelpers
19+
import NIOCore
20+
import PostgresMigrations
21+
import PostgresNIO
22+
23+
/// Parameters for Cleanup job
24+
public struct JobCleanupParameters: Sendable & Codable {
25+
let failedJobs: PostgresJobQueue.JobCleanup
26+
let completedJobs: PostgresJobQueue.JobCleanup
27+
let cancelledJobs: PostgresJobQueue.JobCleanup
28+
29+
public init(
30+
failedJobs: PostgresJobQueue.JobCleanup = .doNothing,
31+
completedJobs: PostgresJobQueue.JobCleanup = .doNothing,
32+
cancelledJobs: PostgresJobQueue.JobCleanup = .doNothing
33+
) {
34+
self.failedJobs = failedJobs
35+
self.completedJobs = completedJobs
36+
self.cancelledJobs = cancelledJobs
37+
}
38+
}
39+
40+
extension PostgresJobQueue {
41+
/// what to do with failed/processing jobs from last time queue was handled
42+
public struct JobCleanup: Sendable, Codable {
43+
enum RawValue: Codable {
44+
case doNothing
45+
case rerun
46+
case remove(maxAge: Duration?)
47+
}
48+
let rawValue: RawValue
49+
50+
public static var doNothing: Self { .init(rawValue: .doNothing) }
51+
public static var rerun: Self { .init(rawValue: .rerun) }
52+
public static var remove: Self { .init(rawValue: .remove(maxAge: nil)) }
53+
public static func remove(maxAge: Duration) -> Self { .init(rawValue: .remove(maxAge: maxAge)) }
54+
}
55+
56+
/// clean up job name.
57+
///
58+
/// Use this with the ``JobSchedule`` to schedule a cleanup of
59+
/// failed, cancelled or completed jobs
60+
public var cleanupJob: JobName<JobCleanupParameters> {
61+
.init("_Jobs_PostgresCleanup_\(self.configuration.queueName)")
62+
}
63+
64+
/// register clean up job on queue
65+
func registerCleanupJob() {
66+
self.registerJob(
67+
JobDefinition(name: cleanupJob, parameters: JobCleanupParameters.self, retryStrategy: .dontRetry) { parameters, context in
68+
try await self.cleanup(
69+
failedJobs: parameters.failedJobs,
70+
processingJobs: .doNothing,
71+
pendingJobs: .doNothing,
72+
completedJobs: parameters.completedJobs,
73+
cancelledJobs: parameters.cancelledJobs,
74+
logger: self.logger
75+
)
76+
}
77+
)
78+
}
79+
80+
/// Cleanup job queues
81+
///
82+
/// This function is used to re-run or delete jobs in a certain state. Failed jobs can be
83+
/// pushed back into the pending queue to be re-run or removed. When called at startup in
84+
/// theory no job should be set to processing, or set to pending but not in the queue. but if
85+
/// your job server crashes these states are possible, so we also provide options to re-queue
86+
/// these jobs so they are run again.
87+
///
88+
/// The job queue needs to be running when you call cleanup. You can call `cleanup` with
89+
/// `failedJobs`` set to whatever you like at any point to re-queue failed jobs. Moving processing
90+
/// or pending jobs should only be done if you are certain there is nothing else processing
91+
/// the job queue.
92+
///
93+
/// - Parameters:
94+
/// - failedJobs: What to do with jobs tagged as failed
95+
/// - processingJobs: What to do with jobs tagged as processing
96+
/// - pendingJobs: What to do with jobs tagged as pending
97+
/// - completedJobs: What to do with jobs tagged as completed
98+
/// - cancelledJobs: What to do with jobs tagged as cancelled
99+
/// - logger: Optional logger to use when performing cleanup
100+
/// - Throws:
101+
public func cleanup(
102+
failedJobs: JobCleanup = .doNothing,
103+
processingJobs: JobCleanup = .doNothing,
104+
pendingJobs: JobCleanup = .doNothing,
105+
completedJobs: JobCleanup = .doNothing,
106+
cancelledJobs: JobCleanup = .doNothing,
107+
logger: Logger? = nil
108+
) async throws {
109+
let logger = logger ?? self.logger
110+
do {
111+
/// wait for migrations to complete before running job queue cleanup
112+
try await self.migrations.waitUntilCompleted()
113+
_ = try await self.client.withTransaction(logger: logger) { connection in
114+
self.logger.info("Cleanup Jobs")
115+
try await self.updateJobsOnInit(withStatus: .pending, onInit: pendingJobs, connection: connection)
116+
try await self.updateJobsOnInit(withStatus: .processing, onInit: processingJobs, connection: connection)
117+
try await self.updateJobsOnInit(withStatus: .failed, onInit: failedJobs, connection: connection)
118+
try await self.updateJobsOnInit(withStatus: .completed, onInit: completedJobs, connection: connection)
119+
try await self.updateJobsOnInit(withStatus: .cancelled, onInit: cancelledJobs, connection: connection)
120+
}
121+
} catch let error as PSQLError {
122+
logger.error(
123+
"JobQueue cleanup failed",
124+
metadata: [
125+
"Error": "\(String(reflecting: error))"
126+
]
127+
)
128+
throw error
129+
}
130+
}
131+
132+
func updateJobsOnInit(withStatus status: Status, onInit: JobCleanup, connection: PostgresConnection) async throws {
133+
switch onInit.rawValue {
134+
case .remove(let olderThan):
135+
let date: Date =
136+
if let olderThan {
137+
.now - Double(olderThan.components.seconds)
138+
} else {
139+
.distantFuture
140+
}
141+
try await connection.query(
142+
"""
143+
DELETE FROM swift_jobs.jobs
144+
WHERE status = \(status) AND queue_name = \(configuration.queueName)
145+
AND last_modified < \(date)
146+
""",
147+
logger: self.logger
148+
)
149+
150+
case .rerun:
151+
let jobs = try await getJobs(withStatus: status, connection: connection)
152+
self.logger.info("Moving \(jobs.count) jobs with status: \(status) to job queue")
153+
for jobID in jobs {
154+
try await self.addToQueue(jobID: jobID, queueName: configuration.queueName, options: .init(), connection: connection)
155+
}
156+
157+
case .doNothing:
158+
break
159+
}
160+
}
161+
}

Sources/JobsPostgres/PostgresJobsQueue.swift

Lines changed: 43 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -45,12 +45,6 @@ import PostgresNIO
4545
public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, ResumableJobQueue {
4646

4747
public typealias JobID = UUID
48-
/// what to do with failed/processing jobs from last time queue was handled
49-
public enum JobCleanup: Sendable {
50-
case doNothing
51-
case rerun
52-
case remove
53-
}
5448

5549
/// Job priority from lowest to highest
5650
public struct JobPriority: Equatable, Sendable {
@@ -124,6 +118,7 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma
124118
case failed = 2
125119
case cancelled = 3
126120
case paused = 4
121+
case completed = 5
127122
}
128123

129124
/// Queue configuration
@@ -132,17 +127,21 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma
132127
let pollTime: Duration
133128
/// Which Queue to push jobs into
134129
let queueName: String
130+
/// Retention policy for jobs
131+
let retentionPolicy: RetentionPolicy
135132

136133
/// Initialize configuration
137134
/// - Parameters
138135
/// - pollTime: Queue poll time to wait if queue empties
139136
/// - queueName: Name of queue we are handing
140137
public init(
141138
pollTime: Duration = .milliseconds(100),
142-
queueName: String = "default"
139+
queueName: String = "default",
140+
retentionPolicy: RetentionPolicy = .init()
143141
) {
144142
self.pollTime = pollTime
145143
self.queueName = queueName
144+
self.retentionPolicy = retentionPolicy
146145
}
147146
}
148147

@@ -152,7 +151,6 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma
152151
public let configuration: Configuration
153152
/// Logger used by queue
154153
public let logger: Logger
155-
156154
let migrations: DatabaseMigrations
157155
let isStopped: NIOLockedValueBox<Bool>
158156

@@ -165,6 +163,7 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma
165163
self.isStopped = .init(false)
166164
self.migrations = migrations
167165
await migrations.add(CreateSwiftJobsMigrations(), skipDuplicates: true)
166+
self.registerCleanupJob()
168167
}
169168

170169
public func onInit() async throws {
@@ -184,7 +183,11 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma
184183
public func cancel(jobID: JobID) async throws {
185184
try await self.client.withTransaction(logger: logger) { connection in
186185
try await deleteFromQueue(jobID: jobID, connection: connection)
187-
try await delete(jobID: jobID)
186+
if configuration.retentionPolicy.cancelled == .doNotRetain {
187+
try await delete(jobID: jobID)
188+
} else {
189+
try await setStatus(jobID: jobID, status: .cancelled, connection: connection)
190+
}
188191
}
189192
}
190193

@@ -223,53 +226,6 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma
223226
}
224227
}
225228

226-
/// Cleanup job queues
227-
///
228-
/// This function is used to re-run or delete jobs in a certain state. Failed jobs can be
229-
/// pushed back into the pending queue to be re-run or removed. When called at startup in
230-
/// theory no job should be set to processing, or set to pending but not in the queue. but if
231-
/// your job server crashes these states are possible, so we also provide options to re-queue
232-
/// these jobs so they are run again.
233-
///
234-
/// The job queue needs to be running when you call cleanup. You can call `cleanup` with
235-
/// `failedJobs`` set to whatever you like at any point to re-queue failed jobs. Moving processing
236-
/// or pending jobs should only be done if you are certain there is nothing else processing
237-
/// the job queue.
238-
///
239-
/// - Parameters:
240-
/// - failedJobs: What to do with jobs tagged as failed
241-
/// - processingJobs: What to do with jobs tagged as processing
242-
/// - pendingJobs: What to do with jobs tagged as pending
243-
/// - Throws:
244-
public func cleanup(
245-
failedJobs: JobCleanup = .doNothing,
246-
processingJobs: JobCleanup = .doNothing,
247-
pendingJobs: JobCleanup = .doNothing
248-
) async throws {
249-
do {
250-
/// wait for migrations to complete before running job queue cleanup
251-
try await self.migrations.waitUntilCompleted()
252-
_ = try await self.client.withConnection { connection in
253-
self.logger.info("Update Jobs")
254-
try await self.updateJobsOnInit(withStatus: .pending, onInit: pendingJobs, connection: connection)
255-
try await self.updateJobsOnInit(
256-
withStatus: .processing,
257-
onInit: processingJobs,
258-
connection: connection
259-
)
260-
try await self.updateJobsOnInit(withStatus: .failed, onInit: failedJobs, connection: connection)
261-
}
262-
} catch let error as PSQLError {
263-
logger.error(
264-
"JobQueue initialization failed",
265-
metadata: [
266-
"Error": "\(String(reflecting: error))"
267-
]
268-
)
269-
throw error
270-
}
271-
}
272-
273229
/// Register job
274230
/// - Parameters:
275231
/// - job: Job Definition
@@ -290,15 +246,15 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma
290246

291247
/// Retry an existing Job
292248
/// - Parameters
293-
/// - id: Job instance ID
249+
/// - jobID: Job instance ID
294250
/// - jobRequest: Job Request
295251
/// - options: Job retry options
296-
public func retry<Parameters>(_ id: JobID, jobRequest: JobRequest<Parameters>, options: JobRetryOptions) async throws {
252+
public func retry<Parameters>(_ jobID: JobID, jobRequest: JobRequest<Parameters>, options: JobRetryOptions) async throws {
297253
let buffer = try self.jobRegistry.encode(jobRequest: jobRequest)
298254
try await self.client.withTransaction(logger: self.logger) { connection in
299-
try await self.updateJob(id: id, buffer: buffer, connection: connection)
255+
try await self.updateJob(jobID: jobID, buffer: buffer, connection: connection)
300256
try await self.addToQueue(
301-
jobID: id,
257+
jobID: jobID,
302258
queueName: configuration.queueName,
303259
options: .init(delayUntil: options.delayUntil),
304260
connection: connection
@@ -308,12 +264,20 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma
308264

309265
/// This is called to say job has finished processing and it can be deleted
310266
public func finished(jobID: JobID) async throws {
311-
try await self.delete(jobID: jobID)
267+
if configuration.retentionPolicy.completed == .doNotRetain {
268+
try await self.delete(jobID: jobID)
269+
} else {
270+
try await self.setStatus(jobID: jobID, status: .completed)
271+
}
312272
}
313273

314274
/// This is called to say job has failed to run and should be put aside
315275
public func failed(jobID: JobID, error: Error) async throws {
316-
try await self.setStatus(jobID: jobID, status: .failed)
276+
if configuration.retentionPolicy.failed == .doNotRetain {
277+
try await self.delete(jobID: jobID)
278+
} else {
279+
try await self.setStatus(jobID: jobID, status: .failed)
280+
}
317281
}
318282

319283
/// stop serving jobs
@@ -451,15 +415,15 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma
451415
logger: self.logger
452416
)
453417
}
454-
// TODO: maybe add a new column colum for attempt so far after PR https://github.com/hummingbird-project/swift-jobs/pull/63 is merged?
455-
func updateJob(id: JobID, buffer: ByteBuffer, connection: PostgresConnection) async throws {
418+
419+
func updateJob(jobID: JobID, buffer: ByteBuffer, connection: PostgresConnection) async throws {
456420
try await connection.query(
457421
"""
458422
UPDATE swift_jobs.jobs
459423
SET job = \(buffer),
460424
last_modified = \(Date.now),
461425
status = \(Status.failed)
462-
WHERE id = \(id) AND queue_name = \(configuration.queueName)
426+
WHERE id = \(jobID) AND queue_name = \(configuration.queueName)
463427
""",
464428
logger: self.logger
465429
)
@@ -538,27 +502,21 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma
538502
return jobs
539503
}
540504

541-
func updateJobsOnInit(withStatus status: Status, onInit: JobCleanup, connection: PostgresConnection) async throws {
542-
switch onInit {
543-
case .remove:
544-
try await connection.query(
545-
"""
546-
DELETE FROM swift_jobs.jobs
547-
WHERE status = \(status) AND queue_name = \(configuration.queueName)
548-
""",
549-
logger: self.logger
550-
)
551-
552-
case .rerun:
553-
let jobs = try await getJobs(withStatus: status)
554-
self.logger.info("Moving \(jobs.count) jobs with status: \(status) to job queue")
555-
for jobID in jobs {
556-
try await self.addToQueue(jobID: jobID, queueName: configuration.queueName, options: .init(), connection: connection)
557-
}
558-
559-
case .doNothing:
560-
break
505+
func getJobs(withStatus status: Status, connection: PostgresConnection) async throws -> [JobID] {
506+
let stream = try await connection.query(
507+
"""
508+
SELECT
509+
id
510+
FROM swift_jobs.jobs
511+
WHERE status = \(status) AND queue_name = \(configuration.queueName)
512+
""",
513+
logger: self.logger
514+
)
515+
var jobs: [JobID] = []
516+
for try await id in stream.decode(JobID.self, context: .default) {
517+
jobs.append(id)
561518
}
519+
return jobs
562520
}
563521

564522
let jobRegistry: JobRegistry

0 commit comments

Comments
 (0)