From 969987ab4654eda125313cc69e0c8f276ff7942e Mon Sep 17 00:00:00 2001 From: Stevenson Michel <130018170+thoven87@users.noreply.github.com> Date: Sun, 16 Mar 2025 16:55:12 -0400 Subject: [PATCH 01/12] Job retention --- Sources/JobsPostgres/PostgresJobsQueue.swift | 36 +++++++++----------- Sources/JobsPostgres/RetentionPolicy.swift | 36 ++++++++++++++++++++ Tests/JobsPostgresTests/JobsTests.swift | 16 ++++----- 3 files changed, 61 insertions(+), 27 deletions(-) create mode 100644 Sources/JobsPostgres/RetentionPolicy.swift diff --git a/Sources/JobsPostgres/PostgresJobsQueue.swift b/Sources/JobsPostgres/PostgresJobsQueue.swift index b961b7b..a2c845e 100644 --- a/Sources/JobsPostgres/PostgresJobsQueue.swift +++ b/Sources/JobsPostgres/PostgresJobsQueue.swift @@ -65,25 +65,15 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma case highest = 4 } /// Lowest priority - public static func lowest() -> JobPriority { - JobPriority(rawValue: .lowest) - } + public static let lowest: JobPriority = JobPriority(rawValue: .lowest) /// Lower priority - public static func lower() -> JobPriority { - JobPriority(rawValue: .lower) - } + public static let lower: JobPriority = JobPriority(rawValue: .lower) /// Normal is the default priority - public static func normal() -> JobPriority { - JobPriority(rawValue: .normal) - } + public static let normal: JobPriority = JobPriority(rawValue: .normal) /// Higher priority - public static func higher() -> JobPriority { - JobPriority(rawValue: .higher) - } + public static let higher: JobPriority = JobPriority(rawValue: .higher) /// Higgest priority - public static func highest() -> JobPriority { - JobPriority(rawValue: .highest) - } + public static let highest: JobPriority = JobPriority(rawValue: .highest) } /// Options for job pushed to queue @@ -96,20 +86,20 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma /// Default initializer for JobOptions public init() { self.delayUntil = .now - self.priority = .normal() + self.priority = .normal } /// Initializer for JobOptions /// - Parameter delayUntil: Whether job execution should be delayed until a later date public init(delayUntil: Date?) { self.delayUntil = delayUntil ?? .now - self.priority = .normal() + self.priority = .normal } /// Initializer for JobOptions /// - Parameter delayUntil: Whether job execution should be delayed until a later date /// - Parameter priority: The priority for a job - public init(delayUntil: Date = .now, priority: JobPriority = .normal()) { + public init(delayUntil: Date = .now, priority: JobPriority = .normal) { self.delayUntil = delayUntil self.priority = priority } @@ -142,6 +132,8 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma let pollTime: Duration /// Which Queue to push jobs into let queueName: String + /// Retention policy for jobs + let retentionPolicy: RetentionPolicy /// Initialize configuration /// - Parameters @@ -149,10 +141,16 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma /// - queueName: Name of queue we are handing public init( pollTime: Duration = .milliseconds(100), - queueName: String = "default" + queueName: String = "default", + retentionPolicy: RetentionPolicy = .init( + canceled: .init(duration: "7D"), + completed: .init(duration: "7D"), + failed: .init(duration: "7D") + ) //.keepAll() ) { self.pollTime = pollTime self.queueName = queueName + self.retentionPolicy = retentionPolicy } } diff --git a/Sources/JobsPostgres/RetentionPolicy.swift b/Sources/JobsPostgres/RetentionPolicy.swift new file mode 100644 index 0000000..484a03f --- /dev/null +++ b/Sources/JobsPostgres/RetentionPolicy.swift @@ -0,0 +1,36 @@ +// +// RetentionPolicy.swift +// swift-jobs-postgres +// +// Created by Stevenson Michel on 3/15/25. +// + +public struct RetentionPolicy: Sendable { + + public struct RetainData: Sendable { + /// Duration in ISO 8601 format (e.g., "P30D" for 30 days) + public var duration: String + + public init(duration: String) { + self.duration = duration + } + } + +// public var days: Int +// +// public init(days: Int) { +// self.days = days +// } + /// Jobs with status cancelled + public var canceled: RetainData + /// Jobs with status completed + public var completed: RetainData + /// Jobs with status failed + public var failed: RetainData + + public init(canceled: RetainData, completed: RetainData, failed: RetainData) { + self.canceled = canceled + self.completed = completed + self.failed = failed + } +} diff --git a/Tests/JobsPostgresTests/JobsTests.swift b/Tests/JobsPostgresTests/JobsTests.swift index 20ea92f..eda624d 100644 --- a/Tests/JobsPostgresTests/JobsTests.swift +++ b/Tests/JobsPostgresTests/JobsTests.swift @@ -278,14 +278,14 @@ final class JobsTests: XCTestCase { try await queue.push( TestParameters(value: 20), options: .init( - priority: .lowest() + priority: .lowest ) ) try await queue.push( TestParameters(value: 2025), options: .init( - priority: .highest() + priority: .highest ) ) @@ -332,7 +332,7 @@ final class JobsTests: XCTestCase { try await queue.push( TestParameters(value: 20), options: .init( - priority: .lower() + priority: .lower ) ) @@ -340,7 +340,7 @@ final class JobsTests: XCTestCase { TestParameters(value: 2025), options: .init( delayUntil: Date.now.addingTimeInterval(1), - priority: .higher() + priority: .higher ) ) @@ -717,14 +717,14 @@ final class JobsTests: XCTestCase { let resumableJob = try await queue.push( ResumableJob(), options: .init( - priority: .lowest() + priority: .lowest ) ) try await queue.push( TestParameters(), options: .init( - priority: .normal() + priority: .normal ) ) @@ -793,14 +793,14 @@ final class JobsTests: XCTestCase { let cancellableJob = try await queue.push( TestParameters(value: 42), options: .init( - priority: .lower() + priority: .lower ) ) try await queue.push( NoneCancelledJobParameters(value: 2025), options: .init( - priority: .highest() + priority: .highest ) ) From 5b9a1ad6a28ad89bcf7ff43e7f48103877225128 Mon Sep 17 00:00:00 2001 From: Stevenson Michel <130018170+thoven87@users.noreply.github.com> Date: Tue, 18 Mar 2025 12:27:03 -0400 Subject: [PATCH 02/12] Job retention --- Sources/JobsPostgres/PostgresJobsQueue.swift | 74 ++++++++++++++++---- Sources/JobsPostgres/RetentionPolicy.swift | 39 ++++++----- Tests/JobsPostgresTests/JobsTests.swift | 56 ++++++++++++++- 3 files changed, 136 insertions(+), 33 deletions(-) diff --git a/Sources/JobsPostgres/PostgresJobsQueue.swift b/Sources/JobsPostgres/PostgresJobsQueue.swift index a2c845e..58c5237 100644 --- a/Sources/JobsPostgres/PostgresJobsQueue.swift +++ b/Sources/JobsPostgres/PostgresJobsQueue.swift @@ -124,6 +124,7 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma case failed = 2 case cancelled = 3 case paused = 4 + case completed = 5 } /// Queue configuration @@ -143,10 +144,10 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma pollTime: Duration = .milliseconds(100), queueName: String = "default", retentionPolicy: RetentionPolicy = .init( - canceled: .init(duration: "7D"), - completed: .init(duration: "7D"), - failed: .init(duration: "7D") - ) //.keepAll() + cancelled: .init(duration: 7 * 24 * 60), + completed: .init(duration: 7 * 24 * 60), + failed: .init(duration: 7 * 24 * 60) + ) ) { self.pollTime = pollTime self.queueName = queueName @@ -192,8 +193,9 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma public func cancel(jobID: JobID) async throws { try await self.client.withTransaction(logger: logger) { connection in try await deleteFromQueue(jobID: jobID, connection: connection) - try await delete(jobID: jobID) + try await setStatus(jobID: jobID, status: .cancelled, connection: connection) } + try await processDataRetentionPolicy() } /// Pause job @@ -298,15 +300,15 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma /// Retry an existing Job /// - Parameters - /// - id: Job instance ID + /// - jobID: Job instance ID /// - jobRequest: Job Request /// - options: Job retry options - public func retry(_ id: JobID, jobRequest: JobRequest, options: JobRetryOptions) async throws { + public func retry(_ jobID: JobID, jobRequest: JobRequest, options: JobRetryOptions) async throws { let buffer = try self.jobRegistry.encode(jobRequest: jobRequest) try await self.client.withTransaction(logger: self.logger) { connection in - try await self.updateJob(id: id, buffer: buffer, connection: connection) + try await self.updateJob(jobID: jobID, buffer: buffer, connection: connection) try await self.addToQueue( - jobID: id, + jobID: jobID, queueName: configuration.queueName, options: .init(delayUntil: options.delayUntil), connection: connection @@ -316,7 +318,9 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma /// This is called to say job has finished processing and it can be deleted public func finished(jobID: JobID) async throws { - try await self.delete(jobID: jobID) + //try await self.delete(jobID: jobID) + try await self.setStatus(jobID: jobID, status: .completed) + try await processDataRetentionPolicy() } /// This is called to say job has failed to run and should be put aside @@ -459,15 +463,59 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma logger: self.logger ) } - // TODO: maybe add a new column colum for attempt so far after PR https://github.com/hummingbird-project/swift-jobs/pull/63 is merged? - func updateJob(id: JobID, buffer: ByteBuffer, connection: PostgresConnection) async throws { + + func processDataRetentionPolicy() async throws { + try await withThrowingTaskGroup(of: Void.self) { group in + try await self.client.withTransaction(logger: logger) { tx in + let now = Date.now.timeIntervalSince1970 + let retentionPolicy = configuration.retentionPolicy + /// process // cancelled events + group.addTask { + try await tx.query( + """ + DELETE FROM swift_jobs.jobs + WHERE status = \(Status.cancelled) + AND extract(epoch FROM last_modified)::int + \(retentionPolicy.cancelled.duration) < \(now) + """, + logger: self.logger + ) + } + + /// process failed events clean up + group.addTask { + try await tx.query( + """ + DELETE FROM swift_jobs.jobs + WHERE status = \(Status.failed) + AND extract(epoch FROM last_modified)::int + \(retentionPolicy.failed.duration) < \(now) + """, + logger: self.logger + ) + } + + /// process completed events + group.addTask { + try await tx.query( + """ + DELETE FROM swift_jobs.jobs + WHERE status = \(Status.completed) + AND extract(epoch FROM last_modified)::int + \(retentionPolicy.completed.duration) < \(now) + """, + logger: self.logger + ) + } + } + } + } + + func updateJob(jobID: JobID, buffer: ByteBuffer, connection: PostgresConnection) async throws { try await connection.query( """ UPDATE swift_jobs.jobs SET job = \(buffer), last_modified = \(Date.now), status = \(Status.failed) - WHERE id = \(id) AND queue_name = \(configuration.queueName) + WHERE id = \(jobID) AND queue_name = \(configuration.queueName) """, logger: self.logger ) diff --git a/Sources/JobsPostgres/RetentionPolicy.swift b/Sources/JobsPostgres/RetentionPolicy.swift index 484a03f..2548997 100644 --- a/Sources/JobsPostgres/RetentionPolicy.swift +++ b/Sources/JobsPostgres/RetentionPolicy.swift @@ -1,35 +1,40 @@ +//===----------------------------------------------------------------------===// // -// RetentionPolicy.swift -// swift-jobs-postgres +// This source file is part of the Hummingbird server framework project // -// Created by Stevenson Michel on 3/15/25. +// Copyright (c) 2025 the Hummingbird authors +// Licensed under Apache License v2.0 // +// See LICENSE.txt for license information +// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// +import Foundation + +/// Data rentension policy public struct RetentionPolicy: Sendable { - + /// Data retention public struct RetainData: Sendable { - /// Duration in ISO 8601 format (e.g., "P30D" for 30 days) - public var duration: String - - public init(duration: String) { + /// Duration + public var duration: TimeInterval + + public init(duration: TimeInterval) { self.duration = duration } } - -// public var days: Int -// -// public init(days: Int) { -// self.days = days -// } + /// Jobs with status cancelled - public var canceled: RetainData + public var cancelled: RetainData /// Jobs with status completed public var completed: RetainData /// Jobs with status failed public var failed: RetainData - public init(canceled: RetainData, completed: RetainData, failed: RetainData) { - self.canceled = canceled + public init(cancelled: RetainData, completed: RetainData, failed: RetainData) { + self.cancelled = cancelled self.completed = completed self.failed = failed } diff --git a/Tests/JobsPostgresTests/JobsTests.swift b/Tests/JobsPostgresTests/JobsTests.swift index eda624d..3ff5ed4 100644 --- a/Tests/JobsPostgresTests/JobsTests.swift +++ b/Tests/JobsPostgresTests/JobsTests.swift @@ -178,10 +178,15 @@ final class JobsTests: XCTestCase { processingJobsInitialization: PostgresJobQueue.JobCleanup = .remove, pendingJobsInitialization: PostgresJobQueue.JobCleanup = .remove, revertMigrations: Bool = true, + configuration: PostgresJobQueue.Configuration = .init(), function: String = #function, test: (JobQueue) async throws -> T ) async throws -> T { - let jobQueue = try await self.createJobQueue(numWorkers: numWorkers, configuration: .init(), function: function) + let jobQueue = try await self.createJobQueue( + numWorkers: numWorkers, + configuration: configuration, + function: function + ) return try await self.testJobQueue( jobQueue: jobQueue, failedJobsInitialization: failedJobsInitialization, @@ -490,7 +495,16 @@ final class JobsTests: XCTestCase { } let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 1) - try await self.testJobQueue(numWorkers: 4) { jobQueue in + try await self.testJobQueue( + numWorkers: 4, + configuration: .init( + retentionPolicy: .init( + cancelled: .init(duration: -1), + completed: .init(duration: -1), + failed: .init(duration: -1) + ) + ) + ) { jobQueue in jobQueue.registerJob(parameters: TestParameters.self) { _, _ in expectation.fulfill() try await Task.sleep(for: .milliseconds(1000)) @@ -769,7 +783,17 @@ final class JobsTests: XCTestCase { let didRunCancelledJob: NIOLockedValueBox = .init(false) let didRunNoneCancelledJob: NIOLockedValueBox = .init(false) - let jobQueue = try await self.createJobQueue(numWorkers: 1, configuration: .init(), function: #function) + let jobQueue = try await self.createJobQueue( + numWorkers: 1, + configuration: .init( + retentionPolicy: .init( + cancelled: .init(duration: -1), + completed: .init(duration: -1), + failed: .init(duration: -1) + ) + ), + function: #function + ) try await testPriorityJobQueue(jobQueue: jobQueue) { queue in queue.registerJob(parameters: TestParameters.self) { parameters, context in @@ -827,4 +851,30 @@ final class JobsTests: XCTestCase { XCTAssertEqual(didRunCancelledJob.withLockedValue { $0 }, false) XCTAssertEqual(didRunNoneCancelledJob.withLockedValue { $0 }, true) } + + func testJobRetention() async throws { + struct TestParameters: JobParameters { + static let jobName = "testJobRetention" + let value: Int + } + let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 3) + try await self.testJobQueue(numWorkers: 1) { jobQueue in + jobQueue.registerJob(parameters: TestParameters.self) { parameters, context in + context.logger.info("Parameters=\(parameters.value)") + try await Task.sleep(for: .milliseconds(Int.random(in: 10..<50))) + expectation.fulfill() + } + let firstJob = try await jobQueue.push(TestParameters(value: 1)) + let secondJob = try await jobQueue.push(TestParameters(value: 2)) + let thirdJob = try await jobQueue.push(TestParameters(value: 3)) + + await fulfillment(of: [expectation], timeout: 5) + + let completedJobs = try await jobQueue.queue.getJobs(withStatus: .completed) + XCTAssertEqual(completedJobs.count, 3) + try await jobQueue.queue.delete(jobID: firstJob) + try await jobQueue.queue.delete(jobID: secondJob) + try await jobQueue.queue.delete(jobID: thirdJob) + } + } } From a3a3691d2c99a2260d71d386a111e3f0cd7841e8 Mon Sep 17 00:00:00 2001 From: Stevenson Michel <130018170+thoven87@users.noreply.github.com> Date: Tue, 18 Mar 2025 12:32:30 -0400 Subject: [PATCH 03/12] Swift format --- Sources/JobsPostgres/PostgresJobsQueue.swift | 2 +- Sources/JobsPostgres/RetentionPolicy.swift | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Sources/JobsPostgres/PostgresJobsQueue.swift b/Sources/JobsPostgres/PostgresJobsQueue.swift index 58c5237..a285fa4 100644 --- a/Sources/JobsPostgres/PostgresJobsQueue.swift +++ b/Sources/JobsPostgres/PostgresJobsQueue.swift @@ -465,7 +465,7 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma } func processDataRetentionPolicy() async throws { - try await withThrowingTaskGroup(of: Void.self) { group in + try await withThrowingTaskGroup(of: Void.self) { group in try await self.client.withTransaction(logger: logger) { tx in let now = Date.now.timeIntervalSince1970 let retentionPolicy = configuration.retentionPolicy diff --git a/Sources/JobsPostgres/RetentionPolicy.swift b/Sources/JobsPostgres/RetentionPolicy.swift index 2548997..b872f5b 100644 --- a/Sources/JobsPostgres/RetentionPolicy.swift +++ b/Sources/JobsPostgres/RetentionPolicy.swift @@ -32,7 +32,7 @@ public struct RetentionPolicy: Sendable { public var completed: RetainData /// Jobs with status failed public var failed: RetainData - + public init(cancelled: RetainData, completed: RetainData, failed: RetainData) { self.cancelled = cancelled self.completed = completed From 3d5352f45e03b26da65aa0c1ce2a91fa97fe9278 Mon Sep 17 00:00:00 2001 From: Stevenson Michel <130018170+thoven87@users.noreply.github.com> Date: Tue, 18 Mar 2025 22:48:03 -0400 Subject: [PATCH 04/12] Addressing PR comments --- Sources/JobsPostgres/PostgresJobsQueue.swift | 100 ++++++++++--------- Sources/JobsPostgres/RetentionPolicy.swift | 40 ++++++-- Tests/JobsPostgresTests/JobsTests.swift | 33 +++--- 3 files changed, 104 insertions(+), 69 deletions(-) diff --git a/Sources/JobsPostgres/PostgresJobsQueue.swift b/Sources/JobsPostgres/PostgresJobsQueue.swift index a285fa4..f8a2321 100644 --- a/Sources/JobsPostgres/PostgresJobsQueue.swift +++ b/Sources/JobsPostgres/PostgresJobsQueue.swift @@ -143,11 +143,7 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma public init( pollTime: Duration = .milliseconds(100), queueName: String = "default", - retentionPolicy: RetentionPolicy = .init( - cancelled: .init(duration: 7 * 24 * 60), - completed: .init(duration: 7 * 24 * 60), - failed: .init(duration: 7 * 24 * 60) - ) + retentionPolicy: RetentionPolicy = .init() ) { self.pollTime = pollTime self.queueName = queueName @@ -193,9 +189,12 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma public func cancel(jobID: JobID) async throws { try await self.client.withTransaction(logger: logger) { connection in try await deleteFromQueue(jobID: jobID, connection: connection) - try await setStatus(jobID: jobID, status: .cancelled, connection: connection) + if configuration.retentionPolicy.cancelled == .never { + try await delete(jobID: jobID) + } else { + try await setStatus(jobID: jobID, status: .cancelled, connection: connection) + } } - try await processDataRetentionPolicy() } /// Pause job @@ -318,14 +317,20 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma /// This is called to say job has finished processing and it can be deleted public func finished(jobID: JobID) async throws { - //try await self.delete(jobID: jobID) - try await self.setStatus(jobID: jobID, status: .completed) - try await processDataRetentionPolicy() + if configuration.retentionPolicy.completed == .never { + try await self.delete(jobID: jobID) + } else { + try await self.setStatus(jobID: jobID, status: .completed) + } } /// This is called to say job has failed to run and should be put aside public func failed(jobID: JobID, error: Error) async throws { - try await self.setStatus(jobID: jobID, status: .failed) + if configuration.retentionPolicy.failed == .never { + try await self.delete(jobID: jobID) + } else { + try await self.setStatus(jobID: jobID, status: .failed) + } } /// stop serving jobs @@ -463,47 +468,44 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma logger: self.logger ) } + /// Helper func which to be use by a scheduled jobs + /// for performing job clean up based on a given set of policies + public func processDataRetentionPolicies() async throws { + try await self.client.withTransaction(logger: logger) { tx in + let now = Date.now.timeIntervalSince1970 + let retentionPolicy: RetentionPolicy = configuration.retentionPolicy - func processDataRetentionPolicy() async throws { - try await withThrowingTaskGroup(of: Void.self) { group in - try await self.client.withTransaction(logger: logger) { tx in - let now = Date.now.timeIntervalSince1970 - let retentionPolicy = configuration.retentionPolicy - /// process // cancelled events - group.addTask { - try await tx.query( - """ - DELETE FROM swift_jobs.jobs - WHERE status = \(Status.cancelled) - AND extract(epoch FROM last_modified)::int + \(retentionPolicy.cancelled.duration) < \(now) - """, - logger: self.logger - ) - } + if case let .retain(timeAmout) = retentionPolicy.cancelled.rawValue { + try await tx.query( + """ + DELETE FROM swift_jobs.jobs + WHERE status = \(Status.cancelled) + AND extract(epoch FROM last_modified)::int + \(timeAmout) < \(now) + """, + logger: self.logger + ) + } - /// process failed events clean up - group.addTask { - try await tx.query( - """ - DELETE FROM swift_jobs.jobs - WHERE status = \(Status.failed) - AND extract(epoch FROM last_modified)::int + \(retentionPolicy.failed.duration) < \(now) - """, - logger: self.logger - ) - } + if case let .retain(timeAmout) = retentionPolicy.completed.rawValue { + try await tx.query( + """ + DELETE FROM swift_jobs.jobs + WHERE status = \(Status.completed) + AND extract(epoch FROM last_modified)::int + \(timeAmout) < \(now) + """, + logger: self.logger + ) + } - /// process completed events - group.addTask { - try await tx.query( - """ - DELETE FROM swift_jobs.jobs - WHERE status = \(Status.completed) - AND extract(epoch FROM last_modified)::int + \(retentionPolicy.completed.duration) < \(now) - """, - logger: self.logger - ) - } + if case let .retain(timeAmout) = retentionPolicy.failed.rawValue { + try await tx.query( + """ + DELETE FROM swift_jobs.jobs + WHERE status = \(Status.failed) + AND extract(epoch FROM last_modified)::int + \(timeAmout) < \(now) + """, + logger: self.logger + ) } } } diff --git a/Sources/JobsPostgres/RetentionPolicy.swift b/Sources/JobsPostgres/RetentionPolicy.swift index b872f5b..2986d33 100644 --- a/Sources/JobsPostgres/RetentionPolicy.swift +++ b/Sources/JobsPostgres/RetentionPolicy.swift @@ -16,24 +16,50 @@ import Foundation /// Data rentension policy public struct RetentionPolicy: Sendable { - /// Data retention - public struct RetainData: Sendable { - /// Duration - public var duration: TimeInterval - public init(duration: TimeInterval) { - self.duration = duration + /// Data retention policy + public struct RetainData: Equatable, Sendable { + enum Policy { + case retain(for: TimeInterval) + case doNotRetain + } + + let rawValue: Policy + /// Retain policy + /// default to 7 days + public static func retain(for timeAmout: TimeInterval = 60 * 60 * 24 * 7) -> RetainData { + RetainData(rawValue: .retain(for: timeAmout)) + } + /// Never retain any data + public static let never: RetainData = RetainData(rawValue: .doNotRetain) + + public static func == (lhs: RetentionPolicy.RetainData, rhs: RetentionPolicy.RetainData) -> Bool { + switch (lhs.rawValue, rhs.rawValue) { + case (.retain(let lhsTimeAmout), .retain(let rhsTimeAmout)): + return lhsTimeAmout == rhsTimeAmout + case (.doNotRetain, .doNotRetain): + return true + default: + return false + } } } /// Jobs with status cancelled + /// default retention is set for 7 days public var cancelled: RetainData /// Jobs with status completed + /// default retention is set to 7 days public var completed: RetainData /// Jobs with status failed + /// default retention is set to 30 days public var failed: RetainData - public init(cancelled: RetainData, completed: RetainData, failed: RetainData) { + public init( + cancelled: RetainData = .retain(), + completed: RetainData = .retain(), + failed: RetainData = .retain(for: 60 * 60 * 24 * 30) + ) { self.cancelled = cancelled self.completed = completed self.failed = failed diff --git a/Tests/JobsPostgresTests/JobsTests.swift b/Tests/JobsPostgresTests/JobsTests.swift index 3ff5ed4..109d74c 100644 --- a/Tests/JobsPostgresTests/JobsTests.swift +++ b/Tests/JobsPostgresTests/JobsTests.swift @@ -499,9 +499,9 @@ final class JobsTests: XCTestCase { numWorkers: 4, configuration: .init( retentionPolicy: .init( - cancelled: .init(duration: -1), - completed: .init(duration: -1), - failed: .init(duration: -1) + cancelled: .never, + completed: .never, + failed: .never ) ) ) { jobQueue in @@ -787,9 +787,9 @@ final class JobsTests: XCTestCase { numWorkers: 1, configuration: .init( retentionPolicy: .init( - cancelled: .init(duration: -1), - completed: .init(duration: -1), - failed: .init(duration: -1) + cancelled: .never, + completed: .never, + failed: .never ) ), function: #function @@ -858,23 +858,30 @@ final class JobsTests: XCTestCase { let value: Int } let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 3) - try await self.testJobQueue(numWorkers: 1) { jobQueue in + try await self.testJobQueue( + numWorkers: 1, + configuration: .init( + retentionPolicy: .init( + cancelled: .retain(for: -1), + completed: .retain(for: -1), + failed: .retain(for: -1) + ) + ) + ) { jobQueue in jobQueue.registerJob(parameters: TestParameters.self) { parameters, context in context.logger.info("Parameters=\(parameters.value)") try await Task.sleep(for: .milliseconds(Int.random(in: 10..<50))) expectation.fulfill() } - let firstJob = try await jobQueue.push(TestParameters(value: 1)) - let secondJob = try await jobQueue.push(TestParameters(value: 2)) - let thirdJob = try await jobQueue.push(TestParameters(value: 3)) + try await jobQueue.push(TestParameters(value: 1)) + try await jobQueue.push(TestParameters(value: 2)) + try await jobQueue.push(TestParameters(value: 3)) await fulfillment(of: [expectation], timeout: 5) let completedJobs = try await jobQueue.queue.getJobs(withStatus: .completed) XCTAssertEqual(completedJobs.count, 3) - try await jobQueue.queue.delete(jobID: firstJob) - try await jobQueue.queue.delete(jobID: secondJob) - try await jobQueue.queue.delete(jobID: thirdJob) + try await jobQueue.queue.processDataRetentionPolicy() } } } From 7b5fae1f048316dfd07e8b8f618d7cf188ab2216 Mon Sep 17 00:00:00 2001 From: Stevenson Michel <130018170+thoven87@users.noreply.github.com> Date: Tue, 18 Mar 2025 22:53:37 -0400 Subject: [PATCH 05/12] Update PostgresJobsQueue.swift --- Sources/JobsPostgres/PostgresJobsQueue.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Sources/JobsPostgres/PostgresJobsQueue.swift b/Sources/JobsPostgres/PostgresJobsQueue.swift index f8a2321..a81eb06 100644 --- a/Sources/JobsPostgres/PostgresJobsQueue.swift +++ b/Sources/JobsPostgres/PostgresJobsQueue.swift @@ -470,7 +470,7 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma } /// Helper func which to be use by a scheduled jobs /// for performing job clean up based on a given set of policies - public func processDataRetentionPolicies() async throws { + public func processDataRetentionPolicy() async throws { try await self.client.withTransaction(logger: logger) { tx in let now = Date.now.timeIntervalSince1970 let retentionPolicy: RetentionPolicy = configuration.retentionPolicy From 5a36adb8e3a3e24265ebaa5658c583981a5500f5 Mon Sep 17 00:00:00 2001 From: Stevenson Michel <130018170+thoven87@users.noreply.github.com> Date: Tue, 18 Mar 2025 23:01:31 -0400 Subject: [PATCH 06/12] fixing the failing test --- Tests/JobsPostgresTests/JobsTests.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Tests/JobsPostgresTests/JobsTests.swift b/Tests/JobsPostgresTests/JobsTests.swift index 109d74c..a17c43d 100644 --- a/Tests/JobsPostgresTests/JobsTests.swift +++ b/Tests/JobsPostgresTests/JobsTests.swift @@ -877,7 +877,7 @@ final class JobsTests: XCTestCase { try await jobQueue.push(TestParameters(value: 2)) try await jobQueue.push(TestParameters(value: 3)) - await fulfillment(of: [expectation], timeout: 5) + await fulfillment(of: [expectation], timeout: 10) let completedJobs = try await jobQueue.queue.getJobs(withStatus: .completed) XCTAssertEqual(completedJobs.count, 3) From b39f0b03565fb50fdcc8b064bd1c439cdb6eb6f7 Mon Sep 17 00:00:00 2001 From: Stevenson Michel <130018170+thoven87@users.noreply.github.com> Date: Mon, 12 May 2025 19:59:05 -0400 Subject: [PATCH 07/12] Adding job pruner and instruction on how to register and run the job on a schedule --- Sources/JobsPostgres/JobPruner.swift | 19 +++++++++++++++++++ Sources/JobsPostgres/PostgresJobsQueue.swift | 8 ++++++-- 2 files changed, 25 insertions(+), 2 deletions(-) create mode 100644 Sources/JobsPostgres/JobPruner.swift diff --git a/Sources/JobsPostgres/JobPruner.swift b/Sources/JobsPostgres/JobPruner.swift new file mode 100644 index 0000000..ced8663 --- /dev/null +++ b/Sources/JobsPostgres/JobPruner.swift @@ -0,0 +1,19 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Hummingbird server framework project +// +// Copyright (c) 2025 the Hummingbird authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import Jobs + +public struct JobPruner: JobParameters { + static public var jobName: String { "JobPruner" } +} diff --git a/Sources/JobsPostgres/PostgresJobsQueue.swift b/Sources/JobsPostgres/PostgresJobsQueue.swift index a81eb06..24b1f2b 100644 --- a/Sources/JobsPostgres/PostgresJobsQueue.swift +++ b/Sources/JobsPostgres/PostgresJobsQueue.swift @@ -468,8 +468,12 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma logger: self.logger ) } - /// Helper func which to be use by a scheduled jobs - /// for performing job clean up based on a given set of policies + + /// This menthod shall be called with the JobPruner after registration as follow + /// someJobQueue.registerJobparameters: JobPruner.self) { _, _ in + /// try await someJobQueue.processDataRetentionPolicy() + /// } + /// let jobScheddule = JobSchedule([ .init(job: JobPruner(), schedule: .everyHour()) ]) public func processDataRetentionPolicy() async throws { try await self.client.withTransaction(logger: logger) { tx in let now = Date.now.timeIntervalSince1970 From 944232249a8dd60a8b4d0b989eeaa04cbf409457 Mon Sep 17 00:00:00 2001 From: Stevenson Michel <130018170+thoven87@users.noreply.github.com> Date: Mon, 12 May 2025 20:14:17 -0400 Subject: [PATCH 08/12] increase time amount for test job --- Tests/JobsPostgresTests/JobsTests.swift | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/Tests/JobsPostgresTests/JobsTests.swift b/Tests/JobsPostgresTests/JobsTests.swift index fe3fb53..08f7516 100644 --- a/Tests/JobsPostgresTests/JobsTests.swift +++ b/Tests/JobsPostgresTests/JobsTests.swift @@ -862,9 +862,9 @@ final class JobsTests: XCTestCase { numWorkers: 1, configuration: .init( retentionPolicy: .init( - cancelled: .retain(for: -1), - completed: .retain(for: -1), - failed: .retain(for: -1) + cancelled: .retain(for: 1), + completed: .retain(for: 1), + failed: .retain(for: 1) ) ) ) { jobQueue in From 9510a66442eb5caeb54a65182fa5dfbe18abdaae Mon Sep 17 00:00:00 2001 From: Stevenson Michel <130018170+thoven87@users.noreply.github.com> Date: Mon, 12 May 2025 20:22:52 -0400 Subject: [PATCH 09/12] Update JobsTests.swift --- Tests/JobsPostgresTests/JobsTests.swift | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/Tests/JobsPostgresTests/JobsTests.swift b/Tests/JobsPostgresTests/JobsTests.swift index 08f7516..c9891ca 100644 --- a/Tests/JobsPostgresTests/JobsTests.swift +++ b/Tests/JobsPostgresTests/JobsTests.swift @@ -859,14 +859,7 @@ final class JobsTests: XCTestCase { } let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 3) try await self.testJobQueue( - numWorkers: 1, - configuration: .init( - retentionPolicy: .init( - cancelled: .retain(for: 1), - completed: .retain(for: 1), - failed: .retain(for: 1) - ) - ) + numWorkers: 1 ) { jobQueue in jobQueue.registerJob(parameters: TestParameters.self) { parameters, context in context.logger.info("Parameters=\(parameters.value)") From 33bc4e439279b777401f85d655c31ddb334a0b1a Mon Sep 17 00:00:00 2001 From: Stevenson Michel <130018170+thoven87@users.noreply.github.com> Date: Mon, 12 May 2025 20:39:39 -0400 Subject: [PATCH 10/12] Update JobsTests.swift --- Tests/JobsPostgresTests/JobsTests.swift | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/Tests/JobsPostgresTests/JobsTests.swift b/Tests/JobsPostgresTests/JobsTests.swift index c9891ca..f816c09 100644 --- a/Tests/JobsPostgresTests/JobsTests.swift +++ b/Tests/JobsPostgresTests/JobsTests.swift @@ -859,11 +859,17 @@ final class JobsTests: XCTestCase { } let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 3) try await self.testJobQueue( - numWorkers: 1 + numWorkers: 1, + configuration: .init( + retentionPolicy: .init( + cancelled: .retain(for: -1), + completed: .retain(for: -1), + failed: .retain(for: -1) + ) + ) ) { jobQueue in jobQueue.registerJob(parameters: TestParameters.self) { parameters, context in context.logger.info("Parameters=\(parameters.value)") - try await Task.sleep(for: .milliseconds(Int.random(in: 10..<50))) expectation.fulfill() } try await jobQueue.push(TestParameters(value: 1)) @@ -875,6 +881,8 @@ final class JobsTests: XCTestCase { let completedJobs = try await jobQueue.queue.getJobs(withStatus: .completed) XCTAssertEqual(completedJobs.count, 3) try await jobQueue.queue.processDataRetentionPolicy() + let zeroJobs = try await jobQueue.queue.getJobs(withStatus: .completed) + XCTAssertEqual(zeroJobs.count, 0) } } } From 028c2814b07723fc3a9e18e8ae6cd891246468f3 Mon Sep 17 00:00:00 2001 From: Stevenson Michel <130018170+thoven87@users.noreply.github.com> Date: Mon, 12 May 2025 20:51:20 -0400 Subject: [PATCH 11/12] up concurrency to 3 to match number of job. There seems to be a timing issue --- Tests/JobsPostgresTests/JobsTests.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Tests/JobsPostgresTests/JobsTests.swift b/Tests/JobsPostgresTests/JobsTests.swift index f816c09..b459a3b 100644 --- a/Tests/JobsPostgresTests/JobsTests.swift +++ b/Tests/JobsPostgresTests/JobsTests.swift @@ -859,7 +859,7 @@ final class JobsTests: XCTestCase { } let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 3) try await self.testJobQueue( - numWorkers: 1, + numWorkers: 3, configuration: .init( retentionPolicy: .init( cancelled: .retain(for: -1), From be47d21b9cc84f7cb4b4e8807aee5b61b6cf12db Mon Sep 17 00:00:00 2001 From: Stevenson Michel <130018170+thoven87@users.noreply.github.com> Date: Mon, 12 May 2025 21:00:13 -0400 Subject: [PATCH 12/12] Wait for 200 milliseconds before checking the job count. --- Tests/JobsPostgresTests/JobsTests.swift | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/Tests/JobsPostgresTests/JobsTests.swift b/Tests/JobsPostgresTests/JobsTests.swift index b459a3b..47db781 100644 --- a/Tests/JobsPostgresTests/JobsTests.swift +++ b/Tests/JobsPostgresTests/JobsTests.swift @@ -859,7 +859,7 @@ final class JobsTests: XCTestCase { } let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 3) try await self.testJobQueue( - numWorkers: 3, + numWorkers: 1, configuration: .init( retentionPolicy: .init( cancelled: .retain(for: -1), @@ -877,6 +877,7 @@ final class JobsTests: XCTestCase { try await jobQueue.push(TestParameters(value: 3)) await fulfillment(of: [expectation], timeout: 10) + try await Task.sleep(for: .milliseconds(200)) let completedJobs = try await jobQueue.queue.getJobs(withStatus: .completed) XCTAssertEqual(completedJobs.count, 3)