From 09fa92ceed16bc5737adf5b600bcc4f73e36c045 Mon Sep 17 00:00:00 2001 From: Stevenson Michel <130018170+thoven87@users.noreply.github.com> Date: Sun, 16 Mar 2025 16:55:12 -0400 Subject: [PATCH 01/21] Job retention --- Sources/JobsPostgres/PostgresJobsQueue.swift | 10 +++++- Sources/JobsPostgres/RetentionPolicy.swift | 36 ++++++++++++++++++++ 2 files changed, 45 insertions(+), 1 deletion(-) create mode 100644 Sources/JobsPostgres/RetentionPolicy.swift diff --git a/Sources/JobsPostgres/PostgresJobsQueue.swift b/Sources/JobsPostgres/PostgresJobsQueue.swift index 6dccfc7..9a7421b 100644 --- a/Sources/JobsPostgres/PostgresJobsQueue.swift +++ b/Sources/JobsPostgres/PostgresJobsQueue.swift @@ -132,6 +132,8 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma let pollTime: Duration /// Which Queue to push jobs into let queueName: String + /// Retention policy for jobs + let retentionPolicy: RetentionPolicy /// Initialize configuration /// - Parameters @@ -139,10 +141,16 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma /// - queueName: Name of queue we are handing public init( pollTime: Duration = .milliseconds(100), - queueName: String = "default" + queueName: String = "default", + retentionPolicy: RetentionPolicy = .init( + canceled: .init(duration: "7D"), + completed: .init(duration: "7D"), + failed: .init(duration: "7D") + ) //.keepAll() ) { self.pollTime = pollTime self.queueName = queueName + self.retentionPolicy = retentionPolicy } } diff --git a/Sources/JobsPostgres/RetentionPolicy.swift b/Sources/JobsPostgres/RetentionPolicy.swift new file mode 100644 index 0000000..484a03f --- /dev/null +++ b/Sources/JobsPostgres/RetentionPolicy.swift @@ -0,0 +1,36 @@ +// +// RetentionPolicy.swift +// swift-jobs-postgres +// +// Created by Stevenson Michel on 3/15/25. +// + +public struct RetentionPolicy: Sendable { + + public struct RetainData: Sendable { + /// Duration in ISO 8601 format (e.g., "P30D" for 30 days) + public var duration: String + + public init(duration: String) { + self.duration = duration + } + } + +// public var days: Int +// +// public init(days: Int) { +// self.days = days +// } + /// Jobs with status cancelled + public var canceled: RetainData + /// Jobs with status completed + public var completed: RetainData + /// Jobs with status failed + public var failed: RetainData + + public init(canceled: RetainData, completed: RetainData, failed: RetainData) { + self.canceled = canceled + self.completed = completed + self.failed = failed + } +} From 15ae76006e52bd7d97fc96bf6f68737c2320e3b1 Mon Sep 17 00:00:00 2001 From: Stevenson Michel <130018170+thoven87@users.noreply.github.com> Date: Tue, 18 Mar 2025 12:27:03 -0400 Subject: [PATCH 02/21] Job retention --- Sources/JobsPostgres/PostgresJobsQueue.swift | 74 ++++++++++++++++---- Sources/JobsPostgres/RetentionPolicy.swift | 39 ++++++----- Tests/JobsPostgresTests/JobsTests.swift | 56 ++++++++++++++- 3 files changed, 136 insertions(+), 33 deletions(-) diff --git a/Sources/JobsPostgres/PostgresJobsQueue.swift b/Sources/JobsPostgres/PostgresJobsQueue.swift index 9a7421b..d048fa4 100644 --- a/Sources/JobsPostgres/PostgresJobsQueue.swift +++ b/Sources/JobsPostgres/PostgresJobsQueue.swift @@ -124,6 +124,7 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma case failed = 2 case cancelled = 3 case paused = 4 + case completed = 5 } /// Queue configuration @@ -143,10 +144,10 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma pollTime: Duration = .milliseconds(100), queueName: String = "default", retentionPolicy: RetentionPolicy = .init( - canceled: .init(duration: "7D"), - completed: .init(duration: "7D"), - failed: .init(duration: "7D") - ) //.keepAll() + cancelled: .init(duration: 7 * 24 * 60), + completed: .init(duration: 7 * 24 * 60), + failed: .init(duration: 7 * 24 * 60) + ) ) { self.pollTime = pollTime self.queueName = queueName @@ -192,8 +193,9 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma public func cancel(jobID: JobID) async throws { try await self.client.withTransaction(logger: logger) { connection in try await deleteFromQueue(jobID: jobID, connection: connection) - try await delete(jobID: jobID) + try await setStatus(jobID: jobID, status: .cancelled, connection: connection) } + try await processDataRetentionPolicy() } /// Pause job @@ -298,15 +300,15 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma /// Retry an existing Job /// - Parameters - /// - id: Job instance ID + /// - jobID: Job instance ID /// - jobRequest: Job Request /// - options: Job retry options - public func retry(_ id: JobID, jobRequest: JobRequest, options: JobRetryOptions) async throws { + public func retry(_ jobID: JobID, jobRequest: JobRequest, options: JobRetryOptions) async throws { let buffer = try self.jobRegistry.encode(jobRequest: jobRequest) try await self.client.withTransaction(logger: self.logger) { connection in - try await self.updateJob(id: id, buffer: buffer, connection: connection) + try await self.updateJob(jobID: jobID, buffer: buffer, connection: connection) try await self.addToQueue( - jobID: id, + jobID: jobID, queueName: configuration.queueName, options: .init(delayUntil: options.delayUntil), connection: connection @@ -316,7 +318,9 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma /// This is called to say job has finished processing and it can be deleted public func finished(jobID: JobID) async throws { - try await self.delete(jobID: jobID) + //try await self.delete(jobID: jobID) + try await self.setStatus(jobID: jobID, status: .completed) + try await processDataRetentionPolicy() } /// This is called to say job has failed to run and should be put aside @@ -459,15 +463,59 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma logger: self.logger ) } - // TODO: maybe add a new column colum for attempt so far after PR https://github.com/hummingbird-project/swift-jobs/pull/63 is merged? - func updateJob(id: JobID, buffer: ByteBuffer, connection: PostgresConnection) async throws { + + func processDataRetentionPolicy() async throws { + try await withThrowingTaskGroup(of: Void.self) { group in + try await self.client.withTransaction(logger: logger) { tx in + let now = Date.now.timeIntervalSince1970 + let retentionPolicy = configuration.retentionPolicy + /// process // cancelled events + group.addTask { + try await tx.query( + """ + DELETE FROM swift_jobs.jobs + WHERE status = \(Status.cancelled) + AND extract(epoch FROM last_modified)::int + \(retentionPolicy.cancelled.duration) < \(now) + """, + logger: self.logger + ) + } + + /// process failed events clean up + group.addTask { + try await tx.query( + """ + DELETE FROM swift_jobs.jobs + WHERE status = \(Status.failed) + AND extract(epoch FROM last_modified)::int + \(retentionPolicy.failed.duration) < \(now) + """, + logger: self.logger + ) + } + + /// process completed events + group.addTask { + try await tx.query( + """ + DELETE FROM swift_jobs.jobs + WHERE status = \(Status.completed) + AND extract(epoch FROM last_modified)::int + \(retentionPolicy.completed.duration) < \(now) + """, + logger: self.logger + ) + } + } + } + } + + func updateJob(jobID: JobID, buffer: ByteBuffer, connection: PostgresConnection) async throws { try await connection.query( """ UPDATE swift_jobs.jobs SET job = \(buffer), last_modified = \(Date.now), status = \(Status.failed) - WHERE id = \(id) AND queue_name = \(configuration.queueName) + WHERE id = \(jobID) AND queue_name = \(configuration.queueName) """, logger: self.logger ) diff --git a/Sources/JobsPostgres/RetentionPolicy.swift b/Sources/JobsPostgres/RetentionPolicy.swift index 484a03f..2548997 100644 --- a/Sources/JobsPostgres/RetentionPolicy.swift +++ b/Sources/JobsPostgres/RetentionPolicy.swift @@ -1,35 +1,40 @@ +//===----------------------------------------------------------------------===// // -// RetentionPolicy.swift -// swift-jobs-postgres +// This source file is part of the Hummingbird server framework project // -// Created by Stevenson Michel on 3/15/25. +// Copyright (c) 2025 the Hummingbird authors +// Licensed under Apache License v2.0 // +// See LICENSE.txt for license information +// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// +import Foundation + +/// Data rentension policy public struct RetentionPolicy: Sendable { - + /// Data retention public struct RetainData: Sendable { - /// Duration in ISO 8601 format (e.g., "P30D" for 30 days) - public var duration: String - - public init(duration: String) { + /// Duration + public var duration: TimeInterval + + public init(duration: TimeInterval) { self.duration = duration } } - -// public var days: Int -// -// public init(days: Int) { -// self.days = days -// } + /// Jobs with status cancelled - public var canceled: RetainData + public var cancelled: RetainData /// Jobs with status completed public var completed: RetainData /// Jobs with status failed public var failed: RetainData - public init(canceled: RetainData, completed: RetainData, failed: RetainData) { - self.canceled = canceled + public init(cancelled: RetainData, completed: RetainData, failed: RetainData) { + self.cancelled = cancelled self.completed = completed self.failed = failed } diff --git a/Tests/JobsPostgresTests/JobsTests.swift b/Tests/JobsPostgresTests/JobsTests.swift index f768d8d..59162b1 100644 --- a/Tests/JobsPostgresTests/JobsTests.swift +++ b/Tests/JobsPostgresTests/JobsTests.swift @@ -178,10 +178,15 @@ final class JobsTests: XCTestCase { processingJobsInitialization: PostgresJobQueue.JobCleanup = .remove, pendingJobsInitialization: PostgresJobQueue.JobCleanup = .remove, revertMigrations: Bool = true, + configuration: PostgresJobQueue.Configuration = .init(), function: String = #function, test: (JobQueue) async throws -> T ) async throws -> T { - let jobQueue = try await self.createJobQueue(numWorkers: numWorkers, configuration: .init(), function: function) + let jobQueue = try await self.createJobQueue( + numWorkers: numWorkers, + configuration: configuration, + function: function + ) return try await self.testJobQueue( jobQueue: jobQueue, failedJobsInitialization: failedJobsInitialization, @@ -490,7 +495,16 @@ final class JobsTests: XCTestCase { } let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 1) - try await self.testJobQueue(numWorkers: 4) { jobQueue in + try await self.testJobQueue( + numWorkers: 4, + configuration: .init( + retentionPolicy: .init( + cancelled: .init(duration: -1), + completed: .init(duration: -1), + failed: .init(duration: -1) + ) + ) + ) { jobQueue in jobQueue.registerJob(parameters: TestParameters.self) { _, _ in expectation.fulfill() try await Task.sleep(for: .milliseconds(1000)) @@ -769,7 +783,17 @@ final class JobsTests: XCTestCase { let didRunCancelledJob: NIOLockedValueBox = .init(false) let didRunNoneCancelledJob: NIOLockedValueBox = .init(false) - let jobQueue = try await self.createJobQueue(numWorkers: 1, configuration: .init(), function: #function) + let jobQueue = try await self.createJobQueue( + numWorkers: 1, + configuration: .init( + retentionPolicy: .init( + cancelled: .init(duration: -1), + completed: .init(duration: -1), + failed: .init(duration: -1) + ) + ), + function: #function + ) try await testPriorityJobQueue(jobQueue: jobQueue) { queue in queue.registerJob(parameters: TestParameters.self) { parameters, context in @@ -827,4 +851,30 @@ final class JobsTests: XCTestCase { XCTAssertEqual(didRunCancelledJob.withLockedValue { $0 }, false) XCTAssertEqual(didRunNoneCancelledJob.withLockedValue { $0 }, true) } + + func testJobRetention() async throws { + struct TestParameters: JobParameters { + static let jobName = "testJobRetention" + let value: Int + } + let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 3) + try await self.testJobQueue(numWorkers: 1) { jobQueue in + jobQueue.registerJob(parameters: TestParameters.self) { parameters, context in + context.logger.info("Parameters=\(parameters.value)") + try await Task.sleep(for: .milliseconds(Int.random(in: 10..<50))) + expectation.fulfill() + } + let firstJob = try await jobQueue.push(TestParameters(value: 1)) + let secondJob = try await jobQueue.push(TestParameters(value: 2)) + let thirdJob = try await jobQueue.push(TestParameters(value: 3)) + + await fulfillment(of: [expectation], timeout: 5) + + let completedJobs = try await jobQueue.queue.getJobs(withStatus: .completed) + XCTAssertEqual(completedJobs.count, 3) + try await jobQueue.queue.delete(jobID: firstJob) + try await jobQueue.queue.delete(jobID: secondJob) + try await jobQueue.queue.delete(jobID: thirdJob) + } + } } From 7f35e32d707e69f41b2550f4806461eccc775760 Mon Sep 17 00:00:00 2001 From: Stevenson Michel <130018170+thoven87@users.noreply.github.com> Date: Tue, 18 Mar 2025 12:32:30 -0400 Subject: [PATCH 03/21] Swift format --- Sources/JobsPostgres/RetentionPolicy.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Sources/JobsPostgres/RetentionPolicy.swift b/Sources/JobsPostgres/RetentionPolicy.swift index 2548997..b872f5b 100644 --- a/Sources/JobsPostgres/RetentionPolicy.swift +++ b/Sources/JobsPostgres/RetentionPolicy.swift @@ -32,7 +32,7 @@ public struct RetentionPolicy: Sendable { public var completed: RetainData /// Jobs with status failed public var failed: RetainData - + public init(cancelled: RetainData, completed: RetainData, failed: RetainData) { self.cancelled = cancelled self.completed = completed From ca8f788a7bb2bbb01f6ddb9a494004acebee9299 Mon Sep 17 00:00:00 2001 From: Stevenson Michel <130018170+thoven87@users.noreply.github.com> Date: Tue, 18 Mar 2025 22:48:03 -0400 Subject: [PATCH 04/21] Addressing PR comments --- Sources/JobsPostgres/PostgresJobsQueue.swift | 100 ++++++++++--------- Sources/JobsPostgres/RetentionPolicy.swift | 40 ++++++-- Tests/JobsPostgresTests/JobsTests.swift | 33 +++--- 3 files changed, 104 insertions(+), 69 deletions(-) diff --git a/Sources/JobsPostgres/PostgresJobsQueue.swift b/Sources/JobsPostgres/PostgresJobsQueue.swift index d048fa4..a446137 100644 --- a/Sources/JobsPostgres/PostgresJobsQueue.swift +++ b/Sources/JobsPostgres/PostgresJobsQueue.swift @@ -143,11 +143,7 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma public init( pollTime: Duration = .milliseconds(100), queueName: String = "default", - retentionPolicy: RetentionPolicy = .init( - cancelled: .init(duration: 7 * 24 * 60), - completed: .init(duration: 7 * 24 * 60), - failed: .init(duration: 7 * 24 * 60) - ) + retentionPolicy: RetentionPolicy = .init() ) { self.pollTime = pollTime self.queueName = queueName @@ -193,9 +189,12 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma public func cancel(jobID: JobID) async throws { try await self.client.withTransaction(logger: logger) { connection in try await deleteFromQueue(jobID: jobID, connection: connection) - try await setStatus(jobID: jobID, status: .cancelled, connection: connection) + if configuration.retentionPolicy.cancelled == .never { + try await delete(jobID: jobID) + } else { + try await setStatus(jobID: jobID, status: .cancelled, connection: connection) + } } - try await processDataRetentionPolicy() } /// Pause job @@ -318,14 +317,20 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma /// This is called to say job has finished processing and it can be deleted public func finished(jobID: JobID) async throws { - //try await self.delete(jobID: jobID) - try await self.setStatus(jobID: jobID, status: .completed) - try await processDataRetentionPolicy() + if configuration.retentionPolicy.completed == .never { + try await self.delete(jobID: jobID) + } else { + try await self.setStatus(jobID: jobID, status: .completed) + } } /// This is called to say job has failed to run and should be put aside public func failed(jobID: JobID, error: Error) async throws { - try await self.setStatus(jobID: jobID, status: .failed) + if configuration.retentionPolicy.failed == .never { + try await self.delete(jobID: jobID) + } else { + try await self.setStatus(jobID: jobID, status: .failed) + } } /// stop serving jobs @@ -463,47 +468,44 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma logger: self.logger ) } + /// Helper func which to be use by a scheduled jobs + /// for performing job clean up based on a given set of policies + public func processDataRetentionPolicies() async throws { + try await self.client.withTransaction(logger: logger) { tx in + let now = Date.now.timeIntervalSince1970 + let retentionPolicy: RetentionPolicy = configuration.retentionPolicy - func processDataRetentionPolicy() async throws { - try await withThrowingTaskGroup(of: Void.self) { group in - try await self.client.withTransaction(logger: logger) { tx in - let now = Date.now.timeIntervalSince1970 - let retentionPolicy = configuration.retentionPolicy - /// process // cancelled events - group.addTask { - try await tx.query( - """ - DELETE FROM swift_jobs.jobs - WHERE status = \(Status.cancelled) - AND extract(epoch FROM last_modified)::int + \(retentionPolicy.cancelled.duration) < \(now) - """, - logger: self.logger - ) - } + if case let .retain(timeAmout) = retentionPolicy.cancelled.rawValue { + try await tx.query( + """ + DELETE FROM swift_jobs.jobs + WHERE status = \(Status.cancelled) + AND extract(epoch FROM last_modified)::int + \(timeAmout) < \(now) + """, + logger: self.logger + ) + } - /// process failed events clean up - group.addTask { - try await tx.query( - """ - DELETE FROM swift_jobs.jobs - WHERE status = \(Status.failed) - AND extract(epoch FROM last_modified)::int + \(retentionPolicy.failed.duration) < \(now) - """, - logger: self.logger - ) - } + if case let .retain(timeAmout) = retentionPolicy.completed.rawValue { + try await tx.query( + """ + DELETE FROM swift_jobs.jobs + WHERE status = \(Status.completed) + AND extract(epoch FROM last_modified)::int + \(timeAmout) < \(now) + """, + logger: self.logger + ) + } - /// process completed events - group.addTask { - try await tx.query( - """ - DELETE FROM swift_jobs.jobs - WHERE status = \(Status.completed) - AND extract(epoch FROM last_modified)::int + \(retentionPolicy.completed.duration) < \(now) - """, - logger: self.logger - ) - } + if case let .retain(timeAmout) = retentionPolicy.failed.rawValue { + try await tx.query( + """ + DELETE FROM swift_jobs.jobs + WHERE status = \(Status.failed) + AND extract(epoch FROM last_modified)::int + \(timeAmout) < \(now) + """, + logger: self.logger + ) } } } diff --git a/Sources/JobsPostgres/RetentionPolicy.swift b/Sources/JobsPostgres/RetentionPolicy.swift index b872f5b..2986d33 100644 --- a/Sources/JobsPostgres/RetentionPolicy.swift +++ b/Sources/JobsPostgres/RetentionPolicy.swift @@ -16,24 +16,50 @@ import Foundation /// Data rentension policy public struct RetentionPolicy: Sendable { - /// Data retention - public struct RetainData: Sendable { - /// Duration - public var duration: TimeInterval - public init(duration: TimeInterval) { - self.duration = duration + /// Data retention policy + public struct RetainData: Equatable, Sendable { + enum Policy { + case retain(for: TimeInterval) + case doNotRetain + } + + let rawValue: Policy + /// Retain policy + /// default to 7 days + public static func retain(for timeAmout: TimeInterval = 60 * 60 * 24 * 7) -> RetainData { + RetainData(rawValue: .retain(for: timeAmout)) + } + /// Never retain any data + public static let never: RetainData = RetainData(rawValue: .doNotRetain) + + public static func == (lhs: RetentionPolicy.RetainData, rhs: RetentionPolicy.RetainData) -> Bool { + switch (lhs.rawValue, rhs.rawValue) { + case (.retain(let lhsTimeAmout), .retain(let rhsTimeAmout)): + return lhsTimeAmout == rhsTimeAmout + case (.doNotRetain, .doNotRetain): + return true + default: + return false + } } } /// Jobs with status cancelled + /// default retention is set for 7 days public var cancelled: RetainData /// Jobs with status completed + /// default retention is set to 7 days public var completed: RetainData /// Jobs with status failed + /// default retention is set to 30 days public var failed: RetainData - public init(cancelled: RetainData, completed: RetainData, failed: RetainData) { + public init( + cancelled: RetainData = .retain(), + completed: RetainData = .retain(), + failed: RetainData = .retain(for: 60 * 60 * 24 * 30) + ) { self.cancelled = cancelled self.completed = completed self.failed = failed diff --git a/Tests/JobsPostgresTests/JobsTests.swift b/Tests/JobsPostgresTests/JobsTests.swift index 59162b1..fccfec9 100644 --- a/Tests/JobsPostgresTests/JobsTests.swift +++ b/Tests/JobsPostgresTests/JobsTests.swift @@ -499,9 +499,9 @@ final class JobsTests: XCTestCase { numWorkers: 4, configuration: .init( retentionPolicy: .init( - cancelled: .init(duration: -1), - completed: .init(duration: -1), - failed: .init(duration: -1) + cancelled: .never, + completed: .never, + failed: .never ) ) ) { jobQueue in @@ -787,9 +787,9 @@ final class JobsTests: XCTestCase { numWorkers: 1, configuration: .init( retentionPolicy: .init( - cancelled: .init(duration: -1), - completed: .init(duration: -1), - failed: .init(duration: -1) + cancelled: .never, + completed: .never, + failed: .never ) ), function: #function @@ -858,23 +858,30 @@ final class JobsTests: XCTestCase { let value: Int } let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 3) - try await self.testJobQueue(numWorkers: 1) { jobQueue in + try await self.testJobQueue( + numWorkers: 1, + configuration: .init( + retentionPolicy: .init( + cancelled: .retain(for: -1), + completed: .retain(for: -1), + failed: .retain(for: -1) + ) + ) + ) { jobQueue in jobQueue.registerJob(parameters: TestParameters.self) { parameters, context in context.logger.info("Parameters=\(parameters.value)") try await Task.sleep(for: .milliseconds(Int.random(in: 10..<50))) expectation.fulfill() } - let firstJob = try await jobQueue.push(TestParameters(value: 1)) - let secondJob = try await jobQueue.push(TestParameters(value: 2)) - let thirdJob = try await jobQueue.push(TestParameters(value: 3)) + try await jobQueue.push(TestParameters(value: 1)) + try await jobQueue.push(TestParameters(value: 2)) + try await jobQueue.push(TestParameters(value: 3)) await fulfillment(of: [expectation], timeout: 5) let completedJobs = try await jobQueue.queue.getJobs(withStatus: .completed) XCTAssertEqual(completedJobs.count, 3) - try await jobQueue.queue.delete(jobID: firstJob) - try await jobQueue.queue.delete(jobID: secondJob) - try await jobQueue.queue.delete(jobID: thirdJob) + try await jobQueue.queue.processDataRetentionPolicy() } } } From c63bac85c1325807f4db4b9e9015950ae59d1045 Mon Sep 17 00:00:00 2001 From: Stevenson Michel <130018170+thoven87@users.noreply.github.com> Date: Tue, 18 Mar 2025 22:53:37 -0400 Subject: [PATCH 05/21] Update PostgresJobsQueue.swift --- Sources/JobsPostgres/PostgresJobsQueue.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Sources/JobsPostgres/PostgresJobsQueue.swift b/Sources/JobsPostgres/PostgresJobsQueue.swift index a446137..3359836 100644 --- a/Sources/JobsPostgres/PostgresJobsQueue.swift +++ b/Sources/JobsPostgres/PostgresJobsQueue.swift @@ -470,7 +470,7 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma } /// Helper func which to be use by a scheduled jobs /// for performing job clean up based on a given set of policies - public func processDataRetentionPolicies() async throws { + public func processDataRetentionPolicy() async throws { try await self.client.withTransaction(logger: logger) { tx in let now = Date.now.timeIntervalSince1970 let retentionPolicy: RetentionPolicy = configuration.retentionPolicy From d12158bd39f6593120d7a4c36ec1fc72b28b8298 Mon Sep 17 00:00:00 2001 From: Stevenson Michel <130018170+thoven87@users.noreply.github.com> Date: Tue, 18 Mar 2025 23:01:31 -0400 Subject: [PATCH 06/21] fixing the failing test --- Tests/JobsPostgresTests/JobsTests.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Tests/JobsPostgresTests/JobsTests.swift b/Tests/JobsPostgresTests/JobsTests.swift index fccfec9..fe3fb53 100644 --- a/Tests/JobsPostgresTests/JobsTests.swift +++ b/Tests/JobsPostgresTests/JobsTests.swift @@ -877,7 +877,7 @@ final class JobsTests: XCTestCase { try await jobQueue.push(TestParameters(value: 2)) try await jobQueue.push(TestParameters(value: 3)) - await fulfillment(of: [expectation], timeout: 5) + await fulfillment(of: [expectation], timeout: 10) let completedJobs = try await jobQueue.queue.getJobs(withStatus: .completed) XCTAssertEqual(completedJobs.count, 3) From e9655b2b90170e040f37330e80a6872e9ad66351 Mon Sep 17 00:00:00 2001 From: Stevenson Michel <130018170+thoven87@users.noreply.github.com> Date: Mon, 12 May 2025 19:59:05 -0400 Subject: [PATCH 07/21] Adding job pruner and instruction on how to register and run the job on a schedule --- Sources/JobsPostgres/JobPruner.swift | 19 +++++++++++++++++++ Sources/JobsPostgres/PostgresJobsQueue.swift | 8 ++++++-- 2 files changed, 25 insertions(+), 2 deletions(-) create mode 100644 Sources/JobsPostgres/JobPruner.swift diff --git a/Sources/JobsPostgres/JobPruner.swift b/Sources/JobsPostgres/JobPruner.swift new file mode 100644 index 0000000..ced8663 --- /dev/null +++ b/Sources/JobsPostgres/JobPruner.swift @@ -0,0 +1,19 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Hummingbird server framework project +// +// Copyright (c) 2025 the Hummingbird authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import Jobs + +public struct JobPruner: JobParameters { + static public var jobName: String { "JobPruner" } +} diff --git a/Sources/JobsPostgres/PostgresJobsQueue.swift b/Sources/JobsPostgres/PostgresJobsQueue.swift index 3359836..fb42eee 100644 --- a/Sources/JobsPostgres/PostgresJobsQueue.swift +++ b/Sources/JobsPostgres/PostgresJobsQueue.swift @@ -468,8 +468,12 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma logger: self.logger ) } - /// Helper func which to be use by a scheduled jobs - /// for performing job clean up based on a given set of policies + + /// This menthod shall be called with the JobPruner after registration as follow + /// someJobQueue.registerJobparameters: JobPruner.self) { _, _ in + /// try await someJobQueue.processDataRetentionPolicy() + /// } + /// let jobScheddule = JobSchedule([ .init(job: JobPruner(), schedule: .everyHour()) ]) public func processDataRetentionPolicy() async throws { try await self.client.withTransaction(logger: logger) { tx in let now = Date.now.timeIntervalSince1970 From e90ebd8deb1af2a69217f8eff5fd714afa37ba93 Mon Sep 17 00:00:00 2001 From: Stevenson Michel <130018170+thoven87@users.noreply.github.com> Date: Mon, 12 May 2025 20:14:17 -0400 Subject: [PATCH 08/21] increase time amount for test job --- Tests/JobsPostgresTests/JobsTests.swift | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/Tests/JobsPostgresTests/JobsTests.swift b/Tests/JobsPostgresTests/JobsTests.swift index fe3fb53..08f7516 100644 --- a/Tests/JobsPostgresTests/JobsTests.swift +++ b/Tests/JobsPostgresTests/JobsTests.swift @@ -862,9 +862,9 @@ final class JobsTests: XCTestCase { numWorkers: 1, configuration: .init( retentionPolicy: .init( - cancelled: .retain(for: -1), - completed: .retain(for: -1), - failed: .retain(for: -1) + cancelled: .retain(for: 1), + completed: .retain(for: 1), + failed: .retain(for: 1) ) ) ) { jobQueue in From 0a1607d963a42257a2bf6506a1fdceeeeefe1c72 Mon Sep 17 00:00:00 2001 From: Stevenson Michel <130018170+thoven87@users.noreply.github.com> Date: Mon, 12 May 2025 20:22:52 -0400 Subject: [PATCH 09/21] Update JobsTests.swift --- Tests/JobsPostgresTests/JobsTests.swift | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/Tests/JobsPostgresTests/JobsTests.swift b/Tests/JobsPostgresTests/JobsTests.swift index 08f7516..c9891ca 100644 --- a/Tests/JobsPostgresTests/JobsTests.swift +++ b/Tests/JobsPostgresTests/JobsTests.swift @@ -859,14 +859,7 @@ final class JobsTests: XCTestCase { } let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 3) try await self.testJobQueue( - numWorkers: 1, - configuration: .init( - retentionPolicy: .init( - cancelled: .retain(for: 1), - completed: .retain(for: 1), - failed: .retain(for: 1) - ) - ) + numWorkers: 1 ) { jobQueue in jobQueue.registerJob(parameters: TestParameters.self) { parameters, context in context.logger.info("Parameters=\(parameters.value)") From 67ecbf15e46cd579852940ef2a58b75eb8e79b41 Mon Sep 17 00:00:00 2001 From: Stevenson Michel <130018170+thoven87@users.noreply.github.com> Date: Mon, 12 May 2025 20:39:39 -0400 Subject: [PATCH 10/21] Update JobsTests.swift --- Tests/JobsPostgresTests/JobsTests.swift | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/Tests/JobsPostgresTests/JobsTests.swift b/Tests/JobsPostgresTests/JobsTests.swift index c9891ca..f816c09 100644 --- a/Tests/JobsPostgresTests/JobsTests.swift +++ b/Tests/JobsPostgresTests/JobsTests.swift @@ -859,11 +859,17 @@ final class JobsTests: XCTestCase { } let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 3) try await self.testJobQueue( - numWorkers: 1 + numWorkers: 1, + configuration: .init( + retentionPolicy: .init( + cancelled: .retain(for: -1), + completed: .retain(for: -1), + failed: .retain(for: -1) + ) + ) ) { jobQueue in jobQueue.registerJob(parameters: TestParameters.self) { parameters, context in context.logger.info("Parameters=\(parameters.value)") - try await Task.sleep(for: .milliseconds(Int.random(in: 10..<50))) expectation.fulfill() } try await jobQueue.push(TestParameters(value: 1)) @@ -875,6 +881,8 @@ final class JobsTests: XCTestCase { let completedJobs = try await jobQueue.queue.getJobs(withStatus: .completed) XCTAssertEqual(completedJobs.count, 3) try await jobQueue.queue.processDataRetentionPolicy() + let zeroJobs = try await jobQueue.queue.getJobs(withStatus: .completed) + XCTAssertEqual(zeroJobs.count, 0) } } } From 098d1ebc09d84ecaef0c1b8ba78c654b0ee3822c Mon Sep 17 00:00:00 2001 From: Stevenson Michel <130018170+thoven87@users.noreply.github.com> Date: Mon, 12 May 2025 20:51:20 -0400 Subject: [PATCH 11/21] up concurrency to 3 to match number of job. There seems to be a timing issue --- Tests/JobsPostgresTests/JobsTests.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Tests/JobsPostgresTests/JobsTests.swift b/Tests/JobsPostgresTests/JobsTests.swift index f816c09..b459a3b 100644 --- a/Tests/JobsPostgresTests/JobsTests.swift +++ b/Tests/JobsPostgresTests/JobsTests.swift @@ -859,7 +859,7 @@ final class JobsTests: XCTestCase { } let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 3) try await self.testJobQueue( - numWorkers: 1, + numWorkers: 3, configuration: .init( retentionPolicy: .init( cancelled: .retain(for: -1), From de0754e9801cb3e24f9794d011dbfe1650ca3275 Mon Sep 17 00:00:00 2001 From: Stevenson Michel <130018170+thoven87@users.noreply.github.com> Date: Mon, 12 May 2025 21:00:13 -0400 Subject: [PATCH 12/21] Wait for 200 milliseconds before checking the job count. --- Tests/JobsPostgresTests/JobsTests.swift | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/Tests/JobsPostgresTests/JobsTests.swift b/Tests/JobsPostgresTests/JobsTests.swift index b459a3b..47db781 100644 --- a/Tests/JobsPostgresTests/JobsTests.swift +++ b/Tests/JobsPostgresTests/JobsTests.swift @@ -859,7 +859,7 @@ final class JobsTests: XCTestCase { } let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 3) try await self.testJobQueue( - numWorkers: 3, + numWorkers: 1, configuration: .init( retentionPolicy: .init( cancelled: .retain(for: -1), @@ -877,6 +877,7 @@ final class JobsTests: XCTestCase { try await jobQueue.push(TestParameters(value: 3)) await fulfillment(of: [expectation], timeout: 10) + try await Task.sleep(for: .milliseconds(200)) let completedJobs = try await jobQueue.queue.getJobs(withStatus: .completed) XCTAssertEqual(completedJobs.count, 3) From 5c22e68c1bf141bb8a5a2d9ce61b106e1c678815 Mon Sep 17 00:00:00 2001 From: Adam Fowler Date: Thu, 15 May 2025 11:32:27 +0100 Subject: [PATCH 13/21] Merge retention policy and cleanup --- .../JobsPostgres/JobCleanupParameters.swift | 36 +++++ Sources/JobsPostgres/JobPruner.swift | 19 --- .../PostgresJobsQueue+cleanup.swift | 105 +++++++++++++ Sources/JobsPostgres/PostgresJobsQueue.swift | 143 +++--------------- Sources/JobsPostgres/RetentionPolicy.swift | 29 +--- Tests/JobsPostgresTests/JobsTests.swift | 8 +- 6 files changed, 176 insertions(+), 164 deletions(-) create mode 100644 Sources/JobsPostgres/JobCleanupParameters.swift delete mode 100644 Sources/JobsPostgres/JobPruner.swift create mode 100644 Sources/JobsPostgres/PostgresJobsQueue+cleanup.swift diff --git a/Sources/JobsPostgres/JobCleanupParameters.swift b/Sources/JobsPostgres/JobCleanupParameters.swift new file mode 100644 index 0000000..2ef61dc --- /dev/null +++ b/Sources/JobsPostgres/JobCleanupParameters.swift @@ -0,0 +1,36 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Hummingbird server framework project +// +// Copyright (c) 2025 the Hummingbird authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import Jobs + +public struct JobCleanupParameters: JobParameters { + static public var jobName: String { "_JobCleanup_" } + + let jobQueueName: String + let failedJobs: PostgresJobQueue.JobCleanup + let completedJobs: PostgresJobQueue.JobCleanup + let cancelledJobs: PostgresJobQueue.JobCleanup + + public init( + jobQueueName: String, + failedJobs: PostgresJobQueue.JobCleanup = .doNothing, + completedJobs: PostgresJobQueue.JobCleanup = .doNothing, + cancelledJobs: PostgresJobQueue.JobCleanup = .doNothing + ) { + self.jobQueueName = jobQueueName + self.failedJobs = failedJobs + self.completedJobs = completedJobs + self.cancelledJobs = cancelledJobs + } +} diff --git a/Sources/JobsPostgres/JobPruner.swift b/Sources/JobsPostgres/JobPruner.swift deleted file mode 100644 index ced8663..0000000 --- a/Sources/JobsPostgres/JobPruner.swift +++ /dev/null @@ -1,19 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// This source file is part of the Hummingbird server framework project -// -// Copyright (c) 2025 the Hummingbird authors -// Licensed under Apache License v2.0 -// -// See LICENSE.txt for license information -// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors -// -// SPDX-License-Identifier: Apache-2.0 -// -//===----------------------------------------------------------------------===// - -import Jobs - -public struct JobPruner: JobParameters { - static public var jobName: String { "JobPruner" } -} diff --git a/Sources/JobsPostgres/PostgresJobsQueue+cleanup.swift b/Sources/JobsPostgres/PostgresJobsQueue+cleanup.swift new file mode 100644 index 0000000..52e5c7c --- /dev/null +++ b/Sources/JobsPostgres/PostgresJobsQueue+cleanup.swift @@ -0,0 +1,105 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Hummingbird server framework project +// +// Copyright (c) 2024-2025 the Hummingbird authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import Foundation +import Jobs +import Logging +import NIOConcurrencyHelpers +import NIOCore +import PostgresMigrations +import PostgresNIO + +extension PostgresJobQueue { + /// Cleanup job queues + /// + /// This function is used to re-run or delete jobs in a certain state. Failed jobs can be + /// pushed back into the pending queue to be re-run or removed. When called at startup in + /// theory no job should be set to processing, or set to pending but not in the queue. but if + /// your job server crashes these states are possible, so we also provide options to re-queue + /// these jobs so they are run again. + /// + /// The job queue needs to be running when you call cleanup. You can call `cleanup` with + /// `failedJobs`` set to whatever you like at any point to re-queue failed jobs. Moving processing + /// or pending jobs should only be done if you are certain there is nothing else processing + /// the job queue. + /// + /// - Parameters: + /// - failedJobs: What to do with jobs tagged as failed + /// - processingJobs: What to do with jobs tagged as processing + /// - pendingJobs: What to do with jobs tagged as pending + /// - completedJobs: What to do with jobs tagged as completed + /// - cancelledJobs: What to do with jobs tagged as cancelled + /// - logger: Optional logger to use when performing cleanup + /// - Throws: + public func cleanup( + failedJobs: JobCleanup = .doNothing, + processingJobs: JobCleanup = .doNothing, + pendingJobs: JobCleanup = .doNothing, + completedJobs: JobCleanup = .doNothing, + cancelledJobs: JobCleanup = .doNothing, + logger: Logger? = nil + ) async throws { + let logger = logger ?? self.logger + do { + /// wait for migrations to complete before running job queue cleanup + try await self.migrations.waitUntilCompleted() + _ = try await self.client.withTransaction(logger: logger) { connection in + self.logger.info("Update Jobs") + try await self.updateJobsOnInit(withStatus: .pending, onInit: pendingJobs, connection: connection) + try await self.updateJobsOnInit(withStatus: .processing, onInit: processingJobs, connection: connection) + try await self.updateJobsOnInit(withStatus: .failed, onInit: failedJobs, connection: connection) + try await self.updateJobsOnInit(withStatus: .completed, onInit: completedJobs, connection: connection) + try await self.updateJobsOnInit(withStatus: .cancelled, onInit: cancelledJobs, connection: connection) + } + } catch let error as PSQLError { + logger.error( + "JobQueue cleanup failed", + metadata: [ + "Error": "\(String(reflecting: error))" + ] + ) + throw error + } + } + + func updateJobsOnInit(withStatus status: Status, onInit: JobCleanup, connection: PostgresConnection) async throws { + switch onInit.rawValue { + case .remove(let olderThan): + let date: Date = + if let olderThan { + .now - Double(olderThan.components.seconds) + } else { + .distantPast + } + try await connection.query( + """ + DELETE FROM swift_jobs.jobs + WHERE status = \(status) AND queue_name = \(configuration.queueName) + AND last_modified < \(date) + """, + logger: self.logger + ) + + case .rerun: + let jobs = try await getJobs(withStatus: status) + self.logger.info("Moving \(jobs.count) jobs with status: \(status) to job queue") + for jobID in jobs { + try await self.addToQueue(jobID: jobID, queueName: configuration.queueName, options: .init(), connection: connection) + } + + case .doNothing: + break + } + } +} diff --git a/Sources/JobsPostgres/PostgresJobsQueue.swift b/Sources/JobsPostgres/PostgresJobsQueue.swift index fb42eee..ae5c97b 100644 --- a/Sources/JobsPostgres/PostgresJobsQueue.swift +++ b/Sources/JobsPostgres/PostgresJobsQueue.swift @@ -46,10 +46,18 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma public typealias JobID = UUID /// what to do with failed/processing jobs from last time queue was handled - public enum JobCleanup: Sendable { - case doNothing - case rerun - case remove + public struct JobCleanup: Sendable, Codable { + enum RawValue: Codable { + case doNothing + case rerun + case remove(maxAge: Duration?) + } + let rawValue: RawValue + + public static var doNothing: Self { .init(rawValue: .doNothing) } + public static var rerun: Self { .init(rawValue: .rerun) } + public static var remove: Self { .init(rawValue: .remove(maxAge: nil)) } + public static func remove(maxAge: Duration) -> Self { .init(rawValue: .remove(maxAge: maxAge)) } } /// Job priority from lowest to highest @@ -170,6 +178,18 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma self.isStopped = .init(false) self.migrations = migrations await migrations.add(CreateSwiftJobsMigrations(), skipDuplicates: true) + self.registerJob( + JobDefinition(parameters: JobCleanupParameters.self, retryStrategy: .dontRetry) { parameters, context in + parameters.jobQueue.cleanup( + failedJobs: parameters.failedJobs, + processingJobs: .doNothing, + pendingJobs: .doNothing, + completedJobs: parameters.completedJobs, + cancelledJobs: parameters.cancelledJobs, + logger: logger + ) + } + ) } public func onInit() async throws { @@ -232,53 +252,6 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma } } - /// Cleanup job queues - /// - /// This function is used to re-run or delete jobs in a certain state. Failed jobs can be - /// pushed back into the pending queue to be re-run or removed. When called at startup in - /// theory no job should be set to processing, or set to pending but not in the queue. but if - /// your job server crashes these states are possible, so we also provide options to re-queue - /// these jobs so they are run again. - /// - /// The job queue needs to be running when you call cleanup. You can call `cleanup` with - /// `failedJobs`` set to whatever you like at any point to re-queue failed jobs. Moving processing - /// or pending jobs should only be done if you are certain there is nothing else processing - /// the job queue. - /// - /// - Parameters: - /// - failedJobs: What to do with jobs tagged as failed - /// - processingJobs: What to do with jobs tagged as processing - /// - pendingJobs: What to do with jobs tagged as pending - /// - Throws: - public func cleanup( - failedJobs: JobCleanup = .doNothing, - processingJobs: JobCleanup = .doNothing, - pendingJobs: JobCleanup = .doNothing - ) async throws { - do { - /// wait for migrations to complete before running job queue cleanup - try await self.migrations.waitUntilCompleted() - _ = try await self.client.withConnection { connection in - self.logger.info("Update Jobs") - try await self.updateJobsOnInit(withStatus: .pending, onInit: pendingJobs, connection: connection) - try await self.updateJobsOnInit( - withStatus: .processing, - onInit: processingJobs, - connection: connection - ) - try await self.updateJobsOnInit(withStatus: .failed, onInit: failedJobs, connection: connection) - } - } catch let error as PSQLError { - logger.error( - "JobQueue initialization failed", - metadata: [ - "Error": "\(String(reflecting: error))" - ] - ) - throw error - } - } - /// Register job /// - Parameters: /// - job: Job Definition @@ -469,51 +442,6 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma ) } - /// This menthod shall be called with the JobPruner after registration as follow - /// someJobQueue.registerJobparameters: JobPruner.self) { _, _ in - /// try await someJobQueue.processDataRetentionPolicy() - /// } - /// let jobScheddule = JobSchedule([ .init(job: JobPruner(), schedule: .everyHour()) ]) - public func processDataRetentionPolicy() async throws { - try await self.client.withTransaction(logger: logger) { tx in - let now = Date.now.timeIntervalSince1970 - let retentionPolicy: RetentionPolicy = configuration.retentionPolicy - - if case let .retain(timeAmout) = retentionPolicy.cancelled.rawValue { - try await tx.query( - """ - DELETE FROM swift_jobs.jobs - WHERE status = \(Status.cancelled) - AND extract(epoch FROM last_modified)::int + \(timeAmout) < \(now) - """, - logger: self.logger - ) - } - - if case let .retain(timeAmout) = retentionPolicy.completed.rawValue { - try await tx.query( - """ - DELETE FROM swift_jobs.jobs - WHERE status = \(Status.completed) - AND extract(epoch FROM last_modified)::int + \(timeAmout) < \(now) - """, - logger: self.logger - ) - } - - if case let .retain(timeAmout) = retentionPolicy.failed.rawValue { - try await tx.query( - """ - DELETE FROM swift_jobs.jobs - WHERE status = \(Status.failed) - AND extract(epoch FROM last_modified)::int + \(timeAmout) < \(now) - """, - logger: self.logger - ) - } - } - } - func updateJob(jobID: JobID, buffer: ByteBuffer, connection: PostgresConnection) async throws { try await connection.query( """ @@ -600,29 +528,6 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma return jobs } - func updateJobsOnInit(withStatus status: Status, onInit: JobCleanup, connection: PostgresConnection) async throws { - switch onInit { - case .remove: - try await connection.query( - """ - DELETE FROM swift_jobs.jobs - WHERE status = \(status) AND queue_name = \(configuration.queueName) - """, - logger: self.logger - ) - - case .rerun: - let jobs = try await getJobs(withStatus: status) - self.logger.info("Moving \(jobs.count) jobs with status: \(status) to job queue") - for jobID in jobs { - try await self.addToQueue(jobID: jobID, queueName: configuration.queueName, options: .init(), connection: connection) - } - - case .doNothing: - break - } - } - let jobRegistry: JobRegistry } diff --git a/Sources/JobsPostgres/RetentionPolicy.swift b/Sources/JobsPostgres/RetentionPolicy.swift index 2986d33..3694534 100644 --- a/Sources/JobsPostgres/RetentionPolicy.swift +++ b/Sources/JobsPostgres/RetentionPolicy.swift @@ -16,33 +16,18 @@ import Foundation /// Data rentension policy public struct RetentionPolicy: Sendable { - /// Data retention policy public struct RetainData: Equatable, Sendable { enum Policy { - case retain(for: TimeInterval) + case retain case doNotRetain } let rawValue: Policy - /// Retain policy - /// default to 7 days - public static func retain(for timeAmout: TimeInterval = 60 * 60 * 24 * 7) -> RetainData { - RetainData(rawValue: .retain(for: timeAmout)) - } + /// Retain task + public static var retain: RetainData { RetainData(rawValue: .retain) } /// Never retain any data - public static let never: RetainData = RetainData(rawValue: .doNotRetain) - - public static func == (lhs: RetentionPolicy.RetainData, rhs: RetentionPolicy.RetainData) -> Bool { - switch (lhs.rawValue, rhs.rawValue) { - case (.retain(let lhsTimeAmout), .retain(let rhsTimeAmout)): - return lhsTimeAmout == rhsTimeAmout - case (.doNotRetain, .doNotRetain): - return true - default: - return false - } - } + public static var never: RetainData { RetainData(rawValue: .doNotRetain) } } /// Jobs with status cancelled @@ -56,9 +41,9 @@ public struct RetentionPolicy: Sendable { public var failed: RetainData public init( - cancelled: RetainData = .retain(), - completed: RetainData = .retain(), - failed: RetainData = .retain(for: 60 * 60 * 24 * 30) + cancelled: RetainData = .retain, + completed: RetainData = .retain, + failed: RetainData = .retain ) { self.cancelled = cancelled self.completed = completed diff --git a/Tests/JobsPostgresTests/JobsTests.swift b/Tests/JobsPostgresTests/JobsTests.swift index 47db781..3b9e902 100644 --- a/Tests/JobsPostgresTests/JobsTests.swift +++ b/Tests/JobsPostgresTests/JobsTests.swift @@ -862,9 +862,9 @@ final class JobsTests: XCTestCase { numWorkers: 1, configuration: .init( retentionPolicy: .init( - cancelled: .retain(for: -1), - completed: .retain(for: -1), - failed: .retain(for: -1) + cancelled: .retain, + completed: .retain, + failed: .retain ) ) ) { jobQueue in @@ -881,7 +881,7 @@ final class JobsTests: XCTestCase { let completedJobs = try await jobQueue.queue.getJobs(withStatus: .completed) XCTAssertEqual(completedJobs.count, 3) - try await jobQueue.queue.processDataRetentionPolicy() + try await jobQueue.queue.cleanup(completedJobs: .remove(maxAge: .seconds(0))) let zeroJobs = try await jobQueue.queue.getJobs(withStatus: .completed) XCTAssertEqual(zeroJobs.count, 0) } From f482e1973559095a759f307eab4795d5d3fab4da Mon Sep 17 00:00:00 2001 From: Adam Fowler Date: Fri, 16 May 2025 09:22:49 +0100 Subject: [PATCH 14/21] Register cleanup job This currently assumes there is only one postgres job queue. Need the changes splitting job id from parameters to implement for every queue --- Sources/JobsPostgres/JobCleanupParameters.swift | 3 --- Sources/JobsPostgres/PostgresJobsQueue.swift | 2 +- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/Sources/JobsPostgres/JobCleanupParameters.swift b/Sources/JobsPostgres/JobCleanupParameters.swift index 2ef61dc..5471af3 100644 --- a/Sources/JobsPostgres/JobCleanupParameters.swift +++ b/Sources/JobsPostgres/JobCleanupParameters.swift @@ -17,18 +17,15 @@ import Jobs public struct JobCleanupParameters: JobParameters { static public var jobName: String { "_JobCleanup_" } - let jobQueueName: String let failedJobs: PostgresJobQueue.JobCleanup let completedJobs: PostgresJobQueue.JobCleanup let cancelledJobs: PostgresJobQueue.JobCleanup public init( - jobQueueName: String, failedJobs: PostgresJobQueue.JobCleanup = .doNothing, completedJobs: PostgresJobQueue.JobCleanup = .doNothing, cancelledJobs: PostgresJobQueue.JobCleanup = .doNothing ) { - self.jobQueueName = jobQueueName self.failedJobs = failedJobs self.completedJobs = completedJobs self.cancelledJobs = cancelledJobs diff --git a/Sources/JobsPostgres/PostgresJobsQueue.swift b/Sources/JobsPostgres/PostgresJobsQueue.swift index ae5c97b..48de86f 100644 --- a/Sources/JobsPostgres/PostgresJobsQueue.swift +++ b/Sources/JobsPostgres/PostgresJobsQueue.swift @@ -180,7 +180,7 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma await migrations.add(CreateSwiftJobsMigrations(), skipDuplicates: true) self.registerJob( JobDefinition(parameters: JobCleanupParameters.self, retryStrategy: .dontRetry) { parameters, context in - parameters.jobQueue.cleanup( + try await self.cleanup( failedJobs: parameters.failedJobs, processingJobs: .doNothing, pendingJobs: .doNothing, From b6497ef31046eb16cf4c864b384202c87f7aec91 Mon Sep 17 00:00:00 2001 From: Adam Fowler Date: Sat, 17 May 2025 17:32:47 +0100 Subject: [PATCH 15/21] Create separate cleanup job for each queue --- Package.swift | 2 +- .../JobsPostgres/JobCleanupParameters.swift | 33 ---------- .../PostgresJobsQueue+cleanup.swift | 43 ++++++++++++- Sources/JobsPostgres/PostgresJobsQueue.swift | 14 +---- Sources/JobsPostgres/RetentionPolicy.swift | 2 +- Tests/JobsPostgresTests/JobsTests.swift | 63 +++++++++++++++++++ 6 files changed, 108 insertions(+), 49 deletions(-) delete mode 100644 Sources/JobsPostgres/JobCleanupParameters.swift diff --git a/Package.swift b/Package.swift index 93f86d9..6979092 100644 --- a/Package.swift +++ b/Package.swift @@ -10,7 +10,7 @@ let package = Package( ], dependencies: [ // TODO: use a released version of swift-jobs - .package(url: "https://github.com/hummingbird-project/swift-jobs.git", branch: "main"), + .package(url: "https://github.com/hummingbird-project/swift-jobs.git", branch: "jobschedule-addjob"), .package(url: "https://github.com/hummingbird-project/postgres-migrations.git", from: "0.1.0"), .package(url: "https://github.com/vapor/postgres-nio.git", from: "1.25.0"), ], diff --git a/Sources/JobsPostgres/JobCleanupParameters.swift b/Sources/JobsPostgres/JobCleanupParameters.swift deleted file mode 100644 index 5471af3..0000000 --- a/Sources/JobsPostgres/JobCleanupParameters.swift +++ /dev/null @@ -1,33 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// This source file is part of the Hummingbird server framework project -// -// Copyright (c) 2025 the Hummingbird authors -// Licensed under Apache License v2.0 -// -// See LICENSE.txt for license information -// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors -// -// SPDX-License-Identifier: Apache-2.0 -// -//===----------------------------------------------------------------------===// - -import Jobs - -public struct JobCleanupParameters: JobParameters { - static public var jobName: String { "_JobCleanup_" } - - let failedJobs: PostgresJobQueue.JobCleanup - let completedJobs: PostgresJobQueue.JobCleanup - let cancelledJobs: PostgresJobQueue.JobCleanup - - public init( - failedJobs: PostgresJobQueue.JobCleanup = .doNothing, - completedJobs: PostgresJobQueue.JobCleanup = .doNothing, - cancelledJobs: PostgresJobQueue.JobCleanup = .doNothing - ) { - self.failedJobs = failedJobs - self.completedJobs = completedJobs - self.cancelledJobs = cancelledJobs - } -} diff --git a/Sources/JobsPostgres/PostgresJobsQueue+cleanup.swift b/Sources/JobsPostgres/PostgresJobsQueue+cleanup.swift index 52e5c7c..a6c9edf 100644 --- a/Sources/JobsPostgres/PostgresJobsQueue+cleanup.swift +++ b/Sources/JobsPostgres/PostgresJobsQueue+cleanup.swift @@ -20,7 +20,48 @@ import NIOCore import PostgresMigrations import PostgresNIO +/// Parameters for Cleanup job +public struct JobCleanupParameters: Sendable & Codable { + let failedJobs: PostgresJobQueue.JobCleanup + let completedJobs: PostgresJobQueue.JobCleanup + let cancelledJobs: PostgresJobQueue.JobCleanup + + public init( + failedJobs: PostgresJobQueue.JobCleanup = .doNothing, + completedJobs: PostgresJobQueue.JobCleanup = .doNothing, + cancelledJobs: PostgresJobQueue.JobCleanup = .doNothing + ) { + self.failedJobs = failedJobs + self.completedJobs = completedJobs + self.cancelledJobs = cancelledJobs + } +} + extension PostgresJobQueue { + /// clean up job name. + /// + /// Use this with the ``JobSchedule`` to schedule a cleanup of + /// failed, cancelled or completed jobs + public var cleanupJob: JobName { + .init("_JobCleanup_\(self.configuration.queueName)") + } + + /// register clean up job on queue + func registerCleanupJob() { + self.registerJob( + JobDefinition(name: cleanupJob, parameters: JobCleanupParameters.self, retryStrategy: .dontRetry) { parameters, context in + try await self.cleanup( + failedJobs: parameters.failedJobs, + processingJobs: .doNothing, + pendingJobs: .doNothing, + completedJobs: parameters.completedJobs, + cancelledJobs: parameters.cancelledJobs, + logger: self.logger + ) + } + ) + } + /// Cleanup job queues /// /// This function is used to re-run or delete jobs in a certain state. Failed jobs can be @@ -80,7 +121,7 @@ extension PostgresJobQueue { if let olderThan { .now - Double(olderThan.components.seconds) } else { - .distantPast + .distantFuture } try await connection.query( """ diff --git a/Sources/JobsPostgres/PostgresJobsQueue.swift b/Sources/JobsPostgres/PostgresJobsQueue.swift index 48de86f..ed3b10d 100644 --- a/Sources/JobsPostgres/PostgresJobsQueue.swift +++ b/Sources/JobsPostgres/PostgresJobsQueue.swift @@ -165,7 +165,6 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma public let configuration: Configuration /// Logger used by queue public let logger: Logger - let migrations: DatabaseMigrations let isStopped: NIOLockedValueBox @@ -178,18 +177,7 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma self.isStopped = .init(false) self.migrations = migrations await migrations.add(CreateSwiftJobsMigrations(), skipDuplicates: true) - self.registerJob( - JobDefinition(parameters: JobCleanupParameters.self, retryStrategy: .dontRetry) { parameters, context in - try await self.cleanup( - failedJobs: parameters.failedJobs, - processingJobs: .doNothing, - pendingJobs: .doNothing, - completedJobs: parameters.completedJobs, - cancelledJobs: parameters.cancelledJobs, - logger: logger - ) - } - ) + self.registerCleanupJob() } public func onInit() async throws { diff --git a/Sources/JobsPostgres/RetentionPolicy.swift b/Sources/JobsPostgres/RetentionPolicy.swift index 3694534..ddf3e45 100644 --- a/Sources/JobsPostgres/RetentionPolicy.swift +++ b/Sources/JobsPostgres/RetentionPolicy.swift @@ -42,7 +42,7 @@ public struct RetentionPolicy: Sendable { public init( cancelled: RetainData = .retain, - completed: RetainData = .retain, + completed: RetainData = .never, failed: RetainData = .retain ) { self.cancelled = cancelled diff --git a/Tests/JobsPostgresTests/JobsTests.swift b/Tests/JobsPostgresTests/JobsTests.swift index 3b9e902..eca6c24 100644 --- a/Tests/JobsPostgresTests/JobsTests.swift +++ b/Tests/JobsPostgresTests/JobsTests.swift @@ -886,4 +886,67 @@ final class JobsTests: XCTestCase { XCTAssertEqual(zeroJobs.count, 0) } } + + func testCleanupJob() async throws { + try await self.testJobQueue( + numWorkers: 1, + configuration: .init( + retentionPolicy: .init( + cancelled: .retain, + completed: .never, + failed: .retain + ) + ) + ) { jobQueue in + try await self.testJobQueue( + numWorkers: 1, + configuration: .init( + queueName: "SecondQueue", + retentionPolicy: .init( + cancelled: .retain, + completed: .never, + failed: .retain + ) + ) + ) { jobQueue2 in + let (stream, cont) = AsyncStream.makeStream(of: Void.self) + var iterator = stream.makeAsyncIterator() + struct TempJob: Sendable & Codable {} + let barrierJobName = JobName("barrier") + jobQueue.registerJob(name: "testCleanupJob", parameters: String.self) { parameters, context in + throw CancellationError() + } + jobQueue.registerJob(name: barrierJobName, parameters: TempJob.self) { parameters, context in + cont.yield() + } + jobQueue2.registerJob(name: "testCleanupJob", parameters: String.self) { parameters, context in + throw CancellationError() + } + jobQueue2.registerJob(name: barrierJobName, parameters: TempJob.self) { parameters, context in + cont.yield() + } + try await jobQueue.push("testCleanupJob", parameters: "1") + try await jobQueue.push("testCleanupJob", parameters: "2") + try await jobQueue.push("testCleanupJob", parameters: "3") + try await jobQueue.push(barrierJobName, parameters: .init()) + try await jobQueue2.push("testCleanupJob", parameters: "1") + try await jobQueue2.push(barrierJobName, parameters: .init()) + + await iterator.next() + await iterator.next() + + let failedJob = try await jobQueue.queue.getJobs(withStatus: .failed) + XCTAssertEqual(failedJob.count, 3) + try await jobQueue.push(jobQueue.queue.cleanupJob, parameters: .init(failedJobs: .remove)) + try await jobQueue.push(barrierJobName, parameters: .init()) + + await iterator.next() + + let zeroJobs = try await jobQueue.queue.getJobs(withStatus: .failed) + XCTAssertEqual(zeroJobs.count, 0) + let jobCount2 = try await jobQueue2.queue.getJobs(withStatus: .failed) + XCTAssertEqual(jobCount2.count, 1) + } + } + } } From f4bbb60f6049d3288806d0c6dce4f6c28164fe2b Mon Sep 17 00:00:00 2001 From: Adam Fowler Date: Sat, 17 May 2025 17:33:30 +0100 Subject: [PATCH 16/21] Use swift-jobs main branch --- Package.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Package.swift b/Package.swift index 6979092..93f86d9 100644 --- a/Package.swift +++ b/Package.swift @@ -10,7 +10,7 @@ let package = Package( ], dependencies: [ // TODO: use a released version of swift-jobs - .package(url: "https://github.com/hummingbird-project/swift-jobs.git", branch: "jobschedule-addjob"), + .package(url: "https://github.com/hummingbird-project/swift-jobs.git", branch: "main"), .package(url: "https://github.com/hummingbird-project/postgres-migrations.git", from: "0.1.0"), .package(url: "https://github.com/vapor/postgres-nio.git", from: "1.25.0"), ], From 3c0c8a119a094fca6564c68a35094def091067d6 Mon Sep 17 00:00:00 2001 From: Adam Fowler Date: Sun, 18 May 2025 07:33:02 +0100 Subject: [PATCH 17/21] .doNotRetain --- Sources/JobsPostgres/PostgresJobsQueue.swift | 6 +++--- Sources/JobsPostgres/RetentionPolicy.swift | 9 +++------ Tests/JobsPostgresTests/JobsTests.swift | 16 ++++++++-------- 3 files changed, 14 insertions(+), 17 deletions(-) diff --git a/Sources/JobsPostgres/PostgresJobsQueue.swift b/Sources/JobsPostgres/PostgresJobsQueue.swift index ed3b10d..8724e93 100644 --- a/Sources/JobsPostgres/PostgresJobsQueue.swift +++ b/Sources/JobsPostgres/PostgresJobsQueue.swift @@ -197,7 +197,7 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma public func cancel(jobID: JobID) async throws { try await self.client.withTransaction(logger: logger) { connection in try await deleteFromQueue(jobID: jobID, connection: connection) - if configuration.retentionPolicy.cancelled == .never { + if configuration.retentionPolicy.cancelled == .doNotRetain { try await delete(jobID: jobID) } else { try await setStatus(jobID: jobID, status: .cancelled, connection: connection) @@ -278,7 +278,7 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma /// This is called to say job has finished processing and it can be deleted public func finished(jobID: JobID) async throws { - if configuration.retentionPolicy.completed == .never { + if configuration.retentionPolicy.completed == .doNotRetain { try await self.delete(jobID: jobID) } else { try await self.setStatus(jobID: jobID, status: .completed) @@ -287,7 +287,7 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma /// This is called to say job has failed to run and should be put aside public func failed(jobID: JobID, error: Error) async throws { - if configuration.retentionPolicy.failed == .never { + if configuration.retentionPolicy.failed == .doNotRetain { try await self.delete(jobID: jobID) } else { try await self.setStatus(jobID: jobID, status: .failed) diff --git a/Sources/JobsPostgres/RetentionPolicy.swift b/Sources/JobsPostgres/RetentionPolicy.swift index ddf3e45..6b0d8a4 100644 --- a/Sources/JobsPostgres/RetentionPolicy.swift +++ b/Sources/JobsPostgres/RetentionPolicy.swift @@ -27,22 +27,19 @@ public struct RetentionPolicy: Sendable { /// Retain task public static var retain: RetainData { RetainData(rawValue: .retain) } /// Never retain any data - public static var never: RetainData { RetainData(rawValue: .doNotRetain) } + public static var doNotRetain: RetainData { RetainData(rawValue: .doNotRetain) } } /// Jobs with status cancelled - /// default retention is set for 7 days public var cancelled: RetainData /// Jobs with status completed - /// default retention is set to 7 days public var completed: RetainData /// Jobs with status failed - /// default retention is set to 30 days public var failed: RetainData public init( - cancelled: RetainData = .retain, - completed: RetainData = .never, + cancelled: RetainData = .doNotRetain, + completed: RetainData = .doNotRetain, failed: RetainData = .retain ) { self.cancelled = cancelled diff --git a/Tests/JobsPostgresTests/JobsTests.swift b/Tests/JobsPostgresTests/JobsTests.swift index eca6c24..19ce4f4 100644 --- a/Tests/JobsPostgresTests/JobsTests.swift +++ b/Tests/JobsPostgresTests/JobsTests.swift @@ -499,9 +499,9 @@ final class JobsTests: XCTestCase { numWorkers: 4, configuration: .init( retentionPolicy: .init( - cancelled: .never, - completed: .never, - failed: .never + cancelled: .doNotRetain, + completed: .doNotRetain, + failed: .doNotRetain ) ) ) { jobQueue in @@ -787,9 +787,9 @@ final class JobsTests: XCTestCase { numWorkers: 1, configuration: .init( retentionPolicy: .init( - cancelled: .never, - completed: .never, - failed: .never + cancelled: .doNotRetain, + completed: .doNotRetain, + failed: .doNotRetain ) ), function: #function @@ -893,7 +893,7 @@ final class JobsTests: XCTestCase { configuration: .init( retentionPolicy: .init( cancelled: .retain, - completed: .never, + completed: .doNotRetain, failed: .retain ) ) @@ -904,7 +904,7 @@ final class JobsTests: XCTestCase { queueName: "SecondQueue", retentionPolicy: .init( cancelled: .retain, - completed: .never, + completed: .doNotRetain, failed: .retain ) ) From b64f33475bfcb555278a4fc5803f62d9192d84f9 Mon Sep 17 00:00:00 2001 From: Adam Fowler Date: Mon, 19 May 2025 08:47:10 +0100 Subject: [PATCH 18/21] Move JobCleanup between files --- .../JobsPostgres/PostgresJobsQueue+cleanup.swift | 15 +++++++++++++++ Sources/JobsPostgres/PostgresJobsQueue.swift | 14 -------------- 2 files changed, 15 insertions(+), 14 deletions(-) diff --git a/Sources/JobsPostgres/PostgresJobsQueue+cleanup.swift b/Sources/JobsPostgres/PostgresJobsQueue+cleanup.swift index a6c9edf..f5ca567 100644 --- a/Sources/JobsPostgres/PostgresJobsQueue+cleanup.swift +++ b/Sources/JobsPostgres/PostgresJobsQueue+cleanup.swift @@ -38,6 +38,21 @@ public struct JobCleanupParameters: Sendable & Codable { } extension PostgresJobQueue { + /// what to do with failed/processing jobs from last time queue was handled + public struct JobCleanup: Sendable, Codable { + enum RawValue: Codable { + case doNothing + case rerun + case remove(maxAge: Duration?) + } + let rawValue: RawValue + + public static var doNothing: Self { .init(rawValue: .doNothing) } + public static var rerun: Self { .init(rawValue: .rerun) } + public static var remove: Self { .init(rawValue: .remove(maxAge: nil)) } + public static func remove(maxAge: Duration) -> Self { .init(rawValue: .remove(maxAge: maxAge)) } + } + /// clean up job name. /// /// Use this with the ``JobSchedule`` to schedule a cleanup of diff --git a/Sources/JobsPostgres/PostgresJobsQueue.swift b/Sources/JobsPostgres/PostgresJobsQueue.swift index 8724e93..2fca1cd 100644 --- a/Sources/JobsPostgres/PostgresJobsQueue.swift +++ b/Sources/JobsPostgres/PostgresJobsQueue.swift @@ -45,20 +45,6 @@ import PostgresNIO public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, ResumableJobQueue { public typealias JobID = UUID - /// what to do with failed/processing jobs from last time queue was handled - public struct JobCleanup: Sendable, Codable { - enum RawValue: Codable { - case doNothing - case rerun - case remove(maxAge: Duration?) - } - let rawValue: RawValue - - public static var doNothing: Self { .init(rawValue: .doNothing) } - public static var rerun: Self { .init(rawValue: .rerun) } - public static var remove: Self { .init(rawValue: .remove(maxAge: nil)) } - public static func remove(maxAge: Duration) -> Self { .init(rawValue: .remove(maxAge: maxAge)) } - } /// Job priority from lowest to highest public struct JobPriority: Equatable, Sendable { From a98f6b5a461416d7eb29afd10d6866dbeb1ddc41 Mon Sep 17 00:00:00 2001 From: Adam Fowler Date: Mon, 19 May 2025 09:24:46 +0100 Subject: [PATCH 19/21] Add testCancelledJobRetention --- Tests/JobsPostgresTests/JobsTests.swift | 57 ++++++++++++++++--------- 1 file changed, 36 insertions(+), 21 deletions(-) diff --git a/Tests/JobsPostgresTests/JobsTests.swift b/Tests/JobsPostgresTests/JobsTests.swift index 19ce4f4..97d0bdc 100644 --- a/Tests/JobsPostgresTests/JobsTests.swift +++ b/Tests/JobsPostgresTests/JobsTests.swift @@ -852,21 +852,15 @@ final class JobsTests: XCTestCase { XCTAssertEqual(didRunNoneCancelledJob.withLockedValue { $0 }, true) } - func testJobRetention() async throws { + func testCompletedJobRetention() async throws { struct TestParameters: JobParameters { - static let jobName = "testJobRetention" + static let jobName = "testCompletedJobRetention" let value: Int } let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 3) try await self.testJobQueue( numWorkers: 1, - configuration: .init( - retentionPolicy: .init( - cancelled: .retain, - completed: .retain, - failed: .retain - ) - ) + configuration: .init(retentionPolicy: .init(completed: .retain)) ) { jobQueue in jobQueue.registerJob(parameters: TestParameters.self) { parameters, context in context.logger.info("Parameters=\(parameters.value)") @@ -887,26 +881,47 @@ final class JobsTests: XCTestCase { } } + func testCancelledJobRetention() async throws { + let jobQueue = try await self.createJobQueue( + numWorkers: 1, + configuration: .init(retentionPolicy: .init(cancelled: .retain)) + ) + let jobName = JobName("testCancelledJobRetention") + jobQueue.registerJob(name: jobName) { _, _ in } + + try await withThrowingTaskGroup(of: Void.self) { group in + group.addTask { + // run postgres client + await jobQueue.queue.client.run() + } + try await jobQueue.queue.migrations.apply(client: jobQueue.queue.client, logger: jobQueue.logger, dryRun: false) + + let jobId = try await jobQueue.push(jobName, parameters: 1) + let jobId2 = try await jobQueue.push(jobName, parameters: 2) + + try await jobQueue.cancelJob(jobID: jobId) + try await jobQueue.cancelJob(jobID: jobId2) + + var cancelledJobs = try await jobQueue.queue.getJobs(withStatus: .cancelled) + XCTAssertEqual(cancelledJobs.count, 2) + try await jobQueue.queue.cleanup(cancelledJobs: .remove(maxAge: .seconds(0))) + cancelledJobs = try await jobQueue.queue.getJobs(withStatus: .cancelled) + XCTAssertEqual(cancelledJobs.count, 0) + + group.cancelAll() + } + } + func testCleanupJob() async throws { try await self.testJobQueue( numWorkers: 1, - configuration: .init( - retentionPolicy: .init( - cancelled: .retain, - completed: .doNotRetain, - failed: .retain - ) - ) + configuration: .init(retentionPolicy: .init(failed: .retain)) ) { jobQueue in try await self.testJobQueue( numWorkers: 1, configuration: .init( queueName: "SecondQueue", - retentionPolicy: .init( - cancelled: .retain, - completed: .doNotRetain, - failed: .retain - ) + retentionPolicy: .init(failed: .retain) ) ) { jobQueue2 in let (stream, cont) = AsyncStream.makeStream(of: Void.self) From 7639421d692a216f767f693f5c585f5fb5c7e639 Mon Sep 17 00:00:00 2001 From: Adam Fowler Date: Mon, 19 May 2025 15:26:45 +0100 Subject: [PATCH 20/21] Move Retention policy inside PostgresJobQueue --- .../PostgresJobsQueue+cleanup.swift | 2 +- Sources/JobsPostgres/RetentionPolicy.swift | 58 ++++++++++--------- 2 files changed, 31 insertions(+), 29 deletions(-) diff --git a/Sources/JobsPostgres/PostgresJobsQueue+cleanup.swift b/Sources/JobsPostgres/PostgresJobsQueue+cleanup.swift index f5ca567..8963042 100644 --- a/Sources/JobsPostgres/PostgresJobsQueue+cleanup.swift +++ b/Sources/JobsPostgres/PostgresJobsQueue+cleanup.swift @@ -58,7 +58,7 @@ extension PostgresJobQueue { /// Use this with the ``JobSchedule`` to schedule a cleanup of /// failed, cancelled or completed jobs public var cleanupJob: JobName { - .init("_JobCleanup_\(self.configuration.queueName)") + .init("_Jobs_PostgresCleanup_\(self.configuration.queueName)") } /// register clean up job on queue diff --git a/Sources/JobsPostgres/RetentionPolicy.swift b/Sources/JobsPostgres/RetentionPolicy.swift index 6b0d8a4..c91aecb 100644 --- a/Sources/JobsPostgres/RetentionPolicy.swift +++ b/Sources/JobsPostgres/RetentionPolicy.swift @@ -14,36 +14,38 @@ import Foundation -/// Data rentension policy -public struct RetentionPolicy: Sendable { - /// Data retention policy - public struct RetainData: Equatable, Sendable { - enum Policy { - case retain - case doNotRetain - } +extension PostgresJobQueue { + /// Data rentension policy + public struct RetentionPolicy: Sendable { + /// Data retention policy + public struct RetainData: Equatable, Sendable { + enum Policy { + case retain + case doNotRetain + } - let rawValue: Policy - /// Retain task - public static var retain: RetainData { RetainData(rawValue: .retain) } - /// Never retain any data - public static var doNotRetain: RetainData { RetainData(rawValue: .doNotRetain) } - } + let rawValue: Policy + /// Retain task + public static var retain: RetainData { RetainData(rawValue: .retain) } + /// Never retain any data + public static var doNotRetain: RetainData { RetainData(rawValue: .doNotRetain) } + } - /// Jobs with status cancelled - public var cancelled: RetainData - /// Jobs with status completed - public var completed: RetainData - /// Jobs with status failed - public var failed: RetainData + /// Jobs with status cancelled + public var cancelled: RetainData + /// Jobs with status completed + public var completed: RetainData + /// Jobs with status failed + public var failed: RetainData - public init( - cancelled: RetainData = .doNotRetain, - completed: RetainData = .doNotRetain, - failed: RetainData = .retain - ) { - self.cancelled = cancelled - self.completed = completed - self.failed = failed + public init( + cancelled: RetainData = .doNotRetain, + completed: RetainData = .doNotRetain, + failed: RetainData = .retain + ) { + self.cancelled = cancelled + self.completed = completed + self.failed = failed + } } } From d2b80d407458fc96ed709c9670e700ab63b5efe8 Mon Sep 17 00:00:00 2001 From: Adam Fowler Date: Mon, 19 May 2025 15:54:19 +0100 Subject: [PATCH 21/21] Add tests, and new version of getJobs() --- .../PostgresJobsQueue+cleanup.swift | 4 +-- Sources/JobsPostgres/PostgresJobsQueue.swift | 17 ++++++++++ Tests/JobsPostgresTests/JobsTests.swift | 32 +++++++++++++++++++ 3 files changed, 51 insertions(+), 2 deletions(-) diff --git a/Sources/JobsPostgres/PostgresJobsQueue+cleanup.swift b/Sources/JobsPostgres/PostgresJobsQueue+cleanup.swift index 8963042..074ba4e 100644 --- a/Sources/JobsPostgres/PostgresJobsQueue+cleanup.swift +++ b/Sources/JobsPostgres/PostgresJobsQueue+cleanup.swift @@ -111,7 +111,7 @@ extension PostgresJobQueue { /// wait for migrations to complete before running job queue cleanup try await self.migrations.waitUntilCompleted() _ = try await self.client.withTransaction(logger: logger) { connection in - self.logger.info("Update Jobs") + self.logger.info("Cleanup Jobs") try await self.updateJobsOnInit(withStatus: .pending, onInit: pendingJobs, connection: connection) try await self.updateJobsOnInit(withStatus: .processing, onInit: processingJobs, connection: connection) try await self.updateJobsOnInit(withStatus: .failed, onInit: failedJobs, connection: connection) @@ -148,7 +148,7 @@ extension PostgresJobQueue { ) case .rerun: - let jobs = try await getJobs(withStatus: status) + let jobs = try await getJobs(withStatus: status, connection: connection) self.logger.info("Moving \(jobs.count) jobs with status: \(status) to job queue") for jobID in jobs { try await self.addToQueue(jobID: jobID, queueName: configuration.queueName, options: .init(), connection: connection) diff --git a/Sources/JobsPostgres/PostgresJobsQueue.swift b/Sources/JobsPostgres/PostgresJobsQueue.swift index 2fca1cd..dec3d84 100644 --- a/Sources/JobsPostgres/PostgresJobsQueue.swift +++ b/Sources/JobsPostgres/PostgresJobsQueue.swift @@ -502,6 +502,23 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma return jobs } + func getJobs(withStatus status: Status, connection: PostgresConnection) async throws -> [JobID] { + let stream = try await connection.query( + """ + SELECT + id + FROM swift_jobs.jobs + WHERE status = \(status) AND queue_name = \(configuration.queueName) + """, + logger: self.logger + ) + var jobs: [JobID] = [] + for try await id in stream.decode(JobID.self, context: .default) { + jobs.append(id) + } + return jobs + } + let jobRegistry: JobRegistry } diff --git a/Tests/JobsPostgresTests/JobsTests.swift b/Tests/JobsPostgresTests/JobsTests.swift index 97d0bdc..6b6d472 100644 --- a/Tests/JobsPostgresTests/JobsTests.swift +++ b/Tests/JobsPostgresTests/JobsTests.swift @@ -875,6 +875,8 @@ final class JobsTests: XCTestCase { let completedJobs = try await jobQueue.queue.getJobs(withStatus: .completed) XCTAssertEqual(completedJobs.count, 3) + try await jobQueue.queue.cleanup(completedJobs: .remove(maxAge: .seconds(10))) + XCTAssertEqual(completedJobs.count, 3) try await jobQueue.queue.cleanup(completedJobs: .remove(maxAge: .seconds(0))) let zeroJobs = try await jobQueue.queue.getJobs(withStatus: .completed) XCTAssertEqual(zeroJobs.count, 0) @@ -912,6 +914,36 @@ final class JobsTests: XCTestCase { } } + func testCleanupProcessingJobs() async throws { + let jobQueue = try await self.createJobQueue(numWorkers: 1) + let jobName = JobName("testCancelledJobRetention") + jobQueue.registerJob(name: jobName) { _, _ in } + + try await withThrowingTaskGroup(of: Void.self) { group in + group.addTask { + // run postgres client + await jobQueue.queue.client.run() + } + try await jobQueue.queue.migrations.apply(client: jobQueue.queue.client, logger: jobQueue.logger, dryRun: false) + + let jobID = try await jobQueue.push(jobName, parameters: 1) + let job = try await jobQueue.queue.popFirst() + XCTAssertEqual(jobID, job?.id) + _ = try await jobQueue.push(jobName, parameters: 1) + _ = try await jobQueue.queue.popFirst() + + var processingJobs = try await jobQueue.queue.getJobs(withStatus: .processing) + XCTAssertEqual(processingJobs.count, 2) + + try await jobQueue.queue.cleanup(processingJobs: .remove) + + processingJobs = try await jobQueue.queue.getJobs(withStatus: .processing) + XCTAssertEqual(processingJobs.count, 0) + + group.cancelAll() + } + } + func testCleanupJob() async throws { try await self.testJobQueue( numWorkers: 1,