diff --git a/Sources/JobsPostgres/PostgresJobsQueue.swift b/Sources/JobsPostgres/PostgresJobsQueue.swift index ea8b46b..1017aec 100644 --- a/Sources/JobsPostgres/PostgresJobsQueue.swift +++ b/Sources/JobsPostgres/PostgresJobsQueue.swift @@ -51,6 +51,23 @@ public final class PostgresJobQueue: JobQueueDriver { case remove } + /// Options for job pushed to queue + public struct JobOptions: JobOptionsProtocol { + /// Delay running job until + public var delayUntil: Date + + /// Default initializer for JobOptions + public init() { + self.delayUntil = .now + } + + /// Initializer for JobOptions + /// - Parameter delayUntil: Whether job execution should be delayed until a later date + public init(delayUntil: Date?) { + self.delayUntil = delayUntil ?? .now + } + } + /// Errors thrown by PostgresJobQueue public enum PostgresQueueError: Error, CustomStringConvertible { case failedToAdd @@ -186,8 +203,8 @@ public final class PostgresJobQueue: JobQueueDriver { /// - Parameters /// - id: Job instance ID /// - jobRequest: Job Request - /// - options: JobOptions - public func retry(_ id: JobID, jobRequest: JobRequest, options: JobOptions) async throws { + /// - options: Job retry options + public func retry(_ id: JobID, jobRequest: JobRequest, options: JobRetryOptions) async throws { let buffer = try self.jobRegistry.encode(jobRequest: jobRequest) try await self.client.withTransaction(logger: self.logger) { connection in try await self.updateJob(id: id, buffer: buffer, connection: connection) diff --git a/Tests/JobsPostgresTests/JobsTests.swift b/Tests/JobsPostgresTests/JobsTests.swift index 22decb8..b669807 100644 --- a/Tests/JobsPostgresTests/JobsTests.swift +++ b/Tests/JobsPostgresTests/JobsTests.swift @@ -65,11 +65,7 @@ final class JobsTests: XCTestCase { ), numWorkers: numWorkers, logger: logger, - options: .init( - maximumBackoff: 0.01, - maxJitter: 0.01, - minJitter: 0.0 - ) + options: .init(defaultRetryStrategy: .exponentialJitter(maxBackoff: 0.01, maxJitter: 0.01)) ) } @@ -257,7 +253,10 @@ final class JobsTests: XCTestCase { let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 4) struct FailedError: Error {} try await self.testJobQueue(numWorkers: 1) { jobQueue in - jobQueue.registerJob(parameters: TestParameters.self, maxRetryCount: 3) { _, _ in + jobQueue.registerJob( + parameters: TestParameters.self, + retryStrategy: .exponentialJitter(maxAttempts: 3, maxBackoff: 0.01, maxJitter: 0.01) + ) { _, _ in expectation.fulfill() throw FailedError() } @@ -281,7 +280,10 @@ final class JobsTests: XCTestCase { let currentJobTryCount: NIOLockedValueBox = .init(0) struct FailedError: Error {} try await self.testJobQueue(numWorkers: 1) { jobQueue in - jobQueue.registerJob(parameters: TestParameters.self, maxRetryCount: 3) { _, _ in + jobQueue.registerJob( + parameters: TestParameters.self, + retryStrategy: .exponentialJitter(maxAttempts: 3, maxBackoff: 0.01, maxJitter: 0.01) + ) { _, _ in defer { currentJobTryCount.withLockedValue { $0 += 1