Skip to content

Commit 81119b9

Browse files
authored
Resumable and cancellable jobs (#26)
* Use uppercase for column types * Implementing cancellable and resumable jobs * Separate cancel job vs resumable job tests * swift format * Use main branch and updated tests * Used versioned released for hummingbird-postgres * PR comments * Swift format * Fix up cancellable job test * PR comments * Update resumable job tests
1 parent cc39800 commit 81119b9

File tree

4 files changed

+218
-11
lines changed

4 files changed

+218
-11
lines changed

Package.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ let package = Package(
1111
dependencies: [
1212
// TODO: use a released version of swift-jobs
1313
.package(url: "https://github.com/hummingbird-project/swift-jobs.git", branch: "main"),
14-
.package(url: "https://github.com/hummingbird-project/hummingbird-postgres.git", from: "0.5.0"),
14+
.package(url: "https://github.com/hummingbird-project/hummingbird-postgres.git", from: "0.6.0"),
1515
.package(url: "https://github.com/vapor/postgres-nio.git", from: "1.25.0"),
1616
],
1717
targets: [

Sources/JobsPostgres/Migrations/CreateSwiftJobsMigrations.swift

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,11 @@ struct CreateSwiftJobsMigrations: DatabaseMigration {
2525
try await connection.query(
2626
"""
2727
CREATE TABLE IF NOT EXISTS swift_jobs.jobs (
28-
id uuid PRIMARY KEY,
29-
job bytea NOT NULL,
28+
id UUID PRIMARY KEY,
29+
job BYTEA NOT NULL,
3030
last_modified TIMESTAMPTZ NOT NULL DEFAULT now(),
3131
queue_name TEXT NOT NULL DEFAULT 'default',
32-
status smallint NOT NULL
32+
status SMALLINT NOT NULL
3333
);
3434
""",
3535
logger: logger
@@ -38,7 +38,7 @@ struct CreateSwiftJobsMigrations: DatabaseMigration {
3838
try await connection.query(
3939
"""
4040
CREATE TABLE IF NOT EXISTS swift_jobs.queues(
41-
job_id uuid PRIMARY KEY,
41+
job_id UUID PRIMARY KEY,
4242
created_at TIMESTAMPTZ NOT NULL,
4343
delayed_until TIMESTAMPTZ NOT NULL DEFAULT now(),
4444
queue_name TEXT NOT NULL DEFAULT 'default',
@@ -50,7 +50,7 @@ struct CreateSwiftJobsMigrations: DatabaseMigration {
5050

5151
try await connection.query(
5252
"""
53-
CREATE INDEX CONCURRENTLY IF NOT EXISTS queues_delayed_until_priority_queue_name_idx
53+
CREATE INDEX IF NOT EXISTS queues_delayed_until_priority_queue_name_idx
5454
ON swift_jobs.queues(priority DESC, delayed_until ASC, queue_name ASC)
5555
""",
5656
logger: logger
@@ -59,8 +59,8 @@ struct CreateSwiftJobsMigrations: DatabaseMigration {
5959
try await connection.query(
6060
"""
6161
CREATE TABLE IF NOT EXISTS swift_jobs.queues_metadata(
62-
key text PRIMARY KEY,
63-
value bytea NOT NULL,
62+
key TEXT PRIMARY KEY,
63+
value BYTEA NOT NULL,
6464
queue_name TEXT NOT NULL DEFAULT 'default'
6565
)
6666
""",
@@ -69,7 +69,7 @@ struct CreateSwiftJobsMigrations: DatabaseMigration {
6969

7070
try await connection.query(
7171
"""
72-
CREATE INDEX CONCURRENTLY IF NOT EXISTS queues_metadata_key_queue_name_idx
72+
CREATE INDEX IF NOT EXISTS queues_metadata_key_queue_name_idx
7373
ON swift_jobs.queues_metadata(key, queue_name)
7474
""",
7575
logger: logger

Sources/JobsPostgres/PostgresJobsQueue.swift

Lines changed: 65 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,8 @@ import PostgresNIO
4242
/// try await migrations.apply(client: postgresClient, logger: logger, dryRun: applyMigrations)
4343
/// }
4444
/// ```
45-
public final class PostgresJobQueue: JobQueueDriver {
45+
public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, ResumableJobQueue {
46+
4647
public typealias JobID = UUID
4748
/// what to do with failed/processing jobs from last time queue was handled
4849
public enum JobCleanup: Sendable {
@@ -131,6 +132,8 @@ public final class PostgresJobQueue: JobQueueDriver {
131132
case pending = 0
132133
case processing = 1
133134
case failed = 2
135+
case cancelled = 3
136+
case paused = 4
134137
}
135138

136139
/// Queue configuration
@@ -171,7 +174,7 @@ public final class PostgresJobQueue: JobQueueDriver {
171174
self.logger = logger
172175
self.isStopped = .init(false)
173176
self.migrations = migrations
174-
await migrations.add(CreateSwiftJobsMigrations())
177+
await migrations.add(CreateSwiftJobsMigrations(), skipDuplicates: true)
175178
}
176179

177180
public func onInit() async throws {
@@ -180,6 +183,56 @@ public final class PostgresJobQueue: JobQueueDriver {
180183
try await self.migrations.waitUntilCompleted()
181184
}
182185

186+
/// Cancel job
187+
///
188+
/// This function is used to cancel a job. Job cancellation is not gaurenteed howerever.
189+
/// Cancellable jobs are jobs with a delayed greather than when the cancellation request was made
190+
///
191+
/// - Parameters:
192+
/// - jobID: an existing job
193+
/// - Throws:
194+
public func cancel(jobID: JobID) async throws {
195+
try await self.client.withTransaction(logger: logger) { connection in
196+
try await deleteFromQueue(jobID: jobID, connection: connection)
197+
try await delete(jobID: jobID)
198+
}
199+
}
200+
201+
/// Pause job
202+
///
203+
/// This function is used to pause a job. Job paus is not gaurenteed howerever.
204+
/// Pausable jobs are jobs with a delayed greather than when the pause request was made
205+
///
206+
/// - Parameters:
207+
/// - jobID: an existing job
208+
/// - Throws:
209+
public func pause(jobID: UUID) async throws {
210+
try await self.client.withTransaction(logger: logger) { connection in
211+
try await deleteFromQueue(jobID: jobID, connection: connection)
212+
try await setStatus(jobID: jobID, status: .paused, connection: connection)
213+
}
214+
}
215+
216+
/// Resume job
217+
///
218+
/// This function is used to resume jobs. Job is not gaurenteed howerever.
219+
/// Cancellable jobs are jobs with a delayed greather than when the cancellation request was made
220+
///
221+
/// - Parameters:
222+
/// - jobID: an existing job
223+
/// - Throws:
224+
public func resume(jobID: JobID) async throws {
225+
try await self.client.withTransaction(logger: logger) { connection in
226+
try await setStatus(jobID: jobID, status: .pending, connection: connection)
227+
try await addToQueue(
228+
jobID: jobID,
229+
queueName: configuration.queueName,
230+
options: .init(),
231+
connection: connection
232+
)
233+
}
234+
}
235+
183236
/// Cleanup job queues
184237
///
185238
/// This function is used to re-run or delete jobs in a certain state. Failed jobs can be
@@ -432,6 +485,16 @@ public final class PostgresJobQueue: JobQueueDriver {
432485
)
433486
}
434487

488+
func deleteFromQueue(jobID: JobID, connection: PostgresConnection) async throws {
489+
try await connection.query(
490+
"""
491+
DELETE FROM swift_jobs.queues
492+
WHERE job_id = \(jobID) AND queue_name = \(configuration.queueName)
493+
""",
494+
logger: self.logger
495+
)
496+
}
497+
435498
func addToQueue(jobID: JobID, queueName: String, options: JobOptions, connection: PostgresConnection) async throws {
436499
try await connection.query(
437500
"""

Tests/JobsPostgresTests/JobsTests.swift

Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -683,4 +683,148 @@ final class JobsTests: XCTestCase {
683683
group.cancelAll()
684684
}
685685
}
686+
687+
func testResumableAndPausableJobs() async throws {
688+
struct TestParameters: JobParameters {
689+
static let jobName = "TestJob"
690+
}
691+
struct ResumableJob: JobParameters {
692+
static let jobName = "ResumanableJob"
693+
}
694+
let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 2)
695+
let didResumableJobRun: NIOLockedValueBox<Bool> = .init(false)
696+
let didTestJobRun: NIOLockedValueBox<Bool> = .init(false)
697+
698+
let jobQueue = try await self.createJobQueue(numWorkers: 1, configuration: .init(), function: #function)
699+
700+
try await testPriorityJobQueue(jobQueue: jobQueue) { queue in
701+
queue.registerJob(parameters: TestParameters.self) { parameters, _ in
702+
didTestJobRun.withLockedValue {
703+
$0 = true
704+
}
705+
try await Task.sleep(for: .milliseconds(Int.random(in: 10..<50)))
706+
expectation.fulfill()
707+
}
708+
709+
queue.registerJob(parameters: ResumableJob.self) { parameters, _ in
710+
didResumableJobRun.withLockedValue {
711+
$0 = true
712+
}
713+
try await Task.sleep(for: .milliseconds(Int.random(in: 10..<50)))
714+
expectation.fulfill()
715+
}
716+
717+
let resumableJob = try await queue.push(
718+
ResumableJob(),
719+
options: .init(
720+
priority: .lowest()
721+
)
722+
)
723+
724+
try await queue.push(
725+
TestParameters(),
726+
options: .init(
727+
priority: .normal()
728+
)
729+
)
730+
731+
try await jobQueue.pauseJob(jobID: resumableJob)
732+
733+
try await withThrowingTaskGroup(of: Void.self) { group in
734+
let serviceGroup = ServiceGroup(services: [queue], logger: queue.logger)
735+
736+
let processingJobs = try await jobQueue.queue.getJobs(withStatus: .pending)
737+
XCTAssertEqual(processingJobs.count, 1)
738+
739+
group.addTask {
740+
try await serviceGroup.run()
741+
}
742+
743+
let processingJobCount = try await jobQueue.queue.getJobs(withStatus: .processing)
744+
XCTAssertEqual(processingJobCount.count, 0)
745+
746+
let pausedJobs = try await jobQueue.queue.getJobs(withStatus: .paused)
747+
XCTAssertEqual(pausedJobs.count, 1)
748+
749+
try await jobQueue.resumeJob(jobID: resumableJob)
750+
751+
await fulfillment(of: [expectation], timeout: 10)
752+
await serviceGroup.triggerGracefulShutdown()
753+
}
754+
}
755+
XCTAssertEqual(didTestJobRun.withLockedValue { $0 }, true)
756+
XCTAssertEqual(didResumableJobRun.withLockedValue { $0 }, true)
757+
}
758+
759+
func testCancellableJob() async throws {
760+
struct TestParameters: JobParameters {
761+
static let jobName = "testCancellableJob"
762+
let value: Int
763+
}
764+
struct NoneCancelledJobParameters: JobParameters {
765+
static let jobName = "NoneCancelledJob"
766+
let value: Int
767+
}
768+
let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 1)
769+
let didRunCancelledJob: NIOLockedValueBox<Bool> = .init(false)
770+
let didRunNoneCancelledJob: NIOLockedValueBox<Bool> = .init(false)
771+
772+
let jobQueue = try await self.createJobQueue(numWorkers: 1, configuration: .init(), function: #function)
773+
774+
try await testPriorityJobQueue(jobQueue: jobQueue) { queue in
775+
queue.registerJob(parameters: TestParameters.self) { parameters, context in
776+
context.logger.info("Parameters=\(parameters.value)")
777+
didRunCancelledJob.withLockedValue {
778+
$0 = true
779+
}
780+
try await Task.sleep(for: .milliseconds(Int.random(in: 10..<50)))
781+
expectation.fulfill()
782+
}
783+
784+
queue.registerJob(parameters: NoneCancelledJobParameters.self) { parameters, context in
785+
context.logger.info("Parameters=\(parameters.value)")
786+
didRunNoneCancelledJob.withLockedValue {
787+
$0 = true
788+
}
789+
try await Task.sleep(for: .milliseconds(Int.random(in: 10..<50)))
790+
expectation.fulfill()
791+
}
792+
793+
let cancellableJob = try await queue.push(
794+
TestParameters(value: 42),
795+
options: .init(
796+
priority: .lower()
797+
)
798+
)
799+
800+
try await queue.push(
801+
NoneCancelledJobParameters(value: 2025),
802+
options: .init(
803+
priority: .highest()
804+
)
805+
)
806+
807+
try await jobQueue.cancelJob(jobID: cancellableJob)
808+
809+
try await withThrowingTaskGroup(of: Void.self) { group in
810+
let serviceGroup = ServiceGroup(services: [queue], logger: queue.logger)
811+
812+
let processingJobs = try await jobQueue.queue.getJobs(withStatus: .pending)
813+
XCTAssertEqual(processingJobs.count, 1)
814+
815+
group.addTask {
816+
try await serviceGroup.run()
817+
}
818+
819+
await fulfillment(of: [expectation], timeout: 10)
820+
// Jobs has been removed
821+
let cancelledJobs = try await jobQueue.queue.getJobs(withStatus: .cancelled)
822+
XCTAssertEqual(cancelledJobs.count, 0)
823+
824+
await serviceGroup.triggerGracefulShutdown()
825+
}
826+
}
827+
XCTAssertEqual(didRunCancelledJob.withLockedValue { $0 }, false)
828+
XCTAssertEqual(didRunNoneCancelledJob.withLockedValue { $0 }, true)
829+
}
686830
}

0 commit comments

Comments
 (0)