Skip to content

Commit f8bd252

Browse files
committed
Implementing cancellable and resumable jobs
1 parent 2ed5755 commit f8bd252

File tree

3 files changed

+137
-4
lines changed

3 files changed

+137
-4
lines changed

Package.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,8 @@ let package = Package(
1010
],
1111
dependencies: [
1212
// TODO: use a released version of swift-jobs
13-
.package(url: "https://github.com/hummingbird-project/swift-jobs.git", branch: "main"),
14-
.package(url: "https://github.com/hummingbird-project/hummingbird-postgres.git", from: "0.5.0"),
13+
.package(url: "https://github.com/thoven87/swift-jobs.git", branch: "resumable-cancellable-jobs"),
14+
.package(url: "https://github.com/hummingbird-project/hummingbird-postgres.git", branch: "main"),
1515
.package(url: "https://github.com/vapor/postgres-nio.git", from: "1.25.0"),
1616
],
1717
targets: [

Sources/JobsPostgres/PostgresJobsQueue.swift

Lines changed: 65 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,8 @@ import PostgresNIO
4242
/// try await migrations.apply(client: postgresClient, logger: logger, dryRun: applyMigrations)
4343
/// }
4444
/// ```
45-
public final class PostgresJobQueue: JobQueueDriver {
45+
public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueueProtocol, ResumeableJobQueueProtocol {
46+
4647
public typealias JobID = UUID
4748
/// what to do with failed/processing jobs from last time queue was handled
4849
public enum JobCleanup: Sendable {
@@ -131,6 +132,8 @@ public final class PostgresJobQueue: JobQueueDriver {
131132
case pending = 0
132133
case processing = 1
133134
case failed = 2
135+
case cancelled = 3
136+
case paused = 4
134137
}
135138

136139
/// Queue configuration
@@ -171,7 +174,7 @@ public final class PostgresJobQueue: JobQueueDriver {
171174
self.logger = logger
172175
self.isStopped = .init(false)
173176
self.migrations = migrations
174-
await migrations.add(CreateSwiftJobsMigrations())
177+
await migrations.add(CreateSwiftJobsMigrations(), checkForDuplicates: true)
175178
}
176179

177180
public func onInit() async throws {
@@ -180,6 +183,56 @@ public final class PostgresJobQueue: JobQueueDriver {
180183
try await self.migrations.waitUntilCompleted()
181184
}
182185

186+
/// Cancel job
187+
///
188+
/// This function is used to cancel a job. Job cancellation is not gaurenteed howerever.
189+
/// Cancellable jobs are jobs with a delayed greather than when the cancellation request was made
190+
///
191+
/// - Parameters:
192+
/// - jobID: an existing job
193+
/// - Throws:
194+
public func cancel(jobID: JobID) async throws {
195+
try await self.client.withTransaction(logger: logger) { connection in
196+
try await deleteFromQueue(jobID: jobID, connection: connection)
197+
try await setStatus(jobID: jobID, status: .cancelled, connection: connection)
198+
}
199+
}
200+
201+
/// Pause job
202+
///
203+
/// This function is used to pause a job. Job paus is not gaurenteed howerever.
204+
/// Pausable jobs are jobs with a delayed greather than when the pause request was made
205+
///
206+
/// - Parameters:
207+
/// - jobID: an existing job
208+
/// - Throws:
209+
public func pause(jobID: UUID) async throws {
210+
try await self.client.withTransaction(logger: logger) { connection in
211+
try await deleteFromQueue(jobID: jobID, connection: connection)
212+
try await setStatus(jobID: jobID, status: .paused, connection: connection)
213+
}
214+
}
215+
216+
/// Resume job
217+
///
218+
/// This function is used to resume jobs. Job is not gaurenteed howerever.
219+
/// Cancellable jobs are jobs with a delayed greather than when the cancellation request was made
220+
///
221+
/// - Parameters:
222+
/// - jobID: an existing job
223+
/// - Throws:
224+
public func resume(jobID: JobID) async throws {
225+
try await self.client.withTransaction(logger: logger) { connection in
226+
try await setStatus(jobID: jobID, status: .pending, connection: connection)
227+
try await addToQueue(
228+
jobID: jobID,
229+
queueName: configuration.queueName,
230+
options: .init(),
231+
connection: connection
232+
)
233+
}
234+
}
235+
183236
/// Cleanup job queues
184237
///
185238
/// This function is used to re-run or delete jobs in a certain state. Failed jobs can be
@@ -431,6 +484,16 @@ public final class PostgresJobQueue: JobQueueDriver {
431484
logger: self.logger
432485
)
433486
}
487+
488+
func deleteFromQueue(jobID: JobID, connection: PostgresConnection) async throws {
489+
try await connection.query(
490+
"""
491+
DELETE FROM swift_jobs.queues
492+
WHERE job_id = \(jobID) AND queue_name = \(configuration.queueName)
493+
""",
494+
logger: self.logger
495+
)
496+
}
434497

435498
func addToQueue(jobID: JobID, queueName: String, options: JobOptions, connection: PostgresConnection) async throws {
436499
try await connection.query(

Tests/JobsPostgresTests/JobsTests.swift

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -683,4 +683,74 @@ final class JobsTests: XCTestCase {
683683
group.cancelAll()
684684
}
685685
}
686+
687+
func testJobActions() async throws {
688+
struct TestParameters: JobParameters {
689+
static let jobName = "testJobActions"
690+
let value: Int
691+
}
692+
let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 3)
693+
let jobExecutionSequence: NIOLockedValueBox<[Int]> = .init([])
694+
695+
let jobQueue = try await self.createJobQueue(numWorkers: 1, configuration: .init(), function: #function)
696+
697+
try await testPriorityJobQueue(jobQueue: jobQueue) { queue in
698+
queue.registerJob(parameters: TestParameters.self) { parameters, context in
699+
context.logger.info("Parameters=\(parameters.value)")
700+
jobExecutionSequence.withLockedValue {
701+
$0.append(parameters.value)
702+
}
703+
try await Task.sleep(for: .milliseconds(Int.random(in: 10..<50)))
704+
expectation.fulfill()
705+
}
706+
707+
let firstJob = try await queue.push(
708+
TestParameters(value: 20),
709+
options: .init(
710+
priority: .lowest()
711+
)
712+
)
713+
714+
try await jobQueue.pauseJob(jobID: firstJob)
715+
716+
try await queue.push(
717+
TestParameters(value: 2025),
718+
options: .init(
719+
priority: .highest()
720+
)
721+
)
722+
723+
let cancelID = try await queue.push(
724+
TestParameters(value: 42),
725+
options: .init(
726+
priority: .lower()
727+
)
728+
)
729+
730+
try await jobQueue.cancelJob(jobID: cancelID)
731+
732+
try await withThrowingTaskGroup(of: Void.self) { group in
733+
let serviceGroup = ServiceGroup(services: [queue], logger: queue.logger)
734+
735+
let processingJobs = try await jobQueue.queue.getJobs(withStatus: .pending)
736+
XCTAssertEqual(processingJobs.count, 1)
737+
738+
group.addTask {
739+
try await serviceGroup.run()
740+
}
741+
742+
let pausedJobs = try await jobQueue.queue.getJobs(withStatus: .paused)
743+
XCTAssertEqual(pausedJobs.count, 1)
744+
let cancelledJobs = try await jobQueue.queue.getJobs(withStatus: .cancelled)
745+
XCTAssertEqual(cancelledJobs.count, 1)
746+
747+
try await jobQueue.resumeJob(jobID: firstJob)
748+
try await jobQueue.resumeJob(jobID: cancelID)
749+
750+
await fulfillment(of: [expectation], timeout: 10)
751+
await serviceGroup.triggerGracefulShutdown()
752+
}
753+
}
754+
XCTAssertEqual(jobExecutionSequence.withLockedValue { $0 }, [2025, 20, 42])
755+
}
686756
}

0 commit comments

Comments
 (0)