From 54cecbffc8bc9ffdfc7968d32cbf21dfea207a56 Mon Sep 17 00:00:00 2001 From: Adam Fowler Date: Thu, 6 Mar 2025 12:34:39 +0000 Subject: [PATCH 1/2] Fixup after retry and job option changes --- Sources/JobsPostgres/PostgresJobsQueue.swift | 19 ++++++++++++++++++- Tests/JobsPostgresTests/JobsTests.swift | 16 +++++++++------- 2 files changed, 27 insertions(+), 8 deletions(-) diff --git a/Sources/JobsPostgres/PostgresJobsQueue.swift b/Sources/JobsPostgres/PostgresJobsQueue.swift index ea8b46b..8d8acd1 100644 --- a/Sources/JobsPostgres/PostgresJobsQueue.swift +++ b/Sources/JobsPostgres/PostgresJobsQueue.swift @@ -51,6 +51,23 @@ public final class PostgresJobQueue: JobQueueDriver { case remove } + /// Options for job pushed to queue + public struct JobOptions: JobOptionsProtocol { + /// Delay running job until + public var delayUntil: Date + + /// Default initializer for JobOptions + public init() { + self.delayUntil = .now + } + + /// Initializer for JobOptions + /// - Parameter delayUntil: Whether job execution should be delayed until a later date + public init(delayUntil: Date?) { + self.delayUntil = delayUntil ?? .now + } + } + /// Errors thrown by PostgresJobQueue public enum PostgresQueueError: Error, CustomStringConvertible { case failedToAdd @@ -187,7 +204,7 @@ public final class PostgresJobQueue: JobQueueDriver { /// - id: Job instance ID /// - jobRequest: Job Request /// - options: JobOptions - public func retry(_ id: JobID, jobRequest: JobRequest, options: JobOptions) async throws { + public func retry(_ id: JobID, jobRequest: JobRequest, options: JobRetryOptions) async throws { let buffer = try self.jobRegistry.encode(jobRequest: jobRequest) try await self.client.withTransaction(logger: self.logger) { connection in try await self.updateJob(id: id, buffer: buffer, connection: connection) diff --git a/Tests/JobsPostgresTests/JobsTests.swift b/Tests/JobsPostgresTests/JobsTests.swift index 22decb8..b669807 100644 --- a/Tests/JobsPostgresTests/JobsTests.swift +++ b/Tests/JobsPostgresTests/JobsTests.swift @@ -65,11 +65,7 @@ final class JobsTests: XCTestCase { ), numWorkers: numWorkers, logger: logger, - options: .init( - maximumBackoff: 0.01, - maxJitter: 0.01, - minJitter: 0.0 - ) + options: .init(defaultRetryStrategy: .exponentialJitter(maxBackoff: 0.01, maxJitter: 0.01)) ) } @@ -257,7 +253,10 @@ final class JobsTests: XCTestCase { let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 4) struct FailedError: Error {} try await self.testJobQueue(numWorkers: 1) { jobQueue in - jobQueue.registerJob(parameters: TestParameters.self, maxRetryCount: 3) { _, _ in + jobQueue.registerJob( + parameters: TestParameters.self, + retryStrategy: .exponentialJitter(maxAttempts: 3, maxBackoff: 0.01, maxJitter: 0.01) + ) { _, _ in expectation.fulfill() throw FailedError() } @@ -281,7 +280,10 @@ final class JobsTests: XCTestCase { let currentJobTryCount: NIOLockedValueBox = .init(0) struct FailedError: Error {} try await self.testJobQueue(numWorkers: 1) { jobQueue in - jobQueue.registerJob(parameters: TestParameters.self, maxRetryCount: 3) { _, _ in + jobQueue.registerJob( + parameters: TestParameters.self, + retryStrategy: .exponentialJitter(maxAttempts: 3, maxBackoff: 0.01, maxJitter: 0.01) + ) { _, _ in defer { currentJobTryCount.withLockedValue { $0 += 1 From ec442f7b54feb6ef143444a68abe41c094dc1154 Mon Sep 17 00:00:00 2001 From: Adam Fowler Date: Thu, 6 Mar 2025 12:46:26 +0000 Subject: [PATCH 2/2] Change from PR comments --- Sources/JobsPostgres/PostgresJobsQueue.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Sources/JobsPostgres/PostgresJobsQueue.swift b/Sources/JobsPostgres/PostgresJobsQueue.swift index 8d8acd1..1017aec 100644 --- a/Sources/JobsPostgres/PostgresJobsQueue.swift +++ b/Sources/JobsPostgres/PostgresJobsQueue.swift @@ -203,7 +203,7 @@ public final class PostgresJobQueue: JobQueueDriver { /// - Parameters /// - id: Job instance ID /// - jobRequest: Job Request - /// - options: JobOptions + /// - options: Job retry options public func retry(_ id: JobID, jobRequest: JobRequest, options: JobRetryOptions) async throws { let buffer = try self.jobRegistry.encode(jobRequest: jobRequest) try await self.client.withTransaction(logger: self.logger) { connection in