Skip to content

Commit b6497ef

Browse files
committed
Create separate cleanup job for each queue
1 parent f482e19 commit b6497ef

File tree

6 files changed

+108
-49
lines changed

6 files changed

+108
-49
lines changed

Package.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ let package = Package(
1010
],
1111
dependencies: [
1212
// TODO: use a released version of swift-jobs
13-
.package(url: "https://github.com/hummingbird-project/swift-jobs.git", branch: "main"),
13+
.package(url: "https://github.com/hummingbird-project/swift-jobs.git", branch: "jobschedule-addjob"),
1414
.package(url: "https://github.com/hummingbird-project/postgres-migrations.git", from: "0.1.0"),
1515
.package(url: "https://github.com/vapor/postgres-nio.git", from: "1.25.0"),
1616
],

Sources/JobsPostgres/JobCleanupParameters.swift

Lines changed: 0 additions & 33 deletions
This file was deleted.

Sources/JobsPostgres/PostgresJobsQueue+cleanup.swift

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,48 @@ import NIOCore
2020
import PostgresMigrations
2121
import PostgresNIO
2222

23+
/// Parameters for Cleanup job
24+
public struct JobCleanupParameters: Sendable & Codable {
25+
let failedJobs: PostgresJobQueue.JobCleanup
26+
let completedJobs: PostgresJobQueue.JobCleanup
27+
let cancelledJobs: PostgresJobQueue.JobCleanup
28+
29+
public init(
30+
failedJobs: PostgresJobQueue.JobCleanup = .doNothing,
31+
completedJobs: PostgresJobQueue.JobCleanup = .doNothing,
32+
cancelledJobs: PostgresJobQueue.JobCleanup = .doNothing
33+
) {
34+
self.failedJobs = failedJobs
35+
self.completedJobs = completedJobs
36+
self.cancelledJobs = cancelledJobs
37+
}
38+
}
39+
2340
extension PostgresJobQueue {
41+
/// clean up job name.
42+
///
43+
/// Use this with the ``JobSchedule`` to schedule a cleanup of
44+
/// failed, cancelled or completed jobs
45+
public var cleanupJob: JobName<JobCleanupParameters> {
46+
.init("_JobCleanup_\(self.configuration.queueName)")
47+
}
48+
49+
/// register clean up job on queue
50+
func registerCleanupJob() {
51+
self.registerJob(
52+
JobDefinition(name: cleanupJob, parameters: JobCleanupParameters.self, retryStrategy: .dontRetry) { parameters, context in
53+
try await self.cleanup(
54+
failedJobs: parameters.failedJobs,
55+
processingJobs: .doNothing,
56+
pendingJobs: .doNothing,
57+
completedJobs: parameters.completedJobs,
58+
cancelledJobs: parameters.cancelledJobs,
59+
logger: self.logger
60+
)
61+
}
62+
)
63+
}
64+
2465
/// Cleanup job queues
2566
///
2667
/// This function is used to re-run or delete jobs in a certain state. Failed jobs can be
@@ -80,7 +121,7 @@ extension PostgresJobQueue {
80121
if let olderThan {
81122
.now - Double(olderThan.components.seconds)
82123
} else {
83-
.distantPast
124+
.distantFuture
84125
}
85126
try await connection.query(
86127
"""

Sources/JobsPostgres/PostgresJobsQueue.swift

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,6 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma
165165
public let configuration: Configuration
166166
/// Logger used by queue
167167
public let logger: Logger
168-
169168
let migrations: DatabaseMigrations
170169
let isStopped: NIOLockedValueBox<Bool>
171170

@@ -178,18 +177,7 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma
178177
self.isStopped = .init(false)
179178
self.migrations = migrations
180179
await migrations.add(CreateSwiftJobsMigrations(), skipDuplicates: true)
181-
self.registerJob(
182-
JobDefinition(parameters: JobCleanupParameters.self, retryStrategy: .dontRetry) { parameters, context in
183-
try await self.cleanup(
184-
failedJobs: parameters.failedJobs,
185-
processingJobs: .doNothing,
186-
pendingJobs: .doNothing,
187-
completedJobs: parameters.completedJobs,
188-
cancelledJobs: parameters.cancelledJobs,
189-
logger: logger
190-
)
191-
}
192-
)
180+
self.registerCleanupJob()
193181
}
194182

195183
public func onInit() async throws {

Sources/JobsPostgres/RetentionPolicy.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ public struct RetentionPolicy: Sendable {
4242

4343
public init(
4444
cancelled: RetainData = .retain,
45-
completed: RetainData = .retain,
45+
completed: RetainData = .never,
4646
failed: RetainData = .retain
4747
) {
4848
self.cancelled = cancelled

Tests/JobsPostgresTests/JobsTests.swift

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -886,4 +886,67 @@ final class JobsTests: XCTestCase {
886886
XCTAssertEqual(zeroJobs.count, 0)
887887
}
888888
}
889+
890+
func testCleanupJob() async throws {
891+
try await self.testJobQueue(
892+
numWorkers: 1,
893+
configuration: .init(
894+
retentionPolicy: .init(
895+
cancelled: .retain,
896+
completed: .never,
897+
failed: .retain
898+
)
899+
)
900+
) { jobQueue in
901+
try await self.testJobQueue(
902+
numWorkers: 1,
903+
configuration: .init(
904+
queueName: "SecondQueue",
905+
retentionPolicy: .init(
906+
cancelled: .retain,
907+
completed: .never,
908+
failed: .retain
909+
)
910+
)
911+
) { jobQueue2 in
912+
let (stream, cont) = AsyncStream.makeStream(of: Void.self)
913+
var iterator = stream.makeAsyncIterator()
914+
struct TempJob: Sendable & Codable {}
915+
let barrierJobName = JobName<TempJob>("barrier")
916+
jobQueue.registerJob(name: "testCleanupJob", parameters: String.self) { parameters, context in
917+
throw CancellationError()
918+
}
919+
jobQueue.registerJob(name: barrierJobName, parameters: TempJob.self) { parameters, context in
920+
cont.yield()
921+
}
922+
jobQueue2.registerJob(name: "testCleanupJob", parameters: String.self) { parameters, context in
923+
throw CancellationError()
924+
}
925+
jobQueue2.registerJob(name: barrierJobName, parameters: TempJob.self) { parameters, context in
926+
cont.yield()
927+
}
928+
try await jobQueue.push("testCleanupJob", parameters: "1")
929+
try await jobQueue.push("testCleanupJob", parameters: "2")
930+
try await jobQueue.push("testCleanupJob", parameters: "3")
931+
try await jobQueue.push(barrierJobName, parameters: .init())
932+
try await jobQueue2.push("testCleanupJob", parameters: "1")
933+
try await jobQueue2.push(barrierJobName, parameters: .init())
934+
935+
await iterator.next()
936+
await iterator.next()
937+
938+
let failedJob = try await jobQueue.queue.getJobs(withStatus: .failed)
939+
XCTAssertEqual(failedJob.count, 3)
940+
try await jobQueue.push(jobQueue.queue.cleanupJob, parameters: .init(failedJobs: .remove))
941+
try await jobQueue.push(barrierJobName, parameters: .init())
942+
943+
await iterator.next()
944+
945+
let zeroJobs = try await jobQueue.queue.getJobs(withStatus: .failed)
946+
XCTAssertEqual(zeroJobs.count, 0)
947+
let jobCount2 = try await jobQueue2.queue.getJobs(withStatus: .failed)
948+
XCTAssertEqual(jobCount2.count, 1)
949+
}
950+
}
951+
}
889952
}

0 commit comments

Comments
 (0)