Skip to content

Commit d608d8d

Browse files
committed
Separate cancel job vs resumable job tests
1 parent f8bd252 commit d608d8d

File tree

2 files changed

+62
-15
lines changed

2 files changed

+62
-15
lines changed

Sources/JobsPostgres/PostgresJobsQueue.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ import PostgresNIO
4242
/// try await migrations.apply(client: postgresClient, logger: logger, dryRun: applyMigrations)
4343
/// }
4444
/// ```
45-
public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueueProtocol, ResumeableJobQueueProtocol {
45+
public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, ResumableJobQueue {
4646

4747
public typealias JobID = UUID
4848
/// what to do with failed/processing jobs from last time queue was handled
@@ -174,7 +174,7 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueueProtocol
174174
self.logger = logger
175175
self.isStopped = .init(false)
176176
self.migrations = migrations
177-
await migrations.add(CreateSwiftJobsMigrations(), checkForDuplicates: true)
177+
await migrations.add(CreateSwiftJobsMigrations(), skipDuplicates: true)
178178
}
179179

180180
public func onInit() async throws {

Tests/JobsPostgresTests/JobsTests.swift

Lines changed: 60 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -683,13 +683,13 @@ final class JobsTests: XCTestCase {
683683
group.cancelAll()
684684
}
685685
}
686-
687-
func testJobActions() async throws {
686+
687+
func testResumableAndPausableJobs() async throws {
688688
struct TestParameters: JobParameters {
689-
static let jobName = "testJobActions"
689+
static let jobName = "testResumableAndPausableJobs"
690690
let value: Int
691691
}
692-
let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 3)
692+
let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 2)
693693
let jobExecutionSequence: NIOLockedValueBox<[Int]> = .init([])
694694

695695
let jobQueue = try await self.createJobQueue(numWorkers: 1, configuration: .init(), function: #function)
@@ -713,21 +713,70 @@ final class JobsTests: XCTestCase {
713713

714714
try await jobQueue.pauseJob(jobID: firstJob)
715715

716+
try await queue.push(
717+
TestParameters(value: 2025),
718+
options: .init(
719+
priority: .normal()
720+
)
721+
)
722+
723+
try await withThrowingTaskGroup(of: Void.self) { group in
724+
let serviceGroup = ServiceGroup(services: [queue], logger: queue.logger)
725+
726+
let processingJobs = try await jobQueue.queue.getJobs(withStatus: .pending)
727+
XCTAssertEqual(processingJobs.count, 1)
728+
729+
group.addTask {
730+
try await serviceGroup.run()
731+
}
732+
733+
let pausedJobs = try await jobQueue.queue.getJobs(withStatus: .paused)
734+
XCTAssertEqual(pausedJobs.count, 1)
735+
736+
try await jobQueue.resumeJob(jobID: firstJob)
737+
738+
await fulfillment(of: [expectation], timeout: 10)
739+
await serviceGroup.triggerGracefulShutdown()
740+
}
741+
}
742+
XCTAssertEqual(jobExecutionSequence.withLockedValue { $0 }, [2025, 20])
743+
}
744+
745+
func testCancellableJob() async throws {
746+
struct TestParameters: JobParameters {
747+
static let jobName = "testCancellableJob"
748+
let value: Int
749+
}
750+
let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 1)
751+
let jobExecutionSequence: NIOLockedValueBox<[Int]> = .init([])
752+
753+
let jobQueue = try await self.createJobQueue(numWorkers: 1, configuration: .init(), function: #function)
754+
755+
try await testPriorityJobQueue(jobQueue: jobQueue) { queue in
756+
queue.registerJob(parameters: TestParameters.self) { parameters, context in
757+
context.logger.info("Parameters=\(parameters.value)")
758+
jobExecutionSequence.withLockedValue {
759+
$0.append(parameters.value)
760+
}
761+
try await Task.sleep(for: .milliseconds(Int.random(in: 10..<50)))
762+
expectation.fulfill()
763+
}
764+
716765
try await queue.push(
717766
TestParameters(value: 2025),
718767
options: .init(
719768
priority: .highest()
720769
)
721770
)
722771

723-
let cancelID = try await queue.push(
772+
let cancellableJob = try await queue.push(
724773
TestParameters(value: 42),
725774
options: .init(
726775
priority: .lower()
727776
)
728777
)
729778

730-
try await jobQueue.cancelJob(jobID: cancelID)
779+
try await jobQueue.cancelJob(jobID: cancellableJob)
731780

732781
try await withThrowingTaskGroup(of: Void.self) { group in
733782
let serviceGroup = ServiceGroup(services: [queue], logger: queue.logger)
@@ -739,18 +788,16 @@ final class JobsTests: XCTestCase {
739788
try await serviceGroup.run()
740789
}
741790

742-
let pausedJobs = try await jobQueue.queue.getJobs(withStatus: .paused)
743-
XCTAssertEqual(pausedJobs.count, 1)
791+
await fulfillment(of: [expectation], timeout: 10)
792+
744793
let cancelledJobs = try await jobQueue.queue.getJobs(withStatus: .cancelled)
745794
XCTAssertEqual(cancelledJobs.count, 1)
746795

747-
try await jobQueue.resumeJob(jobID: firstJob)
748-
try await jobQueue.resumeJob(jobID: cancelID)
749-
750-
await fulfillment(of: [expectation], timeout: 10)
751796
await serviceGroup.triggerGracefulShutdown()
797+
798+
try await jobQueue.queue.delete(jobID: cancellableJob)
752799
}
753800
}
754-
XCTAssertEqual(jobExecutionSequence.withLockedValue { $0 }, [2025, 20, 42])
801+
XCTAssertEqual(jobExecutionSequence.withLockedValue { $0 }, [2025])
755802
}
756803
}

0 commit comments

Comments
 (0)