From 8980ebdd514eade9fd84458914b0c44b6eac1ee2 Mon Sep 17 00:00:00 2001 From: Adam Fowler Date: Tue, 25 Feb 2025 08:52:27 +0000 Subject: [PATCH 1/3] Fix up after requiring all jobs to comform to JobParameters --- Package.swift | 2 +- Sources/JobsPostgres/PostgresJobsQueue.swift | 6 +- Tests/JobsPostgresTests/JobsTests.swift | 178 ++++++++++--------- 3 files changed, 101 insertions(+), 85 deletions(-) diff --git a/Package.swift b/Package.swift index 7f6a392..4455b9c 100644 --- a/Package.swift +++ b/Package.swift @@ -10,7 +10,7 @@ let package = Package( ], dependencies: [ // TODO: use a released version of swift-jobs - .package(url: "https://github.com/hummingbird-project/swift-jobs.git", branch: "main"), + .package(url: "https://github.com/hummingbird-project/swift-jobs.git", branch: "job-parameters"), .package(url: "https://github.com/hummingbird-project/hummingbird-postgres.git", from: "0.5.0"), .package(url: "https://github.com/vapor/postgres-nio.git", from: "1.25.0"), ], diff --git a/Sources/JobsPostgres/PostgresJobsQueue.swift b/Sources/JobsPostgres/PostgresJobsQueue.swift index 2b0ee28..83fe4be 100644 --- a/Sources/JobsPostgres/PostgresJobsQueue.swift +++ b/Sources/JobsPostgres/PostgresJobsQueue.swift @@ -165,13 +165,13 @@ public final class PostgresJobQueue: JobQueueDriver { /// Register job /// - Parameters: /// - job: Job Definition - public func registerJob(_ job: JobDefinition) { + public func registerJob(_ job: JobDefinition) { self.jobRegistry.registerJob(job) } /// Push Job onto queue /// - Returns: Identifier of queued job - @discardableResult public func push(_ jobRequest: JobRequest, options: JobOptions) async throws -> JobID { + @discardableResult public func push(_ jobRequest: JobRequest, options: JobOptions) async throws -> JobID { let jobID = JobID() try await self.client.withTransaction(logger: self.logger) { connection in try await self.add(jobID: jobID, jobRequest: jobRequest, queueName: configuration.queueName, connection: connection) @@ -182,7 +182,7 @@ public final class PostgresJobQueue: JobQueueDriver { /// Retry a job /// - Returns: Bool - public func retry(_ id: JobID, jobRequest: JobRequest, options: JobOptions) async throws { + public func retry(_ id: JobID, jobRequest: JobRequest, options: JobOptions) async throws { let buffer = try self.jobRegistry.encode(jobRequest: jobRequest) try await self.client.withTransaction(logger: self.logger) { connection in try await self.updateJob(id: id, buffer: buffer, connection: connection) diff --git a/Tests/JobsPostgresTests/JobsTests.swift b/Tests/JobsPostgresTests/JobsTests.swift index 2318a34..22decb8 100644 --- a/Tests/JobsPostgresTests/JobsTests.swift +++ b/Tests/JobsPostgresTests/JobsTests.swift @@ -41,14 +41,6 @@ extension XCTestExpectation { } final class JobsTests: XCTestCase { - func wait(for expectations: [XCTestExpectation], timeout: TimeInterval) async { - #if (os(Linux) && swift(<5.9)) || swift(<5.8) - super.wait(for: expectations, timeout: timeout) - #else - await fulfillment(of: expectations, timeout: timeout) - #endif - } - func createJobQueue( numWorkers: Int, configuration: PostgresJobQueue.Configuration = .init(), @@ -157,57 +149,61 @@ final class JobsTests: XCTestCase { } func testBasic() async throws { + struct TestParameters: JobParameters { + static let jobName = "testBasic" + let value: Int + } let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 10) - let jobIdentifer = JobIdentifier(#function) try await self.testJobQueue(numWorkers: 1) { jobQueue in - jobQueue.registerJob(id: jobIdentifer) { parameters, context in - context.logger.info("Parameters=\(parameters)") + jobQueue.registerJob(parameters: TestParameters.self) { parameters, context in + context.logger.info("Parameters=\(parameters.value)") try await Task.sleep(for: .milliseconds(Int.random(in: 10..<50))) expectation.fulfill() } - try await jobQueue.push(id: jobIdentifer, parameters: 1) - try await jobQueue.push(id: jobIdentifer, parameters: 2) - try await jobQueue.push(id: jobIdentifer, parameters: 3) - try await jobQueue.push(id: jobIdentifer, parameters: 4) - try await jobQueue.push(id: jobIdentifer, parameters: 5) - try await jobQueue.push(id: jobIdentifer, parameters: 6) - try await jobQueue.push(id: jobIdentifer, parameters: 7) - try await jobQueue.push(id: jobIdentifer, parameters: 8) - try await jobQueue.push(id: jobIdentifer, parameters: 9) - try await jobQueue.push(id: jobIdentifer, parameters: 10) - - await self.wait(for: [expectation], timeout: 5) + try await jobQueue.push(TestParameters(value: 1)) + try await jobQueue.push(TestParameters(value: 2)) + try await jobQueue.push(TestParameters(value: 3)) + try await jobQueue.push(TestParameters(value: 4)) + try await jobQueue.push(TestParameters(value: 5)) + try await jobQueue.push(TestParameters(value: 6)) + try await jobQueue.push(TestParameters(value: 7)) + try await jobQueue.push(TestParameters(value: 8)) + try await jobQueue.push(TestParameters(value: 9)) + try await jobQueue.push(TestParameters(value: 10)) + + await fulfillment(of: [expectation], timeout: 5) } } func testDelayedJobs() async throws { - let jobIdentifer = JobIdentifier(#function) - let jobIdentifer2 = JobIdentifier(#function) + struct TestParameters: JobParameters { + static let jobName = "testDelayedJobs" + let value: Int + } let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 2) let jobExecutionSequence: NIOLockedValueBox<[Int]> = .init([]) try await self.testJobQueue(numWorkers: 1) { jobQueue in - jobQueue.registerJob(id: jobIdentifer) { parameters, context in - context.logger.info("Parameters=\(parameters)") + jobQueue.registerJob(parameters: TestParameters.self) { parameters, context in + context.logger.info("Parameters=\(parameters.value)") jobExecutionSequence.withLockedValue { - $0.append(parameters) + $0.append(parameters.value) } try await Task.sleep(for: .milliseconds(Int.random(in: 10..<50))) expectation.fulfill() } try await jobQueue.push( - id: jobIdentifer, - parameters: 1, + TestParameters(value: 1), options: .init( delayUntil: Date.now.addingTimeInterval(1) ) ) - try await jobQueue.push(id: jobIdentifer2, parameters: 5) + try await jobQueue.push(TestParameters(value: 5)) let processingJobs = try await jobQueue.queue.getJobs(withStatus: .pending) XCTAssertEqual(processingJobs.count, 2) - await self.wait(for: [expectation], timeout: 10) + await fulfillment(of: [expectation], timeout: 10) let pendingJobs = try await jobQueue.queue.getJobs(withStatus: .pending) XCTAssertEqual(pendingJobs.count, 0) @@ -216,13 +212,16 @@ final class JobsTests: XCTestCase { } func testMultipleWorkers() async throws { - let jobIdentifer = JobIdentifier(#function) + struct TestParameters: JobParameters { + static let jobName = "testMultipleWorkers" + let value: Int + } let runningJobCounter = ManagedAtomic(0) let maxRunningJobCounter = ManagedAtomic(0) let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 10) try await self.testJobQueue(numWorkers: 4) { jobQueue in - jobQueue.registerJob(id: jobIdentifer) { parameters, context in + jobQueue.registerJob(parameters: TestParameters.self) { parameters, context in let runningJobs = runningJobCounter.wrappingIncrementThenLoad(by: 1, ordering: .relaxed) if runningJobs > maxRunningJobCounter.load(ordering: .relaxed) { maxRunningJobCounter.store(runningJobs, ordering: .relaxed) @@ -233,18 +232,18 @@ final class JobsTests: XCTestCase { runningJobCounter.wrappingDecrement(by: 1, ordering: .relaxed) } - try await jobQueue.push(id: jobIdentifer, parameters: 1) - try await jobQueue.push(id: jobIdentifer, parameters: 2) - try await jobQueue.push(id: jobIdentifer, parameters: 3) - try await jobQueue.push(id: jobIdentifer, parameters: 4) - try await jobQueue.push(id: jobIdentifer, parameters: 5) - try await jobQueue.push(id: jobIdentifer, parameters: 6) - try await jobQueue.push(id: jobIdentifer, parameters: 7) - try await jobQueue.push(id: jobIdentifer, parameters: 8) - try await jobQueue.push(id: jobIdentifer, parameters: 9) - try await jobQueue.push(id: jobIdentifer, parameters: 10) + try await jobQueue.push(TestParameters(value: 1)) + try await jobQueue.push(TestParameters(value: 2)) + try await jobQueue.push(TestParameters(value: 3)) + try await jobQueue.push(TestParameters(value: 4)) + try await jobQueue.push(TestParameters(value: 5)) + try await jobQueue.push(TestParameters(value: 6)) + try await jobQueue.push(TestParameters(value: 7)) + try await jobQueue.push(TestParameters(value: 8)) + try await jobQueue.push(TestParameters(value: 9)) + try await jobQueue.push(TestParameters(value: 10)) - await self.wait(for: [expectation], timeout: 5) + await fulfillment(of: [expectation], timeout: 5) XCTAssertGreaterThan(maxRunningJobCounter.load(ordering: .relaxed), 1) XCTAssertLessThanOrEqual(maxRunningJobCounter.load(ordering: .relaxed), 4) @@ -252,17 +251,19 @@ final class JobsTests: XCTestCase { } func testErrorRetryCount() async throws { - let jobIdentifer = JobIdentifier(#function) + struct TestParameters: JobParameters { + static let jobName = "testErrorRetryCount" + } let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 4) struct FailedError: Error {} try await self.testJobQueue(numWorkers: 1) { jobQueue in - jobQueue.registerJob(id: jobIdentifer, maxRetryCount: 3) { _, _ in + jobQueue.registerJob(parameters: TestParameters.self, maxRetryCount: 3) { _, _ in expectation.fulfill() throw FailedError() } - try await jobQueue.push(id: jobIdentifer, parameters: 0) + try await jobQueue.push(TestParameters()) - await self.wait(for: [expectation], timeout: 5) + await fulfillment(of: [expectation], timeout: 5) try await Task.sleep(for: .milliseconds(200)) let failedJobs = try await jobQueue.queue.getJobs(withStatus: .failed) @@ -273,12 +274,14 @@ final class JobsTests: XCTestCase { } func testErrorRetryAndThenSucceed() async throws { - let jobIdentifer = JobIdentifier(#function) + struct TestParameters: JobParameters { + static let jobName = "testErrorRetryAndThenSucceed" + } let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 2) let currentJobTryCount: NIOLockedValueBox = .init(0) struct FailedError: Error {} try await self.testJobQueue(numWorkers: 1) { jobQueue in - jobQueue.registerJob(id: jobIdentifer, maxRetryCount: 3) { _, _ in + jobQueue.registerJob(parameters: TestParameters.self, maxRetryCount: 3) { _, _ in defer { currentJobTryCount.withLockedValue { $0 += 1 @@ -289,9 +292,9 @@ final class JobsTests: XCTestCase { throw FailedError() } } - try await jobQueue.push(id: jobIdentifer, parameters: 0) + try await jobQueue.push(TestParameters()) - await self.wait(for: [expectation], timeout: 5) + await fulfillment(of: [expectation], timeout: 5) try await Task.sleep(for: .milliseconds(200)) let failedJobs = try await jobQueue.queue.getJobs(withStatus: .failed) @@ -303,36 +306,38 @@ final class JobsTests: XCTestCase { } func testJobSerialization() async throws { - struct TestJobParameters: Codable { + struct TestJobParameters: JobParameters { + static let jobName = "testJobSerialization" let id: Int let message: String } let expectation = XCTestExpectation(description: "TestJob.execute was called") - let jobIdentifer = JobIdentifier(#function) try await self.testJobQueue(numWorkers: 1) { jobQueue in - jobQueue.registerJob(id: jobIdentifer) { parameters, _ in + jobQueue.registerJob(parameters: TestJobParameters.self) { parameters, _ in XCTAssertEqual(parameters.id, 23) XCTAssertEqual(parameters.message, "Hello!") expectation.fulfill() } - try await jobQueue.push(id: jobIdentifer, parameters: .init(id: 23, message: "Hello!")) + try await jobQueue.push(TestJobParameters(id: 23, message: "Hello!")) - await self.wait(for: [expectation], timeout: 5) + await fulfillment(of: [expectation], timeout: 5) } } /// Test job is cancelled on shutdown func testShutdownJob() async throws { - let jobIdentifer = JobIdentifier(#function) + struct TestParameters: JobParameters { + static let jobName = "testShutdownJob" + } let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 1) try await self.testJobQueue(numWorkers: 4) { jobQueue in - jobQueue.registerJob(id: jobIdentifer) { _, _ in + jobQueue.registerJob(parameters: TestParameters.self) { _, _ in expectation.fulfill() try await Task.sleep(for: .milliseconds(1000)) } - try await jobQueue.push(id: jobIdentifer, parameters: 0) - await self.wait(for: [expectation], timeout: 5) + try await jobQueue.push(TestParameters()) + await fulfillment(of: [expectation], timeout: 5) let processingJobs = try await jobQueue.queue.getJobs(withStatus: .processing) XCTAssertEqual(processingJobs.count, 1) @@ -344,19 +349,25 @@ final class JobsTests: XCTestCase { /// test job fails to decode but queue continues to process func testFailToDecode() async throws { + struct TestIntParameter: JobParameters { + static let jobName = "testFailToDecode" + let value: Int + } + struct TestStringParameter: JobParameters { + static let jobName = "testFailToDecode" + let value: String + } let string: NIOLockedValueBox = .init("") - let jobIdentifer1 = JobIdentifier(#function) - let jobIdentifer2 = JobIdentifier(#function) let expectation = XCTestExpectation(description: "job was called", expectedFulfillmentCount: 1) try await self.testJobQueue(numWorkers: 4) { jobQueue in - jobQueue.registerJob(id: jobIdentifer2) { parameters, _ in - string.withLockedValue { $0 = parameters } + jobQueue.registerJob(parameters: TestStringParameter.self) { parameters, _ in + string.withLockedValue { $0 = parameters.value } expectation.fulfill() } - try await jobQueue.push(id: jobIdentifer1, parameters: 2) - try await jobQueue.push(id: jobIdentifer2, parameters: "test") - await self.wait(for: [expectation], timeout: 5) + try await jobQueue.push(TestIntParameter(value: 2)) + try await jobQueue.push(TestStringParameter(value: "test")) + await fulfillment(of: [expectation], timeout: 5) } string.withLockedValue { XCTAssertEqual($0, "test") @@ -366,13 +377,15 @@ final class JobsTests: XCTestCase { /// creates job that errors on first attempt, and is left on processing queue and /// is then rerun on startup of new server func testRerunAtStartup() async throws { + struct TestParameters: JobParameters { + static let jobName = "testRerunAtStartup" + } struct RetryError: Error {} - let jobIdentifer = JobIdentifier(#function) let firstTime = ManagedAtomic(true) let finished = ManagedAtomic(false) let failedExpectation = XCTestExpectation(description: "TestJob failed", expectedFulfillmentCount: 1) let succeededExpectation = XCTestExpectation(description: "TestJob2 succeeded", expectedFulfillmentCount: 1) - let job = JobDefinition(id: jobIdentifer) { _, _ in + let job = JobDefinition(parameters: TestParameters.self) { _, _ in if firstTime.compareExchange(expected: true, desired: false, ordering: .relaxed).original { failedExpectation.fulfill() throw RetryError() @@ -388,9 +401,9 @@ final class JobsTests: XCTestCase { jobQueue: jobQueue, revertMigrations: true ) { jobQueue in - try await jobQueue.push(id: jobIdentifer, parameters: 0) + try await jobQueue.push(TestParameters()) - await self.wait(for: [failedExpectation], timeout: 10) + await fulfillment(of: [failedExpectation], timeout: 10) XCTAssertFalse(firstTime.load(ordering: .relaxed)) XCTAssertFalse(finished.load(ordering: .relaxed)) @@ -399,21 +412,24 @@ final class JobsTests: XCTestCase { let jobQueue2 = try await createJobQueue(numWorkers: 1) jobQueue2.registerJob(job) try await self.testJobQueue(jobQueue: jobQueue2, failedJobsInitialization: .rerun) { _ in - await self.wait(for: [succeededExpectation], timeout: 10) + await fulfillment(of: [succeededExpectation], timeout: 10) XCTAssertTrue(finished.load(ordering: .relaxed)) } } func testMultipleJobQueueHandlers() async throws { - let jobIdentifer = JobIdentifier(#function) + struct TestParameters: JobParameters { + static let jobName = "testMultipleJobQueueHandlers" + let value: Int + } let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 200) let logger = { var logger = Logger(label: "testMultipleJobQueueHandlers") logger.logLevel = .debug return logger }() - let job = JobDefinition(id: jobIdentifer) { parameters, context in - context.logger.info("Parameters=\(parameters)") + let job = JobDefinition(parameters: TestParameters.self) { parameters, context in + context.logger.info("Parameters=\(parameters.value)") try await Task.sleep(for: .milliseconds(Int.random(in: 10..<50))) expectation.fulfill() } @@ -463,11 +479,11 @@ final class JobsTests: XCTestCase { try await jobQueue.queue.cleanup(failedJobs: .remove, processingJobs: .remove) try await jobQueue2.queue.cleanup(failedJobs: .remove, processingJobs: .remove) do { - for i in 0..<100 { - try await jobQueue2.push(id: jobIdentifer, parameters: i) - try await jobQueue.push(id: jobIdentifer, parameters: i) + for i in 0..<200 { + try await jobQueue.push(TestParameters(value: i)) + try await jobQueue2.push(TestParameters(value: i)) } - await self.wait(for: [expectation], timeout: 5) + await fulfillment(of: [expectation], timeout: 5) await serviceGroup.triggerGracefulShutdown() } catch { XCTFail("\(String(reflecting: error))") From bf69beebe443408629378c54779f35c8f43284e4 Mon Sep 17 00:00:00 2001 From: Adam Fowler Date: Tue, 25 Feb 2025 09:21:25 +0000 Subject: [PATCH 2/3] Fix up documentation --- Sources/JobsPostgres/PostgresJobsQueue.swift | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/Sources/JobsPostgres/PostgresJobsQueue.swift b/Sources/JobsPostgres/PostgresJobsQueue.swift index 83fe4be..8469350 100644 --- a/Sources/JobsPostgres/PostgresJobsQueue.swift +++ b/Sources/JobsPostgres/PostgresJobsQueue.swift @@ -180,8 +180,11 @@ public final class PostgresJobQueue: JobQueueDriver { return jobID } - /// Retry a job - /// - Returns: Bool + /// Retry an existing Job + /// - Parameters + /// - id: Job instance ID + /// - jobRequest: Job Request + /// - options: JobOptions public func retry(_ id: JobID, jobRequest: JobRequest, options: JobOptions) async throws { let buffer = try self.jobRegistry.encode(jobRequest: jobRequest) try await self.client.withTransaction(logger: self.logger) { connection in From 325940ab878aae9058f75589595ced8100d25c10 Mon Sep 17 00:00:00 2001 From: Adam Fowler Date: Tue, 25 Feb 2025 14:04:19 +0000 Subject: [PATCH 3/3] Use swift-jobs main branch --- Package.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Package.swift b/Package.swift index 4455b9c..7f6a392 100644 --- a/Package.swift +++ b/Package.swift @@ -10,7 +10,7 @@ let package = Package( ], dependencies: [ // TODO: use a released version of swift-jobs - .package(url: "https://github.com/hummingbird-project/swift-jobs.git", branch: "job-parameters"), + .package(url: "https://github.com/hummingbird-project/swift-jobs.git", branch: "main"), .package(url: "https://github.com/hummingbird-project/hummingbird-postgres.git", from: "0.5.0"), .package(url: "https://github.com/vapor/postgres-nio.git", from: "1.25.0"), ],