Skip to content

Commit 163d855

Browse files
committed
PR comments
1 parent b084875 commit 163d855

File tree

2 files changed

+20
-7
lines changed

2 files changed

+20
-7
lines changed

Sources/JobsPostgres/PostgresJobsQueue.swift

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,17 +142,21 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma
142142
let pollTime: Duration
143143
/// Which Queue to push jobs into
144144
let queueName: String
145+
/// What to do with cancelled jobs
146+
let deleteCancelledJobs: Bool
145147

146148
/// Initialize configuration
147149
/// - Parameters
148150
/// - pollTime: Queue poll time to wait if queue empties
149151
/// - queueName: Name of queue we are handing
150152
public init(
151153
pollTime: Duration = .milliseconds(100),
152-
queueName: String = "default"
154+
queueName: String = "default",
155+
deleteCancelledJobs: Bool = true
153156
) {
154157
self.pollTime = pollTime
155158
self.queueName = queueName
159+
self.deleteCancelledJobs = deleteCancelledJobs
156160
}
157161
}
158162

@@ -194,6 +198,10 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma
194198
public func cancel(jobID: JobID) async throws {
195199
try await self.client.withTransaction(logger: logger) { connection in
196200
try await deleteFromQueue(jobID: jobID, connection: connection)
201+
if configuration.deleteCancelledJobs {
202+
try await delete(jobID: jobID)
203+
return
204+
}
197205
try await setStatus(jobID: jobID, status: .cancelled, connection: connection)
198206
}
199207
}

Tests/JobsPostgresTests/JobsTests.swift

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -704,22 +704,22 @@ final class JobsTests: XCTestCase {
704704
expectation.fulfill()
705705
}
706706

707-
let firstJob = try await queue.push(
707+
let resumableJob = try await queue.push(
708708
TestParameters(value: 20),
709709
options: .init(
710710
priority: .lowest()
711711
)
712712
)
713713

714-
try await jobQueue.pauseJob(jobID: firstJob)
715-
716714
try await queue.push(
717715
TestParameters(value: 2025),
718716
options: .init(
719717
priority: .normal()
720718
)
721719
)
722720

721+
try await jobQueue.pauseJob(jobID: resumableJob)
722+
723723
try await withThrowingTaskGroup(of: Void.self) { group in
724724
let serviceGroup = ServiceGroup(services: [queue], logger: queue.logger)
725725

@@ -729,16 +729,21 @@ final class JobsTests: XCTestCase {
729729
group.addTask {
730730
try await serviceGroup.run()
731731
}
732-
732+
733+
let processingJobCount = try await jobQueue.queue.getJobs(withStatus: .processing)
734+
// Job 2 has been processed
735+
XCTAssertEqual(processingJobCount.count, 0)
736+
// Job 1 has not been processed
733737
let pausedJobs = try await jobQueue.queue.getJobs(withStatus: .paused)
734738
XCTAssertEqual(pausedJobs.count, 1)
735-
736-
try await jobQueue.resumeJob(jobID: firstJob)
739+
// resume job 1
740+
try await jobQueue.resumeJob(jobID: resumableJob)
737741

738742
await fulfillment(of: [expectation], timeout: 10)
739743
await serviceGroup.triggerGracefulShutdown()
740744
}
741745
}
746+
// verify job run order
742747
XCTAssertEqual(jobExecutionSequence.withLockedValue { $0 }, [2025, 20])
743748
}
744749

0 commit comments

Comments
 (0)