Skip to content

Commit eddd61e

Browse files
committed
Adding Job Actions which allows for cancelling, pausing and resuming jobs
1 parent 67e50cf commit eddd61e

File tree

3 files changed

+122
-9
lines changed

3 files changed

+122
-9
lines changed

Package.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,8 @@ let package = Package(
1010
],
1111
dependencies: [
1212
// TODO: use a released version of swift-jobs
13-
.package(url: "https://github.com/hummingbird-project/swift-jobs.git", branch: "main"),
14-
.package(url: "https://github.com/hummingbird-project/hummingbird-postgres.git", from: "0.5.0"),
13+
.package(url: "https://github.com/thoven87/swift-jobs.git", branch: "job-actions"),
14+
.package(url: "https://github.com/hummingbird-project/hummingbird-postgres.git", branch: "main"),
1515
.package(url: "https://github.com/vapor/postgres-nio.git", from: "1.25.0"),
1616
],
1717
targets: [

Sources/JobsPostgres/PostgresJobsQueue.swift

Lines changed: 52 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,8 @@ public final class PostgresJobQueue: JobQueueDriver {
131131
case pending = 0
132132
case processing = 1
133133
case failed = 2
134+
case paused = 3
135+
case cancelled = 4
134136
}
135137

136138
/// Queue configuration
@@ -247,22 +249,51 @@ public final class PostgresJobQueue: JobQueueDriver {
247249

248250
/// Retry an existing Job
249251
/// - Parameters
250-
/// - id: Job instance ID
252+
/// - jobID: Job instance ID
251253
/// - jobRequest: Job Request
252254
/// - options: Job retry options
253-
public func retry<Parameters: JobParameters>(_ id: JobID, jobRequest: JobRequest<Parameters>, options: JobRetryOptions) async throws {
255+
public func retry<Parameters: JobParameters>(_ jobID: JobID, jobRequest: JobRequest<Parameters>, options: JobRetryOptions) async throws {
254256
let buffer = try self.jobRegistry.encode(jobRequest: jobRequest)
255257
try await self.client.withTransaction(logger: self.logger) { connection in
256-
try await self.updateJob(id: id, buffer: buffer, connection: connection)
258+
try await self.updateJob(jobID: jobID, buffer: buffer, connection: connection)
257259
try await self.addToQueue(
258-
jobID: id,
260+
jobID: jobID,
259261
queueName: configuration.queueName,
260262
options: .init(delayUntil: options.delayUntil),
261263
connection: connection
262264
)
263265
}
264266
}
265267

268+
/// Perform actions on job
269+
/// - Parameters
270+
/// - jobID: Job instance ID
271+
/// - action Job Action
272+
public func performAction(jobID: JobID, action: JobAction) async throws {
273+
switch action {
274+
case .cancel():
275+
try await self.performJobAction(jobID: jobID, status: .cancelled)
276+
case .pause():
277+
try await self.performJobAction(jobID: jobID, status: .paused)
278+
case .resume():
279+
try await self.client.withTransaction(logger: logger) { connection in
280+
try await self.setStatus(jobID: jobID, status: .pending, connection: connection)
281+
try await self.addToQueue(
282+
jobID: jobID,
283+
queueName: configuration.queueName,
284+
options: .init(),
285+
connection: connection
286+
)
287+
}
288+
default:
289+
break
290+
}
291+
}
292+
293+
public func isEmpty() async throws -> Bool {
294+
true
295+
}
296+
266297
/// This is called to say job has finished processing and it can be deleted
267298
public func finished(jobID: JobID) async throws {
268299
try await self.delete(jobID: jobID)
@@ -408,15 +439,15 @@ public final class PostgresJobQueue: JobQueueDriver {
408439
logger: self.logger
409440
)
410441
}
411-
// TODO: maybe add a new column colum for attempt so far after PR https://github.com/hummingbird-project/swift-jobs/pull/63 is merged?
412-
func updateJob(id: JobID, buffer: ByteBuffer, connection: PostgresConnection) async throws {
442+
443+
func updateJob(jobID: JobID, buffer: ByteBuffer, connection: PostgresConnection) async throws {
413444
try await connection.query(
414445
"""
415446
UPDATE swift_jobs.jobs
416447
SET job = \(buffer),
417448
last_modified = \(Date.now),
418449
status = \(Status.failed)
419-
WHERE id = \(id) AND queue_name = \(configuration.queueName)
450+
WHERE id = \(jobID) AND queue_name = \(configuration.queueName)
420451
""",
421452
logger: self.logger
422453
)
@@ -485,6 +516,20 @@ public final class PostgresJobQueue: JobQueueDriver {
485516
return jobs
486517
}
487518

519+
func performJobAction(jobID: JobID, status: Status) async throws {
520+
try await self.client.withTransaction(logger: logger) { connection in
521+
522+
try await connection.query(
523+
"""
524+
DELETE FROM swift_jobs.queues
525+
WHERE job_id = \(jobID) AND queue_name = \(configuration.queueName)
526+
""",
527+
logger: self.logger
528+
)
529+
try await self.setStatus(jobID: jobID, status: status, connection: connection)
530+
}
531+
}
532+
488533
func updateJobsOnInit(withStatus status: Status, onInit: JobCleanup, connection: PostgresConnection) async throws {
489534
switch onInit {
490535
case .remove:

Tests/JobsPostgresTests/JobsTests.swift

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -683,4 +683,72 @@ final class JobsTests: XCTestCase {
683683
group.cancelAll()
684684
}
685685
}
686+
687+
func testJobActions() async throws {
688+
struct TestParameters: JobParameters {
689+
static let jobName = "testJobActions"
690+
let value: Int
691+
}
692+
let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 3)
693+
let jobExecutionSequence: NIOLockedValueBox<[Int]> = .init([])
694+
695+
let jobQueue = try await self.createJobQueue(numWorkers: 1, configuration: .init(), function: #function)
696+
697+
try await testPriorityJobQueue(jobQueue: jobQueue) { queue in
698+
queue.registerJob(parameters: TestParameters.self) { parameters, context in
699+
context.logger.info("Parameters=\(parameters.value)")
700+
jobExecutionSequence.withLockedValue {
701+
$0.append(parameters.value)
702+
}
703+
try await Task.sleep(for: .milliseconds(Int.random(in: 10..<50)))
704+
expectation.fulfill()
705+
}
706+
707+
let firstJob = try await queue.push(
708+
TestParameters(value: 20),
709+
options: .init(
710+
priority: .lowest()
711+
)
712+
)
713+
714+
try await jobQueue.performAction(jobID: firstJob, action: .pause())
715+
716+
try await queue.push(
717+
TestParameters(value: 2025),
718+
options: .init(
719+
priority: .highest()
720+
)
721+
)
722+
723+
let cancelID = try await queue.push(
724+
TestParameters(value: 42),
725+
options: .init(
726+
priority: .lower()
727+
)
728+
)
729+
730+
try await jobQueue.performAction(jobID: cancelID, action: .cancel())
731+
732+
try await withThrowingTaskGroup(of: Void.self) { group in
733+
let serviceGroup = ServiceGroup(services: [queue], logger: queue.logger)
734+
735+
let processingJobs = try await jobQueue.queue.getJobs(withStatus: .pending)
736+
XCTAssertEqual(processingJobs.count, 1)
737+
738+
group.addTask {
739+
try await serviceGroup.run()
740+
}
741+
742+
let pausedJobs = try await jobQueue.queue.getJobs(withStatus: .paused)
743+
XCTAssertEqual(pausedJobs.count, 1)
744+
let cancelledJobs = try await jobQueue.queue.getJobs(withStatus: .cancelled)
745+
XCTAssertEqual(cancelledJobs.count, 1)
746+
try await jobQueue.performAction(jobID: firstJob, action: .resume())
747+
try await jobQueue.performAction(jobID: cancelID, action: .resume())
748+
await fulfillment(of: [expectation], timeout: 10)
749+
await serviceGroup.triggerGracefulShutdown()
750+
}
751+
}
752+
XCTAssertEqual(jobExecutionSequence.withLockedValue { $0 }, [2025, 20, 42])
753+
}
686754
}

0 commit comments

Comments
 (0)