Skip to content

Jobs retention #29

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 13 commits into from
80 changes: 69 additions & 11 deletions Sources/JobsPostgres/PostgresJobsQueue.swift
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma
case failed = 2
case cancelled = 3
case paused = 4
case completed = 5
}

/// Queue configuration
Expand All @@ -132,17 +133,21 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma
let pollTime: Duration
/// Which Queue to push jobs into
let queueName: String
/// Retention policy for jobs
let retentionPolicy: RetentionPolicy

/// Initialize configuration
/// - Parameters
/// - pollTime: Queue poll time to wait if queue empties
/// - queueName: Name of queue we are handing
public init(
pollTime: Duration = .milliseconds(100),
queueName: String = "default"
queueName: String = "default",
retentionPolicy: RetentionPolicy = .init()
) {
self.pollTime = pollTime
self.queueName = queueName
self.retentionPolicy = retentionPolicy
}
}

Expand Down Expand Up @@ -184,7 +189,11 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma
public func cancel(jobID: JobID) async throws {
try await self.client.withTransaction(logger: logger) { connection in
try await deleteFromQueue(jobID: jobID, connection: connection)
try await delete(jobID: jobID)
if configuration.retentionPolicy.cancelled == .never {
try await delete(jobID: jobID)
} else {
try await setStatus(jobID: jobID, status: .cancelled, connection: connection)
}
}
}

Expand Down Expand Up @@ -290,15 +299,15 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma

/// Retry an existing Job
/// - Parameters
/// - id: Job instance ID
/// - jobID: Job instance ID
/// - jobRequest: Job Request
/// - options: Job retry options
public func retry<Parameters: JobParameters>(_ id: JobID, jobRequest: JobRequest<Parameters>, options: JobRetryOptions) async throws {
public func retry<Parameters: JobParameters>(_ jobID: JobID, jobRequest: JobRequest<Parameters>, options: JobRetryOptions) async throws {
let buffer = try self.jobRegistry.encode(jobRequest: jobRequest)
try await self.client.withTransaction(logger: self.logger) { connection in
try await self.updateJob(id: id, buffer: buffer, connection: connection)
try await self.updateJob(jobID: jobID, buffer: buffer, connection: connection)
try await self.addToQueue(
jobID: id,
jobID: jobID,
queueName: configuration.queueName,
options: .init(delayUntil: options.delayUntil),
connection: connection
Expand All @@ -308,12 +317,20 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma

/// This is called to say job has finished processing and it can be deleted
public func finished(jobID: JobID) async throws {
try await self.delete(jobID: jobID)
if configuration.retentionPolicy.completed == .never {
try await self.delete(jobID: jobID)
} else {
try await self.setStatus(jobID: jobID, status: .completed)
}
}

/// This is called to say job has failed to run and should be put aside
public func failed(jobID: JobID, error: Error) async throws {
try await self.setStatus(jobID: jobID, status: .failed)
if configuration.retentionPolicy.failed == .never {
try await self.delete(jobID: jobID)
} else {
try await self.setStatus(jobID: jobID, status: .failed)
}
}

/// stop serving jobs
Expand Down Expand Up @@ -451,15 +468,56 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma
logger: self.logger
)
}
// TODO: maybe add a new column colum for attempt so far after PR https://github.com/hummingbird-project/swift-jobs/pull/63 is merged?
func updateJob(id: JobID, buffer: ByteBuffer, connection: PostgresConnection) async throws {
/// Helper func which to be use by a scheduled jobs
/// for performing job clean up based on a given set of policies
public func processDataRetentionPolicy() async throws {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given we are expecting people to set up a Job to call this, why don't we provide them with one.

try await self.client.withTransaction(logger: logger) { tx in
let now = Date.now.timeIntervalSince1970
let retentionPolicy: RetentionPolicy = configuration.retentionPolicy

if case let .retain(timeAmout) = retentionPolicy.cancelled.rawValue {
try await tx.query(
"""
DELETE FROM swift_jobs.jobs
WHERE status = \(Status.cancelled)
AND extract(epoch FROM last_modified)::int + \(timeAmout) < \(now)
""",
logger: self.logger
)
}

if case let .retain(timeAmout) = retentionPolicy.completed.rawValue {
try await tx.query(
"""
DELETE FROM swift_jobs.jobs
WHERE status = \(Status.completed)
AND extract(epoch FROM last_modified)::int + \(timeAmout) < \(now)
""",
logger: self.logger
)
}

if case let .retain(timeAmout) = retentionPolicy.failed.rawValue {
try await tx.query(
"""
DELETE FROM swift_jobs.jobs
WHERE status = \(Status.failed)
AND extract(epoch FROM last_modified)::int + \(timeAmout) < \(now)
""",
logger: self.logger
)
}
}
}

func updateJob(jobID: JobID, buffer: ByteBuffer, connection: PostgresConnection) async throws {
try await connection.query(
"""
UPDATE swift_jobs.jobs
SET job = \(buffer),
last_modified = \(Date.now),
status = \(Status.failed)
WHERE id = \(id) AND queue_name = \(configuration.queueName)
WHERE id = \(jobID) AND queue_name = \(configuration.queueName)
""",
logger: self.logger
)
Expand Down
67 changes: 67 additions & 0 deletions Sources/JobsPostgres/RetentionPolicy.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the Hummingbird server framework project
//
// Copyright (c) 2025 the Hummingbird authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import Foundation

/// Data rentension policy
public struct RetentionPolicy: Sendable {

/// Data retention policy
public struct RetainData: Equatable, Sendable {
enum Policy {
case retain(for: TimeInterval)
case doNotRetain
}

let rawValue: Policy
/// Retain policy
/// default to 7 days
public static func retain(for timeAmout: TimeInterval = 60 * 60 * 24 * 7) -> RetainData {
RetainData(rawValue: .retain(for: timeAmout))
}
/// Never retain any data
public static let never: RetainData = RetainData(rawValue: .doNotRetain)

public static func == (lhs: RetentionPolicy.RetainData, rhs: RetentionPolicy.RetainData) -> Bool {
switch (lhs.rawValue, rhs.rawValue) {
case (.retain(let lhsTimeAmout), .retain(let rhsTimeAmout)):
return lhsTimeAmout == rhsTimeAmout
case (.doNotRetain, .doNotRetain):
return true
default:
return false
}
}
}

/// Jobs with status cancelled
/// default retention is set for 7 days
public var cancelled: RetainData
/// Jobs with status completed
/// default retention is set to 7 days
public var completed: RetainData
/// Jobs with status failed
/// default retention is set to 30 days
public var failed: RetainData

public init(
cancelled: RetainData = .retain(),
completed: RetainData = .retain(),
failed: RetainData = .retain(for: 60 * 60 * 24 * 30)
) {
self.cancelled = cancelled
self.completed = completed
self.failed = failed
}
}
63 changes: 60 additions & 3 deletions Tests/JobsPostgresTests/JobsTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -178,10 +178,15 @@ final class JobsTests: XCTestCase {
processingJobsInitialization: PostgresJobQueue.JobCleanup = .remove,
pendingJobsInitialization: PostgresJobQueue.JobCleanup = .remove,
revertMigrations: Bool = true,
configuration: PostgresJobQueue.Configuration = .init(),
function: String = #function,
test: (JobQueue<PostgresJobQueue>) async throws -> T
) async throws -> T {
let jobQueue = try await self.createJobQueue(numWorkers: numWorkers, configuration: .init(), function: function)
let jobQueue = try await self.createJobQueue(
numWorkers: numWorkers,
configuration: configuration,
function: function
)
return try await self.testJobQueue(
jobQueue: jobQueue,
failedJobsInitialization: failedJobsInitialization,
Expand Down Expand Up @@ -490,7 +495,16 @@ final class JobsTests: XCTestCase {
}
let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 1)

try await self.testJobQueue(numWorkers: 4) { jobQueue in
try await self.testJobQueue(
numWorkers: 4,
configuration: .init(
retentionPolicy: .init(
cancelled: .never,
completed: .never,
failed: .never
)
)
) { jobQueue in
jobQueue.registerJob(parameters: TestParameters.self) { _, _ in
expectation.fulfill()
try await Task.sleep(for: .milliseconds(1000))
Expand Down Expand Up @@ -769,7 +783,17 @@ final class JobsTests: XCTestCase {
let didRunCancelledJob: NIOLockedValueBox<Bool> = .init(false)
let didRunNoneCancelledJob: NIOLockedValueBox<Bool> = .init(false)

let jobQueue = try await self.createJobQueue(numWorkers: 1, configuration: .init(), function: #function)
let jobQueue = try await self.createJobQueue(
numWorkers: 1,
configuration: .init(
retentionPolicy: .init(
cancelled: .never,
completed: .never,
failed: .never
)
),
function: #function
)

try await testPriorityJobQueue(jobQueue: jobQueue) { queue in
queue.registerJob(parameters: TestParameters.self) { parameters, context in
Expand Down Expand Up @@ -827,4 +851,37 @@ final class JobsTests: XCTestCase {
XCTAssertEqual(didRunCancelledJob.withLockedValue { $0 }, false)
XCTAssertEqual(didRunNoneCancelledJob.withLockedValue { $0 }, true)
}

func testJobRetention() async throws {
struct TestParameters: JobParameters {
static let jobName = "testJobRetention"
let value: Int
}
let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 3)
try await self.testJobQueue(
numWorkers: 1,
configuration: .init(
retentionPolicy: .init(
cancelled: .retain(for: -1),
completed: .retain(for: -1),
failed: .retain(for: -1)
)
)
) { jobQueue in
jobQueue.registerJob(parameters: TestParameters.self) { parameters, context in
context.logger.info("Parameters=\(parameters.value)")
try await Task.sleep(for: .milliseconds(Int.random(in: 10..<50)))
expectation.fulfill()
}
try await jobQueue.push(TestParameters(value: 1))
try await jobQueue.push(TestParameters(value: 2))
try await jobQueue.push(TestParameters(value: 3))

await fulfillment(of: [expectation], timeout: 10)

let completedJobs = try await jobQueue.queue.getJobs(withStatus: .completed)
XCTAssertEqual(completedJobs.count, 3)
try await jobQueue.queue.processDataRetentionPolicy()
}
}
}
Loading