Skip to content

Commit 5b9a1ad

Browse files
committed
Job retention
1 parent 969987a commit 5b9a1ad

File tree

3 files changed

+136
-33
lines changed

3 files changed

+136
-33
lines changed

Sources/JobsPostgres/PostgresJobsQueue.swift

Lines changed: 61 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,7 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma
124124
case failed = 2
125125
case cancelled = 3
126126
case paused = 4
127+
case completed = 5
127128
}
128129

129130
/// Queue configuration
@@ -143,10 +144,10 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma
143144
pollTime: Duration = .milliseconds(100),
144145
queueName: String = "default",
145146
retentionPolicy: RetentionPolicy = .init(
146-
canceled: .init(duration: "7D"),
147-
completed: .init(duration: "7D"),
148-
failed: .init(duration: "7D")
149-
) //.keepAll()
147+
cancelled: .init(duration: 7 * 24 * 60),
148+
completed: .init(duration: 7 * 24 * 60),
149+
failed: .init(duration: 7 * 24 * 60)
150+
)
150151
) {
151152
self.pollTime = pollTime
152153
self.queueName = queueName
@@ -192,8 +193,9 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma
192193
public func cancel(jobID: JobID) async throws {
193194
try await self.client.withTransaction(logger: logger) { connection in
194195
try await deleteFromQueue(jobID: jobID, connection: connection)
195-
try await delete(jobID: jobID)
196+
try await setStatus(jobID: jobID, status: .cancelled, connection: connection)
196197
}
198+
try await processDataRetentionPolicy()
197199
}
198200

199201
/// Pause job
@@ -298,15 +300,15 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma
298300

299301
/// Retry an existing Job
300302
/// - Parameters
301-
/// - id: Job instance ID
303+
/// - jobID: Job instance ID
302304
/// - jobRequest: Job Request
303305
/// - options: Job retry options
304-
public func retry<Parameters: JobParameters>(_ id: JobID, jobRequest: JobRequest<Parameters>, options: JobRetryOptions) async throws {
306+
public func retry<Parameters: JobParameters>(_ jobID: JobID, jobRequest: JobRequest<Parameters>, options: JobRetryOptions) async throws {
305307
let buffer = try self.jobRegistry.encode(jobRequest: jobRequest)
306308
try await self.client.withTransaction(logger: self.logger) { connection in
307-
try await self.updateJob(id: id, buffer: buffer, connection: connection)
309+
try await self.updateJob(jobID: jobID, buffer: buffer, connection: connection)
308310
try await self.addToQueue(
309-
jobID: id,
311+
jobID: jobID,
310312
queueName: configuration.queueName,
311313
options: .init(delayUntil: options.delayUntil),
312314
connection: connection
@@ -316,7 +318,9 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma
316318

317319
/// This is called to say job has finished processing and it can be deleted
318320
public func finished(jobID: JobID) async throws {
319-
try await self.delete(jobID: jobID)
321+
//try await self.delete(jobID: jobID)
322+
try await self.setStatus(jobID: jobID, status: .completed)
323+
try await processDataRetentionPolicy()
320324
}
321325

322326
/// This is called to say job has failed to run and should be put aside
@@ -459,15 +463,59 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma
459463
logger: self.logger
460464
)
461465
}
462-
// TODO: maybe add a new column colum for attempt so far after PR https://github.com/hummingbird-project/swift-jobs/pull/63 is merged?
463-
func updateJob(id: JobID, buffer: ByteBuffer, connection: PostgresConnection) async throws {
466+
467+
func processDataRetentionPolicy() async throws {
468+
try await withThrowingTaskGroup(of: Void.self) { group in
469+
try await self.client.withTransaction(logger: logger) { tx in
470+
let now = Date.now.timeIntervalSince1970
471+
let retentionPolicy = configuration.retentionPolicy
472+
/// process // cancelled events
473+
group.addTask {
474+
try await tx.query(
475+
"""
476+
DELETE FROM swift_jobs.jobs
477+
WHERE status = \(Status.cancelled)
478+
AND extract(epoch FROM last_modified)::int + \(retentionPolicy.cancelled.duration) < \(now)
479+
""",
480+
logger: self.logger
481+
)
482+
}
483+
484+
/// process failed events clean up
485+
group.addTask {
486+
try await tx.query(
487+
"""
488+
DELETE FROM swift_jobs.jobs
489+
WHERE status = \(Status.failed)
490+
AND extract(epoch FROM last_modified)::int + \(retentionPolicy.failed.duration) < \(now)
491+
""",
492+
logger: self.logger
493+
)
494+
}
495+
496+
/// process completed events
497+
group.addTask {
498+
try await tx.query(
499+
"""
500+
DELETE FROM swift_jobs.jobs
501+
WHERE status = \(Status.completed)
502+
AND extract(epoch FROM last_modified)::int + \(retentionPolicy.completed.duration) < \(now)
503+
""",
504+
logger: self.logger
505+
)
506+
}
507+
}
508+
}
509+
}
510+
511+
func updateJob(jobID: JobID, buffer: ByteBuffer, connection: PostgresConnection) async throws {
464512
try await connection.query(
465513
"""
466514
UPDATE swift_jobs.jobs
467515
SET job = \(buffer),
468516
last_modified = \(Date.now),
469517
status = \(Status.failed)
470-
WHERE id = \(id) AND queue_name = \(configuration.queueName)
518+
WHERE id = \(jobID) AND queue_name = \(configuration.queueName)
471519
""",
472520
logger: self.logger
473521
)

Sources/JobsPostgres/RetentionPolicy.swift

Lines changed: 22 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,35 +1,40 @@
1+
//===----------------------------------------------------------------------===//
12
//
2-
// RetentionPolicy.swift
3-
// swift-jobs-postgres
3+
// This source file is part of the Hummingbird server framework project
44
//
5-
// Created by Stevenson Michel on 3/15/25.
5+
// Copyright (c) 2025 the Hummingbird authors
6+
// Licensed under Apache License v2.0
67
//
8+
// See LICENSE.txt for license information
9+
// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
714

15+
import Foundation
16+
17+
/// Data rentension policy
818
public struct RetentionPolicy: Sendable {
9-
19+
/// Data retention
1020
public struct RetainData: Sendable {
11-
/// Duration in ISO 8601 format (e.g., "P30D" for 30 days)
12-
public var duration: String
13-
14-
public init(duration: String) {
21+
/// Duration
22+
public var duration: TimeInterval
23+
24+
public init(duration: TimeInterval) {
1525
self.duration = duration
1626
}
1727
}
18-
19-
// public var days: Int
20-
//
21-
// public init(days: Int) {
22-
// self.days = days
23-
// }
28+
2429
/// Jobs with status cancelled
25-
public var canceled: RetainData
30+
public var cancelled: RetainData
2631
/// Jobs with status completed
2732
public var completed: RetainData
2833
/// Jobs with status failed
2934
public var failed: RetainData
3035

31-
public init(canceled: RetainData, completed: RetainData, failed: RetainData) {
32-
self.canceled = canceled
36+
public init(cancelled: RetainData, completed: RetainData, failed: RetainData) {
37+
self.cancelled = cancelled
3338
self.completed = completed
3439
self.failed = failed
3540
}

Tests/JobsPostgresTests/JobsTests.swift

Lines changed: 53 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -178,10 +178,15 @@ final class JobsTests: XCTestCase {
178178
processingJobsInitialization: PostgresJobQueue.JobCleanup = .remove,
179179
pendingJobsInitialization: PostgresJobQueue.JobCleanup = .remove,
180180
revertMigrations: Bool = true,
181+
configuration: PostgresJobQueue.Configuration = .init(),
181182
function: String = #function,
182183
test: (JobQueue<PostgresJobQueue>) async throws -> T
183184
) async throws -> T {
184-
let jobQueue = try await self.createJobQueue(numWorkers: numWorkers, configuration: .init(), function: function)
185+
let jobQueue = try await self.createJobQueue(
186+
numWorkers: numWorkers,
187+
configuration: configuration,
188+
function: function
189+
)
185190
return try await self.testJobQueue(
186191
jobQueue: jobQueue,
187192
failedJobsInitialization: failedJobsInitialization,
@@ -490,7 +495,16 @@ final class JobsTests: XCTestCase {
490495
}
491496
let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 1)
492497

493-
try await self.testJobQueue(numWorkers: 4) { jobQueue in
498+
try await self.testJobQueue(
499+
numWorkers: 4,
500+
configuration: .init(
501+
retentionPolicy: .init(
502+
cancelled: .init(duration: -1),
503+
completed: .init(duration: -1),
504+
failed: .init(duration: -1)
505+
)
506+
)
507+
) { jobQueue in
494508
jobQueue.registerJob(parameters: TestParameters.self) { _, _ in
495509
expectation.fulfill()
496510
try await Task.sleep(for: .milliseconds(1000))
@@ -769,7 +783,17 @@ final class JobsTests: XCTestCase {
769783
let didRunCancelledJob: NIOLockedValueBox<Bool> = .init(false)
770784
let didRunNoneCancelledJob: NIOLockedValueBox<Bool> = .init(false)
771785

772-
let jobQueue = try await self.createJobQueue(numWorkers: 1, configuration: .init(), function: #function)
786+
let jobQueue = try await self.createJobQueue(
787+
numWorkers: 1,
788+
configuration: .init(
789+
retentionPolicy: .init(
790+
cancelled: .init(duration: -1),
791+
completed: .init(duration: -1),
792+
failed: .init(duration: -1)
793+
)
794+
),
795+
function: #function
796+
)
773797

774798
try await testPriorityJobQueue(jobQueue: jobQueue) { queue in
775799
queue.registerJob(parameters: TestParameters.self) { parameters, context in
@@ -827,4 +851,30 @@ final class JobsTests: XCTestCase {
827851
XCTAssertEqual(didRunCancelledJob.withLockedValue { $0 }, false)
828852
XCTAssertEqual(didRunNoneCancelledJob.withLockedValue { $0 }, true)
829853
}
854+
855+
func testJobRetention() async throws {
856+
struct TestParameters: JobParameters {
857+
static let jobName = "testJobRetention"
858+
let value: Int
859+
}
860+
let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 3)
861+
try await self.testJobQueue(numWorkers: 1) { jobQueue in
862+
jobQueue.registerJob(parameters: TestParameters.self) { parameters, context in
863+
context.logger.info("Parameters=\(parameters.value)")
864+
try await Task.sleep(for: .milliseconds(Int.random(in: 10..<50)))
865+
expectation.fulfill()
866+
}
867+
let firstJob = try await jobQueue.push(TestParameters(value: 1))
868+
let secondJob = try await jobQueue.push(TestParameters(value: 2))
869+
let thirdJob = try await jobQueue.push(TestParameters(value: 3))
870+
871+
await fulfillment(of: [expectation], timeout: 5)
872+
873+
let completedJobs = try await jobQueue.queue.getJobs(withStatus: .completed)
874+
XCTAssertEqual(completedJobs.count, 3)
875+
try await jobQueue.queue.delete(jobID: firstJob)
876+
try await jobQueue.queue.delete(jobID: secondJob)
877+
try await jobQueue.queue.delete(jobID: thirdJob)
878+
}
879+
}
830880
}

0 commit comments

Comments
 (0)