Skip to content

Commit d2b80d4

Browse files
committed
Add tests, and new version of getJobs()
1 parent 7639421 commit d2b80d4

File tree

3 files changed

+51
-2
lines changed

3 files changed

+51
-2
lines changed

Sources/JobsPostgres/PostgresJobsQueue+cleanup.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ extension PostgresJobQueue {
111111
/// wait for migrations to complete before running job queue cleanup
112112
try await self.migrations.waitUntilCompleted()
113113
_ = try await self.client.withTransaction(logger: logger) { connection in
114-
self.logger.info("Update Jobs")
114+
self.logger.info("Cleanup Jobs")
115115
try await self.updateJobsOnInit(withStatus: .pending, onInit: pendingJobs, connection: connection)
116116
try await self.updateJobsOnInit(withStatus: .processing, onInit: processingJobs, connection: connection)
117117
try await self.updateJobsOnInit(withStatus: .failed, onInit: failedJobs, connection: connection)
@@ -148,7 +148,7 @@ extension PostgresJobQueue {
148148
)
149149

150150
case .rerun:
151-
let jobs = try await getJobs(withStatus: status)
151+
let jobs = try await getJobs(withStatus: status, connection: connection)
152152
self.logger.info("Moving \(jobs.count) jobs with status: \(status) to job queue")
153153
for jobID in jobs {
154154
try await self.addToQueue(jobID: jobID, queueName: configuration.queueName, options: .init(), connection: connection)

Sources/JobsPostgres/PostgresJobsQueue.swift

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -502,6 +502,23 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma
502502
return jobs
503503
}
504504

505+
func getJobs(withStatus status: Status, connection: PostgresConnection) async throws -> [JobID] {
506+
let stream = try await connection.query(
507+
"""
508+
SELECT
509+
id
510+
FROM swift_jobs.jobs
511+
WHERE status = \(status) AND queue_name = \(configuration.queueName)
512+
""",
513+
logger: self.logger
514+
)
515+
var jobs: [JobID] = []
516+
for try await id in stream.decode(JobID.self, context: .default) {
517+
jobs.append(id)
518+
}
519+
return jobs
520+
}
521+
505522
let jobRegistry: JobRegistry
506523
}
507524

Tests/JobsPostgresTests/JobsTests.swift

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -875,6 +875,8 @@ final class JobsTests: XCTestCase {
875875

876876
let completedJobs = try await jobQueue.queue.getJobs(withStatus: .completed)
877877
XCTAssertEqual(completedJobs.count, 3)
878+
try await jobQueue.queue.cleanup(completedJobs: .remove(maxAge: .seconds(10)))
879+
XCTAssertEqual(completedJobs.count, 3)
878880
try await jobQueue.queue.cleanup(completedJobs: .remove(maxAge: .seconds(0)))
879881
let zeroJobs = try await jobQueue.queue.getJobs(withStatus: .completed)
880882
XCTAssertEqual(zeroJobs.count, 0)
@@ -912,6 +914,36 @@ final class JobsTests: XCTestCase {
912914
}
913915
}
914916

917+
func testCleanupProcessingJobs() async throws {
918+
let jobQueue = try await self.createJobQueue(numWorkers: 1)
919+
let jobName = JobName<Int>("testCancelledJobRetention")
920+
jobQueue.registerJob(name: jobName) { _, _ in }
921+
922+
try await withThrowingTaskGroup(of: Void.self) { group in
923+
group.addTask {
924+
// run postgres client
925+
await jobQueue.queue.client.run()
926+
}
927+
try await jobQueue.queue.migrations.apply(client: jobQueue.queue.client, logger: jobQueue.logger, dryRun: false)
928+
929+
let jobID = try await jobQueue.push(jobName, parameters: 1)
930+
let job = try await jobQueue.queue.popFirst()
931+
XCTAssertEqual(jobID, job?.id)
932+
_ = try await jobQueue.push(jobName, parameters: 1)
933+
_ = try await jobQueue.queue.popFirst()
934+
935+
var processingJobs = try await jobQueue.queue.getJobs(withStatus: .processing)
936+
XCTAssertEqual(processingJobs.count, 2)
937+
938+
try await jobQueue.queue.cleanup(processingJobs: .remove)
939+
940+
processingJobs = try await jobQueue.queue.getJobs(withStatus: .processing)
941+
XCTAssertEqual(processingJobs.count, 0)
942+
943+
group.cancelAll()
944+
}
945+
}
946+
915947
func testCleanupJob() async throws {
916948
try await self.testJobQueue(
917949
numWorkers: 1,

0 commit comments

Comments
 (0)