diff --git a/Sources/JobsPostgres/PostgresJobsQueue+cleanup.swift b/Sources/JobsPostgres/PostgresJobsQueue+cleanup.swift new file mode 100644 index 0000000..074ba4e --- /dev/null +++ b/Sources/JobsPostgres/PostgresJobsQueue+cleanup.swift @@ -0,0 +1,161 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Hummingbird server framework project +// +// Copyright (c) 2024-2025 the Hummingbird authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import Foundation +import Jobs +import Logging +import NIOConcurrencyHelpers +import NIOCore +import PostgresMigrations +import PostgresNIO + +/// Parameters for Cleanup job +public struct JobCleanupParameters: Sendable & Codable { + let failedJobs: PostgresJobQueue.JobCleanup + let completedJobs: PostgresJobQueue.JobCleanup + let cancelledJobs: PostgresJobQueue.JobCleanup + + public init( + failedJobs: PostgresJobQueue.JobCleanup = .doNothing, + completedJobs: PostgresJobQueue.JobCleanup = .doNothing, + cancelledJobs: PostgresJobQueue.JobCleanup = .doNothing + ) { + self.failedJobs = failedJobs + self.completedJobs = completedJobs + self.cancelledJobs = cancelledJobs + } +} + +extension PostgresJobQueue { + /// what to do with failed/processing jobs from last time queue was handled + public struct JobCleanup: Sendable, Codable { + enum RawValue: Codable { + case doNothing + case rerun + case remove(maxAge: Duration?) + } + let rawValue: RawValue + + public static var doNothing: Self { .init(rawValue: .doNothing) } + public static var rerun: Self { .init(rawValue: .rerun) } + public static var remove: Self { .init(rawValue: .remove(maxAge: nil)) } + public static func remove(maxAge: Duration) -> Self { .init(rawValue: .remove(maxAge: maxAge)) } + } + + /// clean up job name. + /// + /// Use this with the ``JobSchedule`` to schedule a cleanup of + /// failed, cancelled or completed jobs + public var cleanupJob: JobName { + .init("_Jobs_PostgresCleanup_\(self.configuration.queueName)") + } + + /// register clean up job on queue + func registerCleanupJob() { + self.registerJob( + JobDefinition(name: cleanupJob, parameters: JobCleanupParameters.self, retryStrategy: .dontRetry) { parameters, context in + try await self.cleanup( + failedJobs: parameters.failedJobs, + processingJobs: .doNothing, + pendingJobs: .doNothing, + completedJobs: parameters.completedJobs, + cancelledJobs: parameters.cancelledJobs, + logger: self.logger + ) + } + ) + } + + /// Cleanup job queues + /// + /// This function is used to re-run or delete jobs in a certain state. Failed jobs can be + /// pushed back into the pending queue to be re-run or removed. When called at startup in + /// theory no job should be set to processing, or set to pending but not in the queue. but if + /// your job server crashes these states are possible, so we also provide options to re-queue + /// these jobs so they are run again. + /// + /// The job queue needs to be running when you call cleanup. You can call `cleanup` with + /// `failedJobs`` set to whatever you like at any point to re-queue failed jobs. Moving processing + /// or pending jobs should only be done if you are certain there is nothing else processing + /// the job queue. + /// + /// - Parameters: + /// - failedJobs: What to do with jobs tagged as failed + /// - processingJobs: What to do with jobs tagged as processing + /// - pendingJobs: What to do with jobs tagged as pending + /// - completedJobs: What to do with jobs tagged as completed + /// - cancelledJobs: What to do with jobs tagged as cancelled + /// - logger: Optional logger to use when performing cleanup + /// - Throws: + public func cleanup( + failedJobs: JobCleanup = .doNothing, + processingJobs: JobCleanup = .doNothing, + pendingJobs: JobCleanup = .doNothing, + completedJobs: JobCleanup = .doNothing, + cancelledJobs: JobCleanup = .doNothing, + logger: Logger? = nil + ) async throws { + let logger = logger ?? self.logger + do { + /// wait for migrations to complete before running job queue cleanup + try await self.migrations.waitUntilCompleted() + _ = try await self.client.withTransaction(logger: logger) { connection in + self.logger.info("Cleanup Jobs") + try await self.updateJobsOnInit(withStatus: .pending, onInit: pendingJobs, connection: connection) + try await self.updateJobsOnInit(withStatus: .processing, onInit: processingJobs, connection: connection) + try await self.updateJobsOnInit(withStatus: .failed, onInit: failedJobs, connection: connection) + try await self.updateJobsOnInit(withStatus: .completed, onInit: completedJobs, connection: connection) + try await self.updateJobsOnInit(withStatus: .cancelled, onInit: cancelledJobs, connection: connection) + } + } catch let error as PSQLError { + logger.error( + "JobQueue cleanup failed", + metadata: [ + "Error": "\(String(reflecting: error))" + ] + ) + throw error + } + } + + func updateJobsOnInit(withStatus status: Status, onInit: JobCleanup, connection: PostgresConnection) async throws { + switch onInit.rawValue { + case .remove(let olderThan): + let date: Date = + if let olderThan { + .now - Double(olderThan.components.seconds) + } else { + .distantFuture + } + try await connection.query( + """ + DELETE FROM swift_jobs.jobs + WHERE status = \(status) AND queue_name = \(configuration.queueName) + AND last_modified < \(date) + """, + logger: self.logger + ) + + case .rerun: + let jobs = try await getJobs(withStatus: status, connection: connection) + self.logger.info("Moving \(jobs.count) jobs with status: \(status) to job queue") + for jobID in jobs { + try await self.addToQueue(jobID: jobID, queueName: configuration.queueName, options: .init(), connection: connection) + } + + case .doNothing: + break + } + } +} diff --git a/Sources/JobsPostgres/PostgresJobsQueue.swift b/Sources/JobsPostgres/PostgresJobsQueue.swift index 6dccfc7..dec3d84 100644 --- a/Sources/JobsPostgres/PostgresJobsQueue.swift +++ b/Sources/JobsPostgres/PostgresJobsQueue.swift @@ -45,12 +45,6 @@ import PostgresNIO public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, ResumableJobQueue { public typealias JobID = UUID - /// what to do with failed/processing jobs from last time queue was handled - public enum JobCleanup: Sendable { - case doNothing - case rerun - case remove - } /// Job priority from lowest to highest public struct JobPriority: Equatable, Sendable { @@ -124,6 +118,7 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma case failed = 2 case cancelled = 3 case paused = 4 + case completed = 5 } /// Queue configuration @@ -132,6 +127,8 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma let pollTime: Duration /// Which Queue to push jobs into let queueName: String + /// Retention policy for jobs + let retentionPolicy: RetentionPolicy /// Initialize configuration /// - Parameters @@ -139,10 +136,12 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma /// - queueName: Name of queue we are handing public init( pollTime: Duration = .milliseconds(100), - queueName: String = "default" + queueName: String = "default", + retentionPolicy: RetentionPolicy = .init() ) { self.pollTime = pollTime self.queueName = queueName + self.retentionPolicy = retentionPolicy } } @@ -152,7 +151,6 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma public let configuration: Configuration /// Logger used by queue public let logger: Logger - let migrations: DatabaseMigrations let isStopped: NIOLockedValueBox @@ -165,6 +163,7 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma self.isStopped = .init(false) self.migrations = migrations await migrations.add(CreateSwiftJobsMigrations(), skipDuplicates: true) + self.registerCleanupJob() } public func onInit() async throws { @@ -184,7 +183,11 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma public func cancel(jobID: JobID) async throws { try await self.client.withTransaction(logger: logger) { connection in try await deleteFromQueue(jobID: jobID, connection: connection) - try await delete(jobID: jobID) + if configuration.retentionPolicy.cancelled == .doNotRetain { + try await delete(jobID: jobID) + } else { + try await setStatus(jobID: jobID, status: .cancelled, connection: connection) + } } } @@ -223,53 +226,6 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma } } - /// Cleanup job queues - /// - /// This function is used to re-run or delete jobs in a certain state. Failed jobs can be - /// pushed back into the pending queue to be re-run or removed. When called at startup in - /// theory no job should be set to processing, or set to pending but not in the queue. but if - /// your job server crashes these states are possible, so we also provide options to re-queue - /// these jobs so they are run again. - /// - /// The job queue needs to be running when you call cleanup. You can call `cleanup` with - /// `failedJobs`` set to whatever you like at any point to re-queue failed jobs. Moving processing - /// or pending jobs should only be done if you are certain there is nothing else processing - /// the job queue. - /// - /// - Parameters: - /// - failedJobs: What to do with jobs tagged as failed - /// - processingJobs: What to do with jobs tagged as processing - /// - pendingJobs: What to do with jobs tagged as pending - /// - Throws: - public func cleanup( - failedJobs: JobCleanup = .doNothing, - processingJobs: JobCleanup = .doNothing, - pendingJobs: JobCleanup = .doNothing - ) async throws { - do { - /// wait for migrations to complete before running job queue cleanup - try await self.migrations.waitUntilCompleted() - _ = try await self.client.withConnection { connection in - self.logger.info("Update Jobs") - try await self.updateJobsOnInit(withStatus: .pending, onInit: pendingJobs, connection: connection) - try await self.updateJobsOnInit( - withStatus: .processing, - onInit: processingJobs, - connection: connection - ) - try await self.updateJobsOnInit(withStatus: .failed, onInit: failedJobs, connection: connection) - } - } catch let error as PSQLError { - logger.error( - "JobQueue initialization failed", - metadata: [ - "Error": "\(String(reflecting: error))" - ] - ) - throw error - } - } - /// Register job /// - Parameters: /// - job: Job Definition @@ -290,15 +246,15 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma /// Retry an existing Job /// - Parameters - /// - id: Job instance ID + /// - jobID: Job instance ID /// - jobRequest: Job Request /// - options: Job retry options - public func retry(_ id: JobID, jobRequest: JobRequest, options: JobRetryOptions) async throws { + public func retry(_ jobID: JobID, jobRequest: JobRequest, options: JobRetryOptions) async throws { let buffer = try self.jobRegistry.encode(jobRequest: jobRequest) try await self.client.withTransaction(logger: self.logger) { connection in - try await self.updateJob(id: id, buffer: buffer, connection: connection) + try await self.updateJob(jobID: jobID, buffer: buffer, connection: connection) try await self.addToQueue( - jobID: id, + jobID: jobID, queueName: configuration.queueName, options: .init(delayUntil: options.delayUntil), connection: connection @@ -308,12 +264,20 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma /// This is called to say job has finished processing and it can be deleted public func finished(jobID: JobID) async throws { - try await self.delete(jobID: jobID) + if configuration.retentionPolicy.completed == .doNotRetain { + try await self.delete(jobID: jobID) + } else { + try await self.setStatus(jobID: jobID, status: .completed) + } } /// This is called to say job has failed to run and should be put aside public func failed(jobID: JobID, error: Error) async throws { - try await self.setStatus(jobID: jobID, status: .failed) + if configuration.retentionPolicy.failed == .doNotRetain { + try await self.delete(jobID: jobID) + } else { + try await self.setStatus(jobID: jobID, status: .failed) + } } /// stop serving jobs @@ -451,15 +415,15 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma logger: self.logger ) } - // TODO: maybe add a new column colum for attempt so far after PR https://github.com/hummingbird-project/swift-jobs/pull/63 is merged? - func updateJob(id: JobID, buffer: ByteBuffer, connection: PostgresConnection) async throws { + + func updateJob(jobID: JobID, buffer: ByteBuffer, connection: PostgresConnection) async throws { try await connection.query( """ UPDATE swift_jobs.jobs SET job = \(buffer), last_modified = \(Date.now), status = \(Status.failed) - WHERE id = \(id) AND queue_name = \(configuration.queueName) + WHERE id = \(jobID) AND queue_name = \(configuration.queueName) """, logger: self.logger ) @@ -538,27 +502,21 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma return jobs } - func updateJobsOnInit(withStatus status: Status, onInit: JobCleanup, connection: PostgresConnection) async throws { - switch onInit { - case .remove: - try await connection.query( - """ - DELETE FROM swift_jobs.jobs - WHERE status = \(status) AND queue_name = \(configuration.queueName) - """, - logger: self.logger - ) - - case .rerun: - let jobs = try await getJobs(withStatus: status) - self.logger.info("Moving \(jobs.count) jobs with status: \(status) to job queue") - for jobID in jobs { - try await self.addToQueue(jobID: jobID, queueName: configuration.queueName, options: .init(), connection: connection) - } - - case .doNothing: - break + func getJobs(withStatus status: Status, connection: PostgresConnection) async throws -> [JobID] { + let stream = try await connection.query( + """ + SELECT + id + FROM swift_jobs.jobs + WHERE status = \(status) AND queue_name = \(configuration.queueName) + """, + logger: self.logger + ) + var jobs: [JobID] = [] + for try await id in stream.decode(JobID.self, context: .default) { + jobs.append(id) } + return jobs } let jobRegistry: JobRegistry diff --git a/Sources/JobsPostgres/RetentionPolicy.swift b/Sources/JobsPostgres/RetentionPolicy.swift new file mode 100644 index 0000000..c91aecb --- /dev/null +++ b/Sources/JobsPostgres/RetentionPolicy.swift @@ -0,0 +1,51 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Hummingbird server framework project +// +// Copyright (c) 2025 the Hummingbird authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import Foundation + +extension PostgresJobQueue { + /// Data rentension policy + public struct RetentionPolicy: Sendable { + /// Data retention policy + public struct RetainData: Equatable, Sendable { + enum Policy { + case retain + case doNotRetain + } + + let rawValue: Policy + /// Retain task + public static var retain: RetainData { RetainData(rawValue: .retain) } + /// Never retain any data + public static var doNotRetain: RetainData { RetainData(rawValue: .doNotRetain) } + } + + /// Jobs with status cancelled + public var cancelled: RetainData + /// Jobs with status completed + public var completed: RetainData + /// Jobs with status failed + public var failed: RetainData + + public init( + cancelled: RetainData = .doNotRetain, + completed: RetainData = .doNotRetain, + failed: RetainData = .retain + ) { + self.cancelled = cancelled + self.completed = completed + self.failed = failed + } + } +} diff --git a/Tests/JobsPostgresTests/JobsTests.swift b/Tests/JobsPostgresTests/JobsTests.swift index f768d8d..6b6d472 100644 --- a/Tests/JobsPostgresTests/JobsTests.swift +++ b/Tests/JobsPostgresTests/JobsTests.swift @@ -178,10 +178,15 @@ final class JobsTests: XCTestCase { processingJobsInitialization: PostgresJobQueue.JobCleanup = .remove, pendingJobsInitialization: PostgresJobQueue.JobCleanup = .remove, revertMigrations: Bool = true, + configuration: PostgresJobQueue.Configuration = .init(), function: String = #function, test: (JobQueue) async throws -> T ) async throws -> T { - let jobQueue = try await self.createJobQueue(numWorkers: numWorkers, configuration: .init(), function: function) + let jobQueue = try await self.createJobQueue( + numWorkers: numWorkers, + configuration: configuration, + function: function + ) return try await self.testJobQueue( jobQueue: jobQueue, failedJobsInitialization: failedJobsInitialization, @@ -490,7 +495,16 @@ final class JobsTests: XCTestCase { } let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 1) - try await self.testJobQueue(numWorkers: 4) { jobQueue in + try await self.testJobQueue( + numWorkers: 4, + configuration: .init( + retentionPolicy: .init( + cancelled: .doNotRetain, + completed: .doNotRetain, + failed: .doNotRetain + ) + ) + ) { jobQueue in jobQueue.registerJob(parameters: TestParameters.self) { _, _ in expectation.fulfill() try await Task.sleep(for: .milliseconds(1000)) @@ -769,7 +783,17 @@ final class JobsTests: XCTestCase { let didRunCancelledJob: NIOLockedValueBox = .init(false) let didRunNoneCancelledJob: NIOLockedValueBox = .init(false) - let jobQueue = try await self.createJobQueue(numWorkers: 1, configuration: .init(), function: #function) + let jobQueue = try await self.createJobQueue( + numWorkers: 1, + configuration: .init( + retentionPolicy: .init( + cancelled: .doNotRetain, + completed: .doNotRetain, + failed: .doNotRetain + ) + ), + function: #function + ) try await testPriorityJobQueue(jobQueue: jobQueue) { queue in queue.registerJob(parameters: TestParameters.self) { parameters, context in @@ -827,4 +851,149 @@ final class JobsTests: XCTestCase { XCTAssertEqual(didRunCancelledJob.withLockedValue { $0 }, false) XCTAssertEqual(didRunNoneCancelledJob.withLockedValue { $0 }, true) } + + func testCompletedJobRetention() async throws { + struct TestParameters: JobParameters { + static let jobName = "testCompletedJobRetention" + let value: Int + } + let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 3) + try await self.testJobQueue( + numWorkers: 1, + configuration: .init(retentionPolicy: .init(completed: .retain)) + ) { jobQueue in + jobQueue.registerJob(parameters: TestParameters.self) { parameters, context in + context.logger.info("Parameters=\(parameters.value)") + expectation.fulfill() + } + try await jobQueue.push(TestParameters(value: 1)) + try await jobQueue.push(TestParameters(value: 2)) + try await jobQueue.push(TestParameters(value: 3)) + + await fulfillment(of: [expectation], timeout: 10) + try await Task.sleep(for: .milliseconds(200)) + + let completedJobs = try await jobQueue.queue.getJobs(withStatus: .completed) + XCTAssertEqual(completedJobs.count, 3) + try await jobQueue.queue.cleanup(completedJobs: .remove(maxAge: .seconds(10))) + XCTAssertEqual(completedJobs.count, 3) + try await jobQueue.queue.cleanup(completedJobs: .remove(maxAge: .seconds(0))) + let zeroJobs = try await jobQueue.queue.getJobs(withStatus: .completed) + XCTAssertEqual(zeroJobs.count, 0) + } + } + + func testCancelledJobRetention() async throws { + let jobQueue = try await self.createJobQueue( + numWorkers: 1, + configuration: .init(retentionPolicy: .init(cancelled: .retain)) + ) + let jobName = JobName("testCancelledJobRetention") + jobQueue.registerJob(name: jobName) { _, _ in } + + try await withThrowingTaskGroup(of: Void.self) { group in + group.addTask { + // run postgres client + await jobQueue.queue.client.run() + } + try await jobQueue.queue.migrations.apply(client: jobQueue.queue.client, logger: jobQueue.logger, dryRun: false) + + let jobId = try await jobQueue.push(jobName, parameters: 1) + let jobId2 = try await jobQueue.push(jobName, parameters: 2) + + try await jobQueue.cancelJob(jobID: jobId) + try await jobQueue.cancelJob(jobID: jobId2) + + var cancelledJobs = try await jobQueue.queue.getJobs(withStatus: .cancelled) + XCTAssertEqual(cancelledJobs.count, 2) + try await jobQueue.queue.cleanup(cancelledJobs: .remove(maxAge: .seconds(0))) + cancelledJobs = try await jobQueue.queue.getJobs(withStatus: .cancelled) + XCTAssertEqual(cancelledJobs.count, 0) + + group.cancelAll() + } + } + + func testCleanupProcessingJobs() async throws { + let jobQueue = try await self.createJobQueue(numWorkers: 1) + let jobName = JobName("testCancelledJobRetention") + jobQueue.registerJob(name: jobName) { _, _ in } + + try await withThrowingTaskGroup(of: Void.self) { group in + group.addTask { + // run postgres client + await jobQueue.queue.client.run() + } + try await jobQueue.queue.migrations.apply(client: jobQueue.queue.client, logger: jobQueue.logger, dryRun: false) + + let jobID = try await jobQueue.push(jobName, parameters: 1) + let job = try await jobQueue.queue.popFirst() + XCTAssertEqual(jobID, job?.id) + _ = try await jobQueue.push(jobName, parameters: 1) + _ = try await jobQueue.queue.popFirst() + + var processingJobs = try await jobQueue.queue.getJobs(withStatus: .processing) + XCTAssertEqual(processingJobs.count, 2) + + try await jobQueue.queue.cleanup(processingJobs: .remove) + + processingJobs = try await jobQueue.queue.getJobs(withStatus: .processing) + XCTAssertEqual(processingJobs.count, 0) + + group.cancelAll() + } + } + + func testCleanupJob() async throws { + try await self.testJobQueue( + numWorkers: 1, + configuration: .init(retentionPolicy: .init(failed: .retain)) + ) { jobQueue in + try await self.testJobQueue( + numWorkers: 1, + configuration: .init( + queueName: "SecondQueue", + retentionPolicy: .init(failed: .retain) + ) + ) { jobQueue2 in + let (stream, cont) = AsyncStream.makeStream(of: Void.self) + var iterator = stream.makeAsyncIterator() + struct TempJob: Sendable & Codable {} + let barrierJobName = JobName("barrier") + jobQueue.registerJob(name: "testCleanupJob", parameters: String.self) { parameters, context in + throw CancellationError() + } + jobQueue.registerJob(name: barrierJobName, parameters: TempJob.self) { parameters, context in + cont.yield() + } + jobQueue2.registerJob(name: "testCleanupJob", parameters: String.self) { parameters, context in + throw CancellationError() + } + jobQueue2.registerJob(name: barrierJobName, parameters: TempJob.self) { parameters, context in + cont.yield() + } + try await jobQueue.push("testCleanupJob", parameters: "1") + try await jobQueue.push("testCleanupJob", parameters: "2") + try await jobQueue.push("testCleanupJob", parameters: "3") + try await jobQueue.push(barrierJobName, parameters: .init()) + try await jobQueue2.push("testCleanupJob", parameters: "1") + try await jobQueue2.push(barrierJobName, parameters: .init()) + + await iterator.next() + await iterator.next() + + let failedJob = try await jobQueue.queue.getJobs(withStatus: .failed) + XCTAssertEqual(failedJob.count, 3) + try await jobQueue.push(jobQueue.queue.cleanupJob, parameters: .init(failedJobs: .remove)) + try await jobQueue.push(barrierJobName, parameters: .init()) + + await iterator.next() + + let zeroJobs = try await jobQueue.queue.getJobs(withStatus: .failed) + XCTAssertEqual(zeroJobs.count, 0) + let jobCount2 = try await jobQueue2.queue.getJobs(withStatus: .failed) + XCTAssertEqual(jobCount2.count, 1) + } + } + } }