Skip to content

Commit 5c22e68

Browse files
committed
Merge retention policy and cleanup
1 parent de0754e commit 5c22e68

File tree

6 files changed

+176
-164
lines changed

6 files changed

+176
-164
lines changed
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the Hummingbird server framework project
4+
//
5+
// Copyright (c) 2025 the Hummingbird authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
import Jobs
16+
17+
public struct JobCleanupParameters: JobParameters {
18+
static public var jobName: String { "_JobCleanup_" }
19+
20+
let jobQueueName: String
21+
let failedJobs: PostgresJobQueue.JobCleanup
22+
let completedJobs: PostgresJobQueue.JobCleanup
23+
let cancelledJobs: PostgresJobQueue.JobCleanup
24+
25+
public init(
26+
jobQueueName: String,
27+
failedJobs: PostgresJobQueue.JobCleanup = .doNothing,
28+
completedJobs: PostgresJobQueue.JobCleanup = .doNothing,
29+
cancelledJobs: PostgresJobQueue.JobCleanup = .doNothing
30+
) {
31+
self.jobQueueName = jobQueueName
32+
self.failedJobs = failedJobs
33+
self.completedJobs = completedJobs
34+
self.cancelledJobs = cancelledJobs
35+
}
36+
}

Sources/JobsPostgres/JobPruner.swift

Lines changed: 0 additions & 19 deletions
This file was deleted.
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the Hummingbird server framework project
4+
//
5+
// Copyright (c) 2024-2025 the Hummingbird authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
import Foundation
16+
import Jobs
17+
import Logging
18+
import NIOConcurrencyHelpers
19+
import NIOCore
20+
import PostgresMigrations
21+
import PostgresNIO
22+
23+
extension PostgresJobQueue {
24+
/// Cleanup job queues
25+
///
26+
/// This function is used to re-run or delete jobs in a certain state. Failed jobs can be
27+
/// pushed back into the pending queue to be re-run or removed. When called at startup in
28+
/// theory no job should be set to processing, or set to pending but not in the queue. but if
29+
/// your job server crashes these states are possible, so we also provide options to re-queue
30+
/// these jobs so they are run again.
31+
///
32+
/// The job queue needs to be running when you call cleanup. You can call `cleanup` with
33+
/// `failedJobs`` set to whatever you like at any point to re-queue failed jobs. Moving processing
34+
/// or pending jobs should only be done if you are certain there is nothing else processing
35+
/// the job queue.
36+
///
37+
/// - Parameters:
38+
/// - failedJobs: What to do with jobs tagged as failed
39+
/// - processingJobs: What to do with jobs tagged as processing
40+
/// - pendingJobs: What to do with jobs tagged as pending
41+
/// - completedJobs: What to do with jobs tagged as completed
42+
/// - cancelledJobs: What to do with jobs tagged as cancelled
43+
/// - logger: Optional logger to use when performing cleanup
44+
/// - Throws:
45+
public func cleanup(
46+
failedJobs: JobCleanup = .doNothing,
47+
processingJobs: JobCleanup = .doNothing,
48+
pendingJobs: JobCleanup = .doNothing,
49+
completedJobs: JobCleanup = .doNothing,
50+
cancelledJobs: JobCleanup = .doNothing,
51+
logger: Logger? = nil
52+
) async throws {
53+
let logger = logger ?? self.logger
54+
do {
55+
/// wait for migrations to complete before running job queue cleanup
56+
try await self.migrations.waitUntilCompleted()
57+
_ = try await self.client.withTransaction(logger: logger) { connection in
58+
self.logger.info("Update Jobs")
59+
try await self.updateJobsOnInit(withStatus: .pending, onInit: pendingJobs, connection: connection)
60+
try await self.updateJobsOnInit(withStatus: .processing, onInit: processingJobs, connection: connection)
61+
try await self.updateJobsOnInit(withStatus: .failed, onInit: failedJobs, connection: connection)
62+
try await self.updateJobsOnInit(withStatus: .completed, onInit: completedJobs, connection: connection)
63+
try await self.updateJobsOnInit(withStatus: .cancelled, onInit: cancelledJobs, connection: connection)
64+
}
65+
} catch let error as PSQLError {
66+
logger.error(
67+
"JobQueue cleanup failed",
68+
metadata: [
69+
"Error": "\(String(reflecting: error))"
70+
]
71+
)
72+
throw error
73+
}
74+
}
75+
76+
func updateJobsOnInit(withStatus status: Status, onInit: JobCleanup, connection: PostgresConnection) async throws {
77+
switch onInit.rawValue {
78+
case .remove(let olderThan):
79+
let date: Date =
80+
if let olderThan {
81+
.now - Double(olderThan.components.seconds)
82+
} else {
83+
.distantPast
84+
}
85+
try await connection.query(
86+
"""
87+
DELETE FROM swift_jobs.jobs
88+
WHERE status = \(status) AND queue_name = \(configuration.queueName)
89+
AND last_modified < \(date)
90+
""",
91+
logger: self.logger
92+
)
93+
94+
case .rerun:
95+
let jobs = try await getJobs(withStatus: status)
96+
self.logger.info("Moving \(jobs.count) jobs with status: \(status) to job queue")
97+
for jobID in jobs {
98+
try await self.addToQueue(jobID: jobID, queueName: configuration.queueName, options: .init(), connection: connection)
99+
}
100+
101+
case .doNothing:
102+
break
103+
}
104+
}
105+
}

Sources/JobsPostgres/PostgresJobsQueue.swift

Lines changed: 24 additions & 119 deletions
Original file line numberDiff line numberDiff line change
@@ -46,10 +46,18 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma
4646

4747
public typealias JobID = UUID
4848
/// what to do with failed/processing jobs from last time queue was handled
49-
public enum JobCleanup: Sendable {
50-
case doNothing
51-
case rerun
52-
case remove
49+
public struct JobCleanup: Sendable, Codable {
50+
enum RawValue: Codable {
51+
case doNothing
52+
case rerun
53+
case remove(maxAge: Duration?)
54+
}
55+
let rawValue: RawValue
56+
57+
public static var doNothing: Self { .init(rawValue: .doNothing) }
58+
public static var rerun: Self { .init(rawValue: .rerun) }
59+
public static var remove: Self { .init(rawValue: .remove(maxAge: nil)) }
60+
public static func remove(maxAge: Duration) -> Self { .init(rawValue: .remove(maxAge: maxAge)) }
5361
}
5462

5563
/// Job priority from lowest to highest
@@ -170,6 +178,18 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma
170178
self.isStopped = .init(false)
171179
self.migrations = migrations
172180
await migrations.add(CreateSwiftJobsMigrations(), skipDuplicates: true)
181+
self.registerJob(
182+
JobDefinition(parameters: JobCleanupParameters.self, retryStrategy: .dontRetry) { parameters, context in
183+
parameters.jobQueue.cleanup(
184+
failedJobs: parameters.failedJobs,
185+
processingJobs: .doNothing,
186+
pendingJobs: .doNothing,
187+
completedJobs: parameters.completedJobs,
188+
cancelledJobs: parameters.cancelledJobs,
189+
logger: logger
190+
)
191+
}
192+
)
173193
}
174194

175195
public func onInit() async throws {
@@ -232,53 +252,6 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma
232252
}
233253
}
234254

235-
/// Cleanup job queues
236-
///
237-
/// This function is used to re-run or delete jobs in a certain state. Failed jobs can be
238-
/// pushed back into the pending queue to be re-run or removed. When called at startup in
239-
/// theory no job should be set to processing, or set to pending but not in the queue. but if
240-
/// your job server crashes these states are possible, so we also provide options to re-queue
241-
/// these jobs so they are run again.
242-
///
243-
/// The job queue needs to be running when you call cleanup. You can call `cleanup` with
244-
/// `failedJobs`` set to whatever you like at any point to re-queue failed jobs. Moving processing
245-
/// or pending jobs should only be done if you are certain there is nothing else processing
246-
/// the job queue.
247-
///
248-
/// - Parameters:
249-
/// - failedJobs: What to do with jobs tagged as failed
250-
/// - processingJobs: What to do with jobs tagged as processing
251-
/// - pendingJobs: What to do with jobs tagged as pending
252-
/// - Throws:
253-
public func cleanup(
254-
failedJobs: JobCleanup = .doNothing,
255-
processingJobs: JobCleanup = .doNothing,
256-
pendingJobs: JobCleanup = .doNothing
257-
) async throws {
258-
do {
259-
/// wait for migrations to complete before running job queue cleanup
260-
try await self.migrations.waitUntilCompleted()
261-
_ = try await self.client.withConnection { connection in
262-
self.logger.info("Update Jobs")
263-
try await self.updateJobsOnInit(withStatus: .pending, onInit: pendingJobs, connection: connection)
264-
try await self.updateJobsOnInit(
265-
withStatus: .processing,
266-
onInit: processingJobs,
267-
connection: connection
268-
)
269-
try await self.updateJobsOnInit(withStatus: .failed, onInit: failedJobs, connection: connection)
270-
}
271-
} catch let error as PSQLError {
272-
logger.error(
273-
"JobQueue initialization failed",
274-
metadata: [
275-
"Error": "\(String(reflecting: error))"
276-
]
277-
)
278-
throw error
279-
}
280-
}
281-
282255
/// Register job
283256
/// - Parameters:
284257
/// - job: Job Definition
@@ -469,51 +442,6 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma
469442
)
470443
}
471444

472-
/// This menthod shall be called with the JobPruner after registration as follow
473-
/// someJobQueue.registerJobparameters: JobPruner.self) { _, _ in
474-
/// try await someJobQueue.processDataRetentionPolicy()
475-
/// }
476-
/// let jobScheddule = JobSchedule([ .init(job: JobPruner(), schedule: .everyHour()) ])
477-
public func processDataRetentionPolicy() async throws {
478-
try await self.client.withTransaction(logger: logger) { tx in
479-
let now = Date.now.timeIntervalSince1970
480-
let retentionPolicy: RetentionPolicy = configuration.retentionPolicy
481-
482-
if case let .retain(timeAmout) = retentionPolicy.cancelled.rawValue {
483-
try await tx.query(
484-
"""
485-
DELETE FROM swift_jobs.jobs
486-
WHERE status = \(Status.cancelled)
487-
AND extract(epoch FROM last_modified)::int + \(timeAmout) < \(now)
488-
""",
489-
logger: self.logger
490-
)
491-
}
492-
493-
if case let .retain(timeAmout) = retentionPolicy.completed.rawValue {
494-
try await tx.query(
495-
"""
496-
DELETE FROM swift_jobs.jobs
497-
WHERE status = \(Status.completed)
498-
AND extract(epoch FROM last_modified)::int + \(timeAmout) < \(now)
499-
""",
500-
logger: self.logger
501-
)
502-
}
503-
504-
if case let .retain(timeAmout) = retentionPolicy.failed.rawValue {
505-
try await tx.query(
506-
"""
507-
DELETE FROM swift_jobs.jobs
508-
WHERE status = \(Status.failed)
509-
AND extract(epoch FROM last_modified)::int + \(timeAmout) < \(now)
510-
""",
511-
logger: self.logger
512-
)
513-
}
514-
}
515-
}
516-
517445
func updateJob(jobID: JobID, buffer: ByteBuffer, connection: PostgresConnection) async throws {
518446
try await connection.query(
519447
"""
@@ -600,29 +528,6 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma
600528
return jobs
601529
}
602530

603-
func updateJobsOnInit(withStatus status: Status, onInit: JobCleanup, connection: PostgresConnection) async throws {
604-
switch onInit {
605-
case .remove:
606-
try await connection.query(
607-
"""
608-
DELETE FROM swift_jobs.jobs
609-
WHERE status = \(status) AND queue_name = \(configuration.queueName)
610-
""",
611-
logger: self.logger
612-
)
613-
614-
case .rerun:
615-
let jobs = try await getJobs(withStatus: status)
616-
self.logger.info("Moving \(jobs.count) jobs with status: \(status) to job queue")
617-
for jobID in jobs {
618-
try await self.addToQueue(jobID: jobID, queueName: configuration.queueName, options: .init(), connection: connection)
619-
}
620-
621-
case .doNothing:
622-
break
623-
}
624-
}
625-
626531
let jobRegistry: JobRegistry
627532
}
628533

Sources/JobsPostgres/RetentionPolicy.swift

Lines changed: 7 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -16,33 +16,18 @@ import Foundation
1616

1717
/// Data rentension policy
1818
public struct RetentionPolicy: Sendable {
19-
2019
/// Data retention policy
2120
public struct RetainData: Equatable, Sendable {
2221
enum Policy {
23-
case retain(for: TimeInterval)
22+
case retain
2423
case doNotRetain
2524
}
2625

2726
let rawValue: Policy
28-
/// Retain policy
29-
/// default to 7 days
30-
public static func retain(for timeAmout: TimeInterval = 60 * 60 * 24 * 7) -> RetainData {
31-
RetainData(rawValue: .retain(for: timeAmout))
32-
}
27+
/// Retain task
28+
public static var retain: RetainData { RetainData(rawValue: .retain) }
3329
/// Never retain any data
34-
public static let never: RetainData = RetainData(rawValue: .doNotRetain)
35-
36-
public static func == (lhs: RetentionPolicy.RetainData, rhs: RetentionPolicy.RetainData) -> Bool {
37-
switch (lhs.rawValue, rhs.rawValue) {
38-
case (.retain(let lhsTimeAmout), .retain(let rhsTimeAmout)):
39-
return lhsTimeAmout == rhsTimeAmout
40-
case (.doNotRetain, .doNotRetain):
41-
return true
42-
default:
43-
return false
44-
}
45-
}
30+
public static var never: RetainData { RetainData(rawValue: .doNotRetain) }
4631
}
4732

4833
/// Jobs with status cancelled
@@ -56,9 +41,9 @@ public struct RetentionPolicy: Sendable {
5641
public var failed: RetainData
5742

5843
public init(
59-
cancelled: RetainData = .retain(),
60-
completed: RetainData = .retain(),
61-
failed: RetainData = .retain(for: 60 * 60 * 24 * 30)
44+
cancelled: RetainData = .retain,
45+
completed: RetainData = .retain,
46+
failed: RetainData = .retain
6247
) {
6348
self.cancelled = cancelled
6449
self.completed = completed

0 commit comments

Comments
 (0)