diff --git a/Sources/JobsPostgres/JobPruner.swift b/Sources/JobsPostgres/JobPruner.swift new file mode 100644 index 0000000..ced8663 --- /dev/null +++ b/Sources/JobsPostgres/JobPruner.swift @@ -0,0 +1,19 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Hummingbird server framework project +// +// Copyright (c) 2025 the Hummingbird authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import Jobs + +public struct JobPruner: JobParameters { + static public var jobName: String { "JobPruner" } +} diff --git a/Sources/JobsPostgres/PostgresJobsQueue.swift b/Sources/JobsPostgres/PostgresJobsQueue.swift index 358c499..24b1f2b 100644 --- a/Sources/JobsPostgres/PostgresJobsQueue.swift +++ b/Sources/JobsPostgres/PostgresJobsQueue.swift @@ -124,6 +124,7 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma case failed = 2 case cancelled = 3 case paused = 4 + case completed = 5 } /// Queue configuration @@ -132,6 +133,8 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma let pollTime: Duration /// Which Queue to push jobs into let queueName: String + /// Retention policy for jobs + let retentionPolicy: RetentionPolicy /// Initialize configuration /// - Parameters @@ -139,10 +142,12 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma /// - queueName: Name of queue we are handing public init( pollTime: Duration = .milliseconds(100), - queueName: String = "default" + queueName: String = "default", + retentionPolicy: RetentionPolicy = .init() ) { self.pollTime = pollTime self.queueName = queueName + self.retentionPolicy = retentionPolicy } } @@ -184,7 +189,11 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma public func cancel(jobID: JobID) async throws { try await self.client.withTransaction(logger: logger) { connection in try await deleteFromQueue(jobID: jobID, connection: connection) - try await delete(jobID: jobID) + if configuration.retentionPolicy.cancelled == .never { + try await delete(jobID: jobID) + } else { + try await setStatus(jobID: jobID, status: .cancelled, connection: connection) + } } } @@ -290,15 +299,15 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma /// Retry an existing Job /// - Parameters - /// - id: Job instance ID + /// - jobID: Job instance ID /// - jobRequest: Job Request /// - options: Job retry options - public func retry(_ id: JobID, jobRequest: JobRequest, options: JobRetryOptions) async throws { + public func retry(_ jobID: JobID, jobRequest: JobRequest, options: JobRetryOptions) async throws { let buffer = try self.jobRegistry.encode(jobRequest: jobRequest) try await self.client.withTransaction(logger: self.logger) { connection in - try await self.updateJob(id: id, buffer: buffer, connection: connection) + try await self.updateJob(jobID: jobID, buffer: buffer, connection: connection) try await self.addToQueue( - jobID: id, + jobID: jobID, queueName: configuration.queueName, options: .init(delayUntil: options.delayUntil), connection: connection @@ -308,12 +317,20 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma /// This is called to say job has finished processing and it can be deleted public func finished(jobID: JobID) async throws { - try await self.delete(jobID: jobID) + if configuration.retentionPolicy.completed == .never { + try await self.delete(jobID: jobID) + } else { + try await self.setStatus(jobID: jobID, status: .completed) + } } /// This is called to say job has failed to run and should be put aside public func failed(jobID: JobID, error: Error) async throws { - try await self.setStatus(jobID: jobID, status: .failed) + if configuration.retentionPolicy.failed == .never { + try await self.delete(jobID: jobID) + } else { + try await self.setStatus(jobID: jobID, status: .failed) + } } /// stop serving jobs @@ -451,15 +468,60 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma logger: self.logger ) } - // TODO: maybe add a new column colum for attempt so far after PR https://github.com/hummingbird-project/swift-jobs/pull/63 is merged? - func updateJob(id: JobID, buffer: ByteBuffer, connection: PostgresConnection) async throws { + + /// This menthod shall be called with the JobPruner after registration as follow + /// someJobQueue.registerJobparameters: JobPruner.self) { _, _ in + /// try await someJobQueue.processDataRetentionPolicy() + /// } + /// let jobScheddule = JobSchedule([ .init(job: JobPruner(), schedule: .everyHour()) ]) + public func processDataRetentionPolicy() async throws { + try await self.client.withTransaction(logger: logger) { tx in + let now = Date.now.timeIntervalSince1970 + let retentionPolicy: RetentionPolicy = configuration.retentionPolicy + + if case let .retain(timeAmout) = retentionPolicy.cancelled.rawValue { + try await tx.query( + """ + DELETE FROM swift_jobs.jobs + WHERE status = \(Status.cancelled) + AND extract(epoch FROM last_modified)::int + \(timeAmout) < \(now) + """, + logger: self.logger + ) + } + + if case let .retain(timeAmout) = retentionPolicy.completed.rawValue { + try await tx.query( + """ + DELETE FROM swift_jobs.jobs + WHERE status = \(Status.completed) + AND extract(epoch FROM last_modified)::int + \(timeAmout) < \(now) + """, + logger: self.logger + ) + } + + if case let .retain(timeAmout) = retentionPolicy.failed.rawValue { + try await tx.query( + """ + DELETE FROM swift_jobs.jobs + WHERE status = \(Status.failed) + AND extract(epoch FROM last_modified)::int + \(timeAmout) < \(now) + """, + logger: self.logger + ) + } + } + } + + func updateJob(jobID: JobID, buffer: ByteBuffer, connection: PostgresConnection) async throws { try await connection.query( """ UPDATE swift_jobs.jobs SET job = \(buffer), last_modified = \(Date.now), status = \(Status.failed) - WHERE id = \(id) AND queue_name = \(configuration.queueName) + WHERE id = \(jobID) AND queue_name = \(configuration.queueName) """, logger: self.logger ) diff --git a/Sources/JobsPostgres/RetentionPolicy.swift b/Sources/JobsPostgres/RetentionPolicy.swift new file mode 100644 index 0000000..2986d33 --- /dev/null +++ b/Sources/JobsPostgres/RetentionPolicy.swift @@ -0,0 +1,67 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Hummingbird server framework project +// +// Copyright (c) 2025 the Hummingbird authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import Foundation + +/// Data rentension policy +public struct RetentionPolicy: Sendable { + + /// Data retention policy + public struct RetainData: Equatable, Sendable { + enum Policy { + case retain(for: TimeInterval) + case doNotRetain + } + + let rawValue: Policy + /// Retain policy + /// default to 7 days + public static func retain(for timeAmout: TimeInterval = 60 * 60 * 24 * 7) -> RetainData { + RetainData(rawValue: .retain(for: timeAmout)) + } + /// Never retain any data + public static let never: RetainData = RetainData(rawValue: .doNotRetain) + + public static func == (lhs: RetentionPolicy.RetainData, rhs: RetentionPolicy.RetainData) -> Bool { + switch (lhs.rawValue, rhs.rawValue) { + case (.retain(let lhsTimeAmout), .retain(let rhsTimeAmout)): + return lhsTimeAmout == rhsTimeAmout + case (.doNotRetain, .doNotRetain): + return true + default: + return false + } + } + } + + /// Jobs with status cancelled + /// default retention is set for 7 days + public var cancelled: RetainData + /// Jobs with status completed + /// default retention is set to 7 days + public var completed: RetainData + /// Jobs with status failed + /// default retention is set to 30 days + public var failed: RetainData + + public init( + cancelled: RetainData = .retain(), + completed: RetainData = .retain(), + failed: RetainData = .retain(for: 60 * 60 * 24 * 30) + ) { + self.cancelled = cancelled + self.completed = completed + self.failed = failed + } +} diff --git a/Tests/JobsPostgresTests/JobsTests.swift b/Tests/JobsPostgresTests/JobsTests.swift index f768d8d..47db781 100644 --- a/Tests/JobsPostgresTests/JobsTests.swift +++ b/Tests/JobsPostgresTests/JobsTests.swift @@ -178,10 +178,15 @@ final class JobsTests: XCTestCase { processingJobsInitialization: PostgresJobQueue.JobCleanup = .remove, pendingJobsInitialization: PostgresJobQueue.JobCleanup = .remove, revertMigrations: Bool = true, + configuration: PostgresJobQueue.Configuration = .init(), function: String = #function, test: (JobQueue) async throws -> T ) async throws -> T { - let jobQueue = try await self.createJobQueue(numWorkers: numWorkers, configuration: .init(), function: function) + let jobQueue = try await self.createJobQueue( + numWorkers: numWorkers, + configuration: configuration, + function: function + ) return try await self.testJobQueue( jobQueue: jobQueue, failedJobsInitialization: failedJobsInitialization, @@ -490,7 +495,16 @@ final class JobsTests: XCTestCase { } let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 1) - try await self.testJobQueue(numWorkers: 4) { jobQueue in + try await self.testJobQueue( + numWorkers: 4, + configuration: .init( + retentionPolicy: .init( + cancelled: .never, + completed: .never, + failed: .never + ) + ) + ) { jobQueue in jobQueue.registerJob(parameters: TestParameters.self) { _, _ in expectation.fulfill() try await Task.sleep(for: .milliseconds(1000)) @@ -769,7 +783,17 @@ final class JobsTests: XCTestCase { let didRunCancelledJob: NIOLockedValueBox = .init(false) let didRunNoneCancelledJob: NIOLockedValueBox = .init(false) - let jobQueue = try await self.createJobQueue(numWorkers: 1, configuration: .init(), function: #function) + let jobQueue = try await self.createJobQueue( + numWorkers: 1, + configuration: .init( + retentionPolicy: .init( + cancelled: .never, + completed: .never, + failed: .never + ) + ), + function: #function + ) try await testPriorityJobQueue(jobQueue: jobQueue) { queue in queue.registerJob(parameters: TestParameters.self) { parameters, context in @@ -827,4 +851,39 @@ final class JobsTests: XCTestCase { XCTAssertEqual(didRunCancelledJob.withLockedValue { $0 }, false) XCTAssertEqual(didRunNoneCancelledJob.withLockedValue { $0 }, true) } + + func testJobRetention() async throws { + struct TestParameters: JobParameters { + static let jobName = "testJobRetention" + let value: Int + } + let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 3) + try await self.testJobQueue( + numWorkers: 1, + configuration: .init( + retentionPolicy: .init( + cancelled: .retain(for: -1), + completed: .retain(for: -1), + failed: .retain(for: -1) + ) + ) + ) { jobQueue in + jobQueue.registerJob(parameters: TestParameters.self) { parameters, context in + context.logger.info("Parameters=\(parameters.value)") + expectation.fulfill() + } + try await jobQueue.push(TestParameters(value: 1)) + try await jobQueue.push(TestParameters(value: 2)) + try await jobQueue.push(TestParameters(value: 3)) + + await fulfillment(of: [expectation], timeout: 10) + try await Task.sleep(for: .milliseconds(200)) + + let completedJobs = try await jobQueue.queue.getJobs(withStatus: .completed) + XCTAssertEqual(completedJobs.count, 3) + try await jobQueue.queue.processDataRetentionPolicy() + let zeroJobs = try await jobQueue.queue.getJobs(withStatus: .completed) + XCTAssertEqual(zeroJobs.count, 0) + } + } }