Skip to content
80 changes: 69 additions & 11 deletions Sources/JobsPostgres/PostgresJobsQueue.swift
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma
case failed = 2
case cancelled = 3
case paused = 4
case completed = 5
}

/// Queue configuration
Expand All @@ -132,17 +133,21 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma
let pollTime: Duration
/// Which Queue to push jobs into
let queueName: String
/// Retention policy for jobs
let retentionPolicy: RetentionPolicy

/// Initialize configuration
/// - Parameters
/// - pollTime: Queue poll time to wait if queue empties
/// - queueName: Name of queue we are handing
public init(
pollTime: Duration = .milliseconds(100),
queueName: String = "default"
queueName: String = "default",
retentionPolicy: RetentionPolicy = .init()
) {
self.pollTime = pollTime
self.queueName = queueName
self.retentionPolicy = retentionPolicy
}
}

Expand Down Expand Up @@ -184,7 +189,11 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma
public func cancel(jobID: JobID) async throws {
try await self.client.withTransaction(logger: logger) { connection in
try await deleteFromQueue(jobID: jobID, connection: connection)
try await delete(jobID: jobID)
if configuration.retentionPolicy.cancelled == .never {
try await delete(jobID: jobID)
} else {
try await setStatus(jobID: jobID, status: .cancelled, connection: connection)
}
}
}

Expand Down Expand Up @@ -290,15 +299,15 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma

/// Retry an existing Job
/// - Parameters
/// - id: Job instance ID
/// - jobID: Job instance ID
/// - jobRequest: Job Request
/// - options: Job retry options
public func retry<Parameters: JobParameters>(_ id: JobID, jobRequest: JobRequest<Parameters>, options: JobRetryOptions) async throws {
public func retry<Parameters: JobParameters>(_ jobID: JobID, jobRequest: JobRequest<Parameters>, options: JobRetryOptions) async throws {
let buffer = try self.jobRegistry.encode(jobRequest: jobRequest)
try await self.client.withTransaction(logger: self.logger) { connection in
try await self.updateJob(id: id, buffer: buffer, connection: connection)
try await self.updateJob(jobID: jobID, buffer: buffer, connection: connection)
try await self.addToQueue(
jobID: id,
jobID: jobID,
queueName: configuration.queueName,
options: .init(delayUntil: options.delayUntil),
connection: connection
Expand All @@ -308,12 +317,20 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma

/// This is called to say job has finished processing and it can be deleted
public func finished(jobID: JobID) async throws {
try await self.delete(jobID: jobID)
if configuration.retentionPolicy.completed == .never {
try await self.delete(jobID: jobID)
} else {
try await self.setStatus(jobID: jobID, status: .completed)
}
}

/// This is called to say job has failed to run and should be put aside
public func failed(jobID: JobID, error: Error) async throws {
try await self.setStatus(jobID: jobID, status: .failed)
if configuration.retentionPolicy.failed == .never {
try await self.delete(jobID: jobID)
} else {
try await self.setStatus(jobID: jobID, status: .failed)
}
}

/// stop serving jobs
Expand Down Expand Up @@ -451,15 +468,56 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma
logger: self.logger
)
}
// TODO: maybe add a new column colum for attempt so far after PR https://github.com/hummingbird-project/swift-jobs/pull/63 is merged?
func updateJob(id: JobID, buffer: ByteBuffer, connection: PostgresConnection) async throws {
/// Helper func which to be use by a scheduled jobs
/// for performing job clean up based on a given set of policies
public func processDataRetentionPolicy() async throws {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given we are expecting people to set up a Job to call this, why don't we provide them with one.

try await self.client.withTransaction(logger: logger) { tx in
let now = Date.now.timeIntervalSince1970
let retentionPolicy: RetentionPolicy = configuration.retentionPolicy

if case let .retain(timeAmout) = retentionPolicy.cancelled.rawValue {
try await tx.query(
"""
DELETE FROM swift_jobs.jobs
WHERE status = \(Status.cancelled)
AND extract(epoch FROM last_modified)::int + \(timeAmout) < \(now)
""",
logger: self.logger
)
}

if case let .retain(timeAmout) = retentionPolicy.completed.rawValue {
try await tx.query(
"""
DELETE FROM swift_jobs.jobs
WHERE status = \(Status.completed)
AND extract(epoch FROM last_modified)::int + \(timeAmout) < \(now)
""",
logger: self.logger
)
}

if case let .retain(timeAmout) = retentionPolicy.failed.rawValue {
try await tx.query(
"""
DELETE FROM swift_jobs.jobs
WHERE status = \(Status.failed)
AND extract(epoch FROM last_modified)::int + \(timeAmout) < \(now)
""",
logger: self.logger
)
}
}
}

func updateJob(jobID: JobID, buffer: ByteBuffer, connection: PostgresConnection) async throws {
try await connection.query(
"""
UPDATE swift_jobs.jobs
SET job = \(buffer),
last_modified = \(Date.now),
status = \(Status.failed)
WHERE id = \(id) AND queue_name = \(configuration.queueName)
WHERE id = \(jobID) AND queue_name = \(configuration.queueName)
""",
logger: self.logger
)
Expand Down
67 changes: 67 additions & 0 deletions Sources/JobsPostgres/RetentionPolicy.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the Hummingbird server framework project
//
// Copyright (c) 2025 the Hummingbird authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import Foundation

/// Data rentension policy
public struct RetentionPolicy: Sendable {

/// Data retention policy
public struct RetainData: Equatable, Sendable {
enum Policy {
case retain(for: TimeInterval)
case doNotRetain
}

let rawValue: Policy
/// Retain policy
/// default to 7 days
public static func retain(for timeAmout: TimeInterval = 60 * 60 * 24 * 7) -> RetainData {
RetainData(rawValue: .retain(for: timeAmout))
}
/// Never retain any data
public static let never: RetainData = RetainData(rawValue: .doNotRetain)

public static func == (lhs: RetentionPolicy.RetainData, rhs: RetentionPolicy.RetainData) -> Bool {
switch (lhs.rawValue, rhs.rawValue) {
case (.retain(let lhsTimeAmout), .retain(let rhsTimeAmout)):
return lhsTimeAmout == rhsTimeAmout
case (.doNotRetain, .doNotRetain):
return true
default:
return false
}
}
}

/// Jobs with status cancelled
/// default retention is set for 7 days
public var cancelled: RetainData
/// Jobs with status completed
/// default retention is set to 7 days
public var completed: RetainData
/// Jobs with status failed
/// default retention is set to 30 days
public var failed: RetainData

public init(
cancelled: RetainData = .retain(),
completed: RetainData = .retain(),
failed: RetainData = .retain(for: 60 * 60 * 24 * 30)
) {
self.cancelled = cancelled
self.completed = completed
self.failed = failed
}
}
63 changes: 60 additions & 3 deletions Tests/JobsPostgresTests/JobsTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -178,10 +178,15 @@ final class JobsTests: XCTestCase {
processingJobsInitialization: PostgresJobQueue.JobCleanup = .remove,
pendingJobsInitialization: PostgresJobQueue.JobCleanup = .remove,
revertMigrations: Bool = true,
configuration: PostgresJobQueue.Configuration = .init(),
function: String = #function,
test: (JobQueue<PostgresJobQueue>) async throws -> T
) async throws -> T {
let jobQueue = try await self.createJobQueue(numWorkers: numWorkers, configuration: .init(), function: function)
let jobQueue = try await self.createJobQueue(
numWorkers: numWorkers,
configuration: configuration,
function: function
)
return try await self.testJobQueue(
jobQueue: jobQueue,
failedJobsInitialization: failedJobsInitialization,
Expand Down Expand Up @@ -490,7 +495,16 @@ final class JobsTests: XCTestCase {
}
let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 1)

try await self.testJobQueue(numWorkers: 4) { jobQueue in
try await self.testJobQueue(
numWorkers: 4,
configuration: .init(
retentionPolicy: .init(
cancelled: .never,
completed: .never,
failed: .never
)
)
) { jobQueue in
jobQueue.registerJob(parameters: TestParameters.self) { _, _ in
expectation.fulfill()
try await Task.sleep(for: .milliseconds(1000))
Expand Down Expand Up @@ -769,7 +783,17 @@ final class JobsTests: XCTestCase {
let didRunCancelledJob: NIOLockedValueBox<Bool> = .init(false)
let didRunNoneCancelledJob: NIOLockedValueBox<Bool> = .init(false)

let jobQueue = try await self.createJobQueue(numWorkers: 1, configuration: .init(), function: #function)
let jobQueue = try await self.createJobQueue(
numWorkers: 1,
configuration: .init(
retentionPolicy: .init(
cancelled: .never,
completed: .never,
failed: .never
)
),
function: #function
)

try await testPriorityJobQueue(jobQueue: jobQueue) { queue in
queue.registerJob(parameters: TestParameters.self) { parameters, context in
Expand Down Expand Up @@ -827,4 +851,37 @@ final class JobsTests: XCTestCase {
XCTAssertEqual(didRunCancelledJob.withLockedValue { $0 }, false)
XCTAssertEqual(didRunNoneCancelledJob.withLockedValue { $0 }, true)
}

func testJobRetention() async throws {
struct TestParameters: JobParameters {
static let jobName = "testJobRetention"
let value: Int
}
let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 3)
try await self.testJobQueue(
numWorkers: 1,
configuration: .init(
retentionPolicy: .init(
cancelled: .retain(for: -1),
completed: .retain(for: -1),
failed: .retain(for: -1)
)
)
) { jobQueue in
jobQueue.registerJob(parameters: TestParameters.self) { parameters, context in
context.logger.info("Parameters=\(parameters.value)")
try await Task.sleep(for: .milliseconds(Int.random(in: 10..<50)))
expectation.fulfill()
}
try await jobQueue.push(TestParameters(value: 1))
try await jobQueue.push(TestParameters(value: 2))
try await jobQueue.push(TestParameters(value: 3))

await fulfillment(of: [expectation], timeout: 10)

let completedJobs = try await jobQueue.queue.getJobs(withStatus: .completed)
XCTAssertEqual(completedJobs.count, 3)
try await jobQueue.queue.processDataRetentionPolicy()
}
}
}
Loading