Skip to content

Fix up after requiring all jobs to conform to JobParameters #22

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Feb 25, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions Sources/JobsPostgres/PostgresJobsQueue.swift
Original file line number Diff line number Diff line change
Expand Up @@ -165,13 +165,13 @@ public final class PostgresJobQueue: JobQueueDriver {
/// Register job
/// - Parameters:
/// - job: Job Definition
public func registerJob<Parameters: Codable & Sendable>(_ job: JobDefinition<Parameters>) {
public func registerJob<Parameters: JobParameters>(_ job: JobDefinition<Parameters>) {
self.jobRegistry.registerJob(job)
}

/// Push Job onto queue
/// - Returns: Identifier of queued job
@discardableResult public func push<Parameters>(_ jobRequest: JobRequest<Parameters>, options: JobOptions) async throws -> JobID {
@discardableResult public func push<Parameters: JobParameters>(_ jobRequest: JobRequest<Parameters>, options: JobOptions) async throws -> JobID {
let jobID = JobID()
try await self.client.withTransaction(logger: self.logger) { connection in
try await self.add(jobID: jobID, jobRequest: jobRequest, queueName: configuration.queueName, connection: connection)
Expand All @@ -180,9 +180,12 @@ public final class PostgresJobQueue: JobQueueDriver {
return jobID
}

/// Retry a job
/// - Returns: Bool
public func retry<Parameters>(_ id: JobID, jobRequest: JobRequest<Parameters>, options: JobOptions) async throws {
/// Retry an existing Job
/// - Parameters
/// - id: Job instance ID
/// - jobRequest: Job Request
/// - options: JobOptions
public func retry<Parameters: JobParameters>(_ id: JobID, jobRequest: JobRequest<Parameters>, options: JobOptions) async throws {
let buffer = try self.jobRegistry.encode(jobRequest: jobRequest)
try await self.client.withTransaction(logger: self.logger) { connection in
try await self.updateJob(id: id, buffer: buffer, connection: connection)
Expand Down
178 changes: 97 additions & 81 deletions Tests/JobsPostgresTests/JobsTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,6 @@ extension XCTestExpectation {
}

final class JobsTests: XCTestCase {
func wait(for expectations: [XCTestExpectation], timeout: TimeInterval) async {
#if (os(Linux) && swift(<5.9)) || swift(<5.8)
super.wait(for: expectations, timeout: timeout)
#else
await fulfillment(of: expectations, timeout: timeout)
#endif
}

func createJobQueue(
numWorkers: Int,
configuration: PostgresJobQueue.Configuration = .init(),
Expand Down Expand Up @@ -157,57 +149,61 @@ final class JobsTests: XCTestCase {
}

func testBasic() async throws {
struct TestParameters: JobParameters {
static let jobName = "testBasic"
let value: Int
}
let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 10)
let jobIdentifer = JobIdentifier<Int>(#function)
try await self.testJobQueue(numWorkers: 1) { jobQueue in
jobQueue.registerJob(id: jobIdentifer) { parameters, context in
context.logger.info("Parameters=\(parameters)")
jobQueue.registerJob(parameters: TestParameters.self) { parameters, context in
context.logger.info("Parameters=\(parameters.value)")
try await Task.sleep(for: .milliseconds(Int.random(in: 10..<50)))
expectation.fulfill()
}
try await jobQueue.push(id: jobIdentifer, parameters: 1)
try await jobQueue.push(id: jobIdentifer, parameters: 2)
try await jobQueue.push(id: jobIdentifer, parameters: 3)
try await jobQueue.push(id: jobIdentifer, parameters: 4)
try await jobQueue.push(id: jobIdentifer, parameters: 5)
try await jobQueue.push(id: jobIdentifer, parameters: 6)
try await jobQueue.push(id: jobIdentifer, parameters: 7)
try await jobQueue.push(id: jobIdentifer, parameters: 8)
try await jobQueue.push(id: jobIdentifer, parameters: 9)
try await jobQueue.push(id: jobIdentifer, parameters: 10)

await self.wait(for: [expectation], timeout: 5)
try await jobQueue.push(TestParameters(value: 1))
try await jobQueue.push(TestParameters(value: 2))
try await jobQueue.push(TestParameters(value: 3))
try await jobQueue.push(TestParameters(value: 4))
try await jobQueue.push(TestParameters(value: 5))
try await jobQueue.push(TestParameters(value: 6))
try await jobQueue.push(TestParameters(value: 7))
try await jobQueue.push(TestParameters(value: 8))
try await jobQueue.push(TestParameters(value: 9))
try await jobQueue.push(TestParameters(value: 10))

await fulfillment(of: [expectation], timeout: 5)
}
}

func testDelayedJobs() async throws {
let jobIdentifer = JobIdentifier<Int>(#function)
let jobIdentifer2 = JobIdentifier<Int>(#function)
struct TestParameters: JobParameters {
static let jobName = "testDelayedJobs"
let value: Int
}
let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 2)
let jobExecutionSequence: NIOLockedValueBox<[Int]> = .init([])

try await self.testJobQueue(numWorkers: 1) { jobQueue in
jobQueue.registerJob(id: jobIdentifer) { parameters, context in
context.logger.info("Parameters=\(parameters)")
jobQueue.registerJob(parameters: TestParameters.self) { parameters, context in
context.logger.info("Parameters=\(parameters.value)")
jobExecutionSequence.withLockedValue {
$0.append(parameters)
$0.append(parameters.value)
}
try await Task.sleep(for: .milliseconds(Int.random(in: 10..<50)))
expectation.fulfill()
}
try await jobQueue.push(
id: jobIdentifer,
parameters: 1,
TestParameters(value: 1),
options: .init(
delayUntil: Date.now.addingTimeInterval(1)
)
)
try await jobQueue.push(id: jobIdentifer2, parameters: 5)
try await jobQueue.push(TestParameters(value: 5))

let processingJobs = try await jobQueue.queue.getJobs(withStatus: .pending)
XCTAssertEqual(processingJobs.count, 2)

await self.wait(for: [expectation], timeout: 10)
await fulfillment(of: [expectation], timeout: 10)

let pendingJobs = try await jobQueue.queue.getJobs(withStatus: .pending)
XCTAssertEqual(pendingJobs.count, 0)
Expand All @@ -216,13 +212,16 @@ final class JobsTests: XCTestCase {
}

func testMultipleWorkers() async throws {
let jobIdentifer = JobIdentifier<Int>(#function)
struct TestParameters: JobParameters {
static let jobName = "testMultipleWorkers"
let value: Int
}
let runningJobCounter = ManagedAtomic(0)
let maxRunningJobCounter = ManagedAtomic(0)
let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 10)

try await self.testJobQueue(numWorkers: 4) { jobQueue in
jobQueue.registerJob(id: jobIdentifer) { parameters, context in
jobQueue.registerJob(parameters: TestParameters.self) { parameters, context in
let runningJobs = runningJobCounter.wrappingIncrementThenLoad(by: 1, ordering: .relaxed)
if runningJobs > maxRunningJobCounter.load(ordering: .relaxed) {
maxRunningJobCounter.store(runningJobs, ordering: .relaxed)
Expand All @@ -233,36 +232,38 @@ final class JobsTests: XCTestCase {
runningJobCounter.wrappingDecrement(by: 1, ordering: .relaxed)
}

try await jobQueue.push(id: jobIdentifer, parameters: 1)
try await jobQueue.push(id: jobIdentifer, parameters: 2)
try await jobQueue.push(id: jobIdentifer, parameters: 3)
try await jobQueue.push(id: jobIdentifer, parameters: 4)
try await jobQueue.push(id: jobIdentifer, parameters: 5)
try await jobQueue.push(id: jobIdentifer, parameters: 6)
try await jobQueue.push(id: jobIdentifer, parameters: 7)
try await jobQueue.push(id: jobIdentifer, parameters: 8)
try await jobQueue.push(id: jobIdentifer, parameters: 9)
try await jobQueue.push(id: jobIdentifer, parameters: 10)
try await jobQueue.push(TestParameters(value: 1))
try await jobQueue.push(TestParameters(value: 2))
try await jobQueue.push(TestParameters(value: 3))
try await jobQueue.push(TestParameters(value: 4))
try await jobQueue.push(TestParameters(value: 5))
try await jobQueue.push(TestParameters(value: 6))
try await jobQueue.push(TestParameters(value: 7))
try await jobQueue.push(TestParameters(value: 8))
try await jobQueue.push(TestParameters(value: 9))
try await jobQueue.push(TestParameters(value: 10))

await self.wait(for: [expectation], timeout: 5)
await fulfillment(of: [expectation], timeout: 5)

XCTAssertGreaterThan(maxRunningJobCounter.load(ordering: .relaxed), 1)
XCTAssertLessThanOrEqual(maxRunningJobCounter.load(ordering: .relaxed), 4)
}
}

func testErrorRetryCount() async throws {
let jobIdentifer = JobIdentifier<Int>(#function)
struct TestParameters: JobParameters {
static let jobName = "testErrorRetryCount"
}
let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 4)
struct FailedError: Error {}
try await self.testJobQueue(numWorkers: 1) { jobQueue in
jobQueue.registerJob(id: jobIdentifer, maxRetryCount: 3) { _, _ in
jobQueue.registerJob(parameters: TestParameters.self, maxRetryCount: 3) { _, _ in
expectation.fulfill()
throw FailedError()
}
try await jobQueue.push(id: jobIdentifer, parameters: 0)
try await jobQueue.push(TestParameters())

await self.wait(for: [expectation], timeout: 5)
await fulfillment(of: [expectation], timeout: 5)
try await Task.sleep(for: .milliseconds(200))

let failedJobs = try await jobQueue.queue.getJobs(withStatus: .failed)
Expand All @@ -273,12 +274,14 @@ final class JobsTests: XCTestCase {
}

func testErrorRetryAndThenSucceed() async throws {
let jobIdentifer = JobIdentifier<Int>(#function)
struct TestParameters: JobParameters {
static let jobName = "testErrorRetryAndThenSucceed"
}
let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 2)
let currentJobTryCount: NIOLockedValueBox<Int> = .init(0)
struct FailedError: Error {}
try await self.testJobQueue(numWorkers: 1) { jobQueue in
jobQueue.registerJob(id: jobIdentifer, maxRetryCount: 3) { _, _ in
jobQueue.registerJob(parameters: TestParameters.self, maxRetryCount: 3) { _, _ in
defer {
currentJobTryCount.withLockedValue {
$0 += 1
Expand All @@ -289,9 +292,9 @@ final class JobsTests: XCTestCase {
throw FailedError()
}
}
try await jobQueue.push(id: jobIdentifer, parameters: 0)
try await jobQueue.push(TestParameters())

await self.wait(for: [expectation], timeout: 5)
await fulfillment(of: [expectation], timeout: 5)
try await Task.sleep(for: .milliseconds(200))

let failedJobs = try await jobQueue.queue.getJobs(withStatus: .failed)
Expand All @@ -303,36 +306,38 @@ final class JobsTests: XCTestCase {
}

func testJobSerialization() async throws {
struct TestJobParameters: Codable {
struct TestJobParameters: JobParameters {
static let jobName = "testJobSerialization"
let id: Int
let message: String
}
let expectation = XCTestExpectation(description: "TestJob.execute was called")
let jobIdentifer = JobIdentifier<TestJobParameters>(#function)
try await self.testJobQueue(numWorkers: 1) { jobQueue in
jobQueue.registerJob(id: jobIdentifer) { parameters, _ in
jobQueue.registerJob(parameters: TestJobParameters.self) { parameters, _ in
XCTAssertEqual(parameters.id, 23)
XCTAssertEqual(parameters.message, "Hello!")
expectation.fulfill()
}
try await jobQueue.push(id: jobIdentifer, parameters: .init(id: 23, message: "Hello!"))
try await jobQueue.push(TestJobParameters(id: 23, message: "Hello!"))

await self.wait(for: [expectation], timeout: 5)
await fulfillment(of: [expectation], timeout: 5)
}
}

/// Test job is cancelled on shutdown
func testShutdownJob() async throws {
let jobIdentifer = JobIdentifier<Int>(#function)
struct TestParameters: JobParameters {
static let jobName = "testShutdownJob"
}
let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 1)

try await self.testJobQueue(numWorkers: 4) { jobQueue in
jobQueue.registerJob(id: jobIdentifer) { _, _ in
jobQueue.registerJob(parameters: TestParameters.self) { _, _ in
expectation.fulfill()
try await Task.sleep(for: .milliseconds(1000))
}
try await jobQueue.push(id: jobIdentifer, parameters: 0)
await self.wait(for: [expectation], timeout: 5)
try await jobQueue.push(TestParameters())
await fulfillment(of: [expectation], timeout: 5)

let processingJobs = try await jobQueue.queue.getJobs(withStatus: .processing)
XCTAssertEqual(processingJobs.count, 1)
Expand All @@ -344,19 +349,25 @@ final class JobsTests: XCTestCase {

/// test job fails to decode but queue continues to process
func testFailToDecode() async throws {
struct TestIntParameter: JobParameters {
static let jobName = "testFailToDecode"
let value: Int
}
struct TestStringParameter: JobParameters {
static let jobName = "testFailToDecode"
let value: String
}
let string: NIOLockedValueBox<String> = .init("")
let jobIdentifer1 = JobIdentifier<Int>(#function)
let jobIdentifer2 = JobIdentifier<String>(#function)
let expectation = XCTestExpectation(description: "job was called", expectedFulfillmentCount: 1)

try await self.testJobQueue(numWorkers: 4) { jobQueue in
jobQueue.registerJob(id: jobIdentifer2) { parameters, _ in
string.withLockedValue { $0 = parameters }
jobQueue.registerJob(parameters: TestStringParameter.self) { parameters, _ in
string.withLockedValue { $0 = parameters.value }
expectation.fulfill()
}
try await jobQueue.push(id: jobIdentifer1, parameters: 2)
try await jobQueue.push(id: jobIdentifer2, parameters: "test")
await self.wait(for: [expectation], timeout: 5)
try await jobQueue.push(TestIntParameter(value: 2))
try await jobQueue.push(TestStringParameter(value: "test"))
await fulfillment(of: [expectation], timeout: 5)
}
string.withLockedValue {
XCTAssertEqual($0, "test")
Expand All @@ -366,13 +377,15 @@ final class JobsTests: XCTestCase {
/// creates job that errors on first attempt, and is left on processing queue and
/// is then rerun on startup of new server
func testRerunAtStartup() async throws {
struct TestParameters: JobParameters {
static let jobName = "testRerunAtStartup"
}
struct RetryError: Error {}
let jobIdentifer = JobIdentifier<Int>(#function)
let firstTime = ManagedAtomic(true)
let finished = ManagedAtomic(false)
let failedExpectation = XCTestExpectation(description: "TestJob failed", expectedFulfillmentCount: 1)
let succeededExpectation = XCTestExpectation(description: "TestJob2 succeeded", expectedFulfillmentCount: 1)
let job = JobDefinition(id: jobIdentifer) { _, _ in
let job = JobDefinition(parameters: TestParameters.self) { _, _ in
if firstTime.compareExchange(expected: true, desired: false, ordering: .relaxed).original {
failedExpectation.fulfill()
throw RetryError()
Expand All @@ -388,9 +401,9 @@ final class JobsTests: XCTestCase {
jobQueue: jobQueue,
revertMigrations: true
) { jobQueue in
try await jobQueue.push(id: jobIdentifer, parameters: 0)
try await jobQueue.push(TestParameters())

await self.wait(for: [failedExpectation], timeout: 10)
await fulfillment(of: [failedExpectation], timeout: 10)

XCTAssertFalse(firstTime.load(ordering: .relaxed))
XCTAssertFalse(finished.load(ordering: .relaxed))
Expand All @@ -399,21 +412,24 @@ final class JobsTests: XCTestCase {
let jobQueue2 = try await createJobQueue(numWorkers: 1)
jobQueue2.registerJob(job)
try await self.testJobQueue(jobQueue: jobQueue2, failedJobsInitialization: .rerun) { _ in
await self.wait(for: [succeededExpectation], timeout: 10)
await fulfillment(of: [succeededExpectation], timeout: 10)
XCTAssertTrue(finished.load(ordering: .relaxed))
}
}

func testMultipleJobQueueHandlers() async throws {
let jobIdentifer = JobIdentifier<Int>(#function)
struct TestParameters: JobParameters {
static let jobName = "testMultipleJobQueueHandlers"
let value: Int
}
let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 200)
let logger = {
var logger = Logger(label: "testMultipleJobQueueHandlers")
logger.logLevel = .debug
return logger
}()
let job = JobDefinition(id: jobIdentifer) { parameters, context in
context.logger.info("Parameters=\(parameters)")
let job = JobDefinition(parameters: TestParameters.self) { parameters, context in
context.logger.info("Parameters=\(parameters.value)")
try await Task.sleep(for: .milliseconds(Int.random(in: 10..<50)))
expectation.fulfill()
}
Expand Down Expand Up @@ -463,11 +479,11 @@ final class JobsTests: XCTestCase {
try await jobQueue.queue.cleanup(failedJobs: .remove, processingJobs: .remove)
try await jobQueue2.queue.cleanup(failedJobs: .remove, processingJobs: .remove)
do {
for i in 0..<100 {
try await jobQueue2.push(id: jobIdentifer, parameters: i)
try await jobQueue.push(id: jobIdentifer, parameters: i)
for i in 0..<200 {
try await jobQueue.push(TestParameters(value: i))
try await jobQueue2.push(TestParameters(value: i))
}
await self.wait(for: [expectation], timeout: 5)
await fulfillment(of: [expectation], timeout: 5)
await serviceGroup.triggerGracefulShutdown()
} catch {
XCTFail("\(String(reflecting: error))")
Expand Down
Loading