Skip to content

Jobs retention #29

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 13 commits into from
102 changes: 74 additions & 28 deletions Sources/JobsPostgres/PostgresJobsQueue.swift
Original file line number Diff line number Diff line change
Expand Up @@ -65,25 +65,15 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma
case highest = 4
}
/// Lowest priority
public static func lowest() -> JobPriority {
JobPriority(rawValue: .lowest)
}
public static let lowest: JobPriority = JobPriority(rawValue: .lowest)
/// Lower priority
public static func lower() -> JobPriority {
JobPriority(rawValue: .lower)
}
public static let lower: JobPriority = JobPriority(rawValue: .lower)
/// Normal is the default priority
public static func normal() -> JobPriority {
JobPriority(rawValue: .normal)
}
public static let normal: JobPriority = JobPriority(rawValue: .normal)
/// Higher priority
public static func higher() -> JobPriority {
JobPriority(rawValue: .higher)
}
public static let higher: JobPriority = JobPriority(rawValue: .higher)
/// Higgest priority
public static func highest() -> JobPriority {
JobPriority(rawValue: .highest)
}
public static let highest: JobPriority = JobPriority(rawValue: .highest)
}

/// Options for job pushed to queue
Expand All @@ -96,20 +86,20 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma
/// Default initializer for JobOptions
public init() {
self.delayUntil = .now
self.priority = .normal()
self.priority = .normal
}

/// Initializer for JobOptions
/// - Parameter delayUntil: Whether job execution should be delayed until a later date
public init(delayUntil: Date?) {
self.delayUntil = delayUntil ?? .now
self.priority = .normal()
self.priority = .normal
}

/// Initializer for JobOptions
/// - Parameter delayUntil: Whether job execution should be delayed until a later date
/// - Parameter priority: The priority for a job
public init(delayUntil: Date = .now, priority: JobPriority = .normal()) {
public init(delayUntil: Date = .now, priority: JobPriority = .normal) {
self.delayUntil = delayUntil
self.priority = priority
}
Expand All @@ -134,6 +124,7 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma
case failed = 2
case cancelled = 3
case paused = 4
case completed = 5
}

/// Queue configuration
Expand All @@ -142,17 +133,25 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma
let pollTime: Duration
/// Which Queue to push jobs into
let queueName: String
/// Retention policy for jobs
let retentionPolicy: RetentionPolicy

/// Initialize configuration
/// - Parameters
/// - pollTime: Queue poll time to wait if queue empties
/// - queueName: Name of queue we are handing
public init(
pollTime: Duration = .milliseconds(100),
queueName: String = "default"
queueName: String = "default",
retentionPolicy: RetentionPolicy = .init(
cancelled: .init(duration: 7 * 24 * 60),
completed: .init(duration: 7 * 24 * 60),
failed: .init(duration: 7 * 24 * 60)
)
) {
self.pollTime = pollTime
self.queueName = queueName
self.retentionPolicy = retentionPolicy
}
}

Expand Down Expand Up @@ -194,8 +193,9 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma
public func cancel(jobID: JobID) async throws {
try await self.client.withTransaction(logger: logger) { connection in
try await deleteFromQueue(jobID: jobID, connection: connection)
try await delete(jobID: jobID)
try await setStatus(jobID: jobID, status: .cancelled, connection: connection)
}
try await processDataRetentionPolicy()
}

/// Pause job
Expand Down Expand Up @@ -300,15 +300,15 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma

/// Retry an existing Job
/// - Parameters
/// - id: Job instance ID
/// - jobID: Job instance ID
/// - jobRequest: Job Request
/// - options: Job retry options
public func retry<Parameters: JobParameters>(_ id: JobID, jobRequest: JobRequest<Parameters>, options: JobRetryOptions) async throws {
public func retry<Parameters: JobParameters>(_ jobID: JobID, jobRequest: JobRequest<Parameters>, options: JobRetryOptions) async throws {
let buffer = try self.jobRegistry.encode(jobRequest: jobRequest)
try await self.client.withTransaction(logger: self.logger) { connection in
try await self.updateJob(id: id, buffer: buffer, connection: connection)
try await self.updateJob(jobID: jobID, buffer: buffer, connection: connection)
try await self.addToQueue(
jobID: id,
jobID: jobID,
queueName: configuration.queueName,
options: .init(delayUntil: options.delayUntil),
connection: connection
Expand All @@ -318,7 +318,9 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma

/// This is called to say job has finished processing and it can be deleted
public func finished(jobID: JobID) async throws {
try await self.delete(jobID: jobID)
//try await self.delete(jobID: jobID)
try await self.setStatus(jobID: jobID, status: .completed)
try await processDataRetentionPolicy()
}

/// This is called to say job has failed to run and should be put aside
Expand Down Expand Up @@ -461,15 +463,59 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma
logger: self.logger
)
}
// TODO: maybe add a new column colum for attempt so far after PR https://github.com/hummingbird-project/swift-jobs/pull/63 is merged?
func updateJob(id: JobID, buffer: ByteBuffer, connection: PostgresConnection) async throws {

func processDataRetentionPolicy() async throws {
try await withThrowingTaskGroup(of: Void.self) { group in
try await self.client.withTransaction(logger: logger) { tx in
let now = Date.now.timeIntervalSince1970
let retentionPolicy = configuration.retentionPolicy
/// process // cancelled events
group.addTask {
try await tx.query(
"""
DELETE FROM swift_jobs.jobs
WHERE status = \(Status.cancelled)
AND extract(epoch FROM last_modified)::int + \(retentionPolicy.cancelled.duration) < \(now)
""",
logger: self.logger
)
}

/// process failed events clean up
group.addTask {
try await tx.query(
"""
DELETE FROM swift_jobs.jobs
WHERE status = \(Status.failed)
AND extract(epoch FROM last_modified)::int + \(retentionPolicy.failed.duration) < \(now)
""",
logger: self.logger
)
}

/// process completed events
group.addTask {
try await tx.query(
"""
DELETE FROM swift_jobs.jobs
WHERE status = \(Status.completed)
AND extract(epoch FROM last_modified)::int + \(retentionPolicy.completed.duration) < \(now)
""",
logger: self.logger
)
}
}
}
}

func updateJob(jobID: JobID, buffer: ByteBuffer, connection: PostgresConnection) async throws {
try await connection.query(
"""
UPDATE swift_jobs.jobs
SET job = \(buffer),
last_modified = \(Date.now),
status = \(Status.failed)
WHERE id = \(id) AND queue_name = \(configuration.queueName)
WHERE id = \(jobID) AND queue_name = \(configuration.queueName)
""",
logger: self.logger
)
Expand Down
41 changes: 41 additions & 0 deletions Sources/JobsPostgres/RetentionPolicy.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the Hummingbird server framework project
//
// Copyright (c) 2025 the Hummingbird authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import Foundation

/// Data rentension policy
public struct RetentionPolicy: Sendable {
/// Data retention
public struct RetainData: Sendable {
/// Duration
public var duration: TimeInterval

public init(duration: TimeInterval) {
self.duration = duration
}
}

/// Jobs with status cancelled
public var cancelled: RetainData
/// Jobs with status completed
public var completed: RetainData
/// Jobs with status failed
public var failed: RetainData

public init(cancelled: RetainData, completed: RetainData, failed: RetainData) {
self.cancelled = cancelled
self.completed = completed
self.failed = failed
}
}
72 changes: 61 additions & 11 deletions Tests/JobsPostgresTests/JobsTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -178,10 +178,15 @@ final class JobsTests: XCTestCase {
processingJobsInitialization: PostgresJobQueue.JobCleanup = .remove,
pendingJobsInitialization: PostgresJobQueue.JobCleanup = .remove,
revertMigrations: Bool = true,
configuration: PostgresJobQueue.Configuration = .init(),
function: String = #function,
test: (JobQueue<PostgresJobQueue>) async throws -> T
) async throws -> T {
let jobQueue = try await self.createJobQueue(numWorkers: numWorkers, configuration: .init(), function: function)
let jobQueue = try await self.createJobQueue(
numWorkers: numWorkers,
configuration: configuration,
function: function
)
return try await self.testJobQueue(
jobQueue: jobQueue,
failedJobsInitialization: failedJobsInitialization,
Expand Down Expand Up @@ -278,14 +283,14 @@ final class JobsTests: XCTestCase {
try await queue.push(
TestParameters(value: 20),
options: .init(
priority: .lowest()
priority: .lowest
)
)

try await queue.push(
TestParameters(value: 2025),
options: .init(
priority: .highest()
priority: .highest
)
)

Expand Down Expand Up @@ -332,15 +337,15 @@ final class JobsTests: XCTestCase {
try await queue.push(
TestParameters(value: 20),
options: .init(
priority: .lower()
priority: .lower
)
)

try await queue.push(
TestParameters(value: 2025),
options: .init(
delayUntil: Date.now.addingTimeInterval(1),
priority: .higher()
priority: .higher
)
)

Expand Down Expand Up @@ -490,7 +495,16 @@ final class JobsTests: XCTestCase {
}
let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 1)

try await self.testJobQueue(numWorkers: 4) { jobQueue in
try await self.testJobQueue(
numWorkers: 4,
configuration: .init(
retentionPolicy: .init(
cancelled: .init(duration: -1),
completed: .init(duration: -1),
failed: .init(duration: -1)
)
)
) { jobQueue in
jobQueue.registerJob(parameters: TestParameters.self) { _, _ in
expectation.fulfill()
try await Task.sleep(for: .milliseconds(1000))
Expand Down Expand Up @@ -717,14 +731,14 @@ final class JobsTests: XCTestCase {
let resumableJob = try await queue.push(
ResumableJob(),
options: .init(
priority: .lowest()
priority: .lowest
)
)

try await queue.push(
TestParameters(),
options: .init(
priority: .normal()
priority: .normal
)
)

Expand Down Expand Up @@ -769,7 +783,17 @@ final class JobsTests: XCTestCase {
let didRunCancelledJob: NIOLockedValueBox<Bool> = .init(false)
let didRunNoneCancelledJob: NIOLockedValueBox<Bool> = .init(false)

let jobQueue = try await self.createJobQueue(numWorkers: 1, configuration: .init(), function: #function)
let jobQueue = try await self.createJobQueue(
numWorkers: 1,
configuration: .init(
retentionPolicy: .init(
cancelled: .init(duration: -1),
completed: .init(duration: -1),
failed: .init(duration: -1)
)
),
function: #function
)

try await testPriorityJobQueue(jobQueue: jobQueue) { queue in
queue.registerJob(parameters: TestParameters.self) { parameters, context in
Expand All @@ -793,14 +817,14 @@ final class JobsTests: XCTestCase {
let cancellableJob = try await queue.push(
TestParameters(value: 42),
options: .init(
priority: .lower()
priority: .lower
)
)

try await queue.push(
NoneCancelledJobParameters(value: 2025),
options: .init(
priority: .highest()
priority: .highest
)
)

Expand All @@ -827,4 +851,30 @@ final class JobsTests: XCTestCase {
XCTAssertEqual(didRunCancelledJob.withLockedValue { $0 }, false)
XCTAssertEqual(didRunNoneCancelledJob.withLockedValue { $0 }, true)
}

func testJobRetention() async throws {
struct TestParameters: JobParameters {
static let jobName = "testJobRetention"
let value: Int
}
let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 3)
try await self.testJobQueue(numWorkers: 1) { jobQueue in
jobQueue.registerJob(parameters: TestParameters.self) { parameters, context in
context.logger.info("Parameters=\(parameters.value)")
try await Task.sleep(for: .milliseconds(Int.random(in: 10..<50)))
expectation.fulfill()
}
let firstJob = try await jobQueue.push(TestParameters(value: 1))
let secondJob = try await jobQueue.push(TestParameters(value: 2))
let thirdJob = try await jobQueue.push(TestParameters(value: 3))

await fulfillment(of: [expectation], timeout: 5)

let completedJobs = try await jobQueue.queue.getJobs(withStatus: .completed)
XCTAssertEqual(completedJobs.count, 3)
try await jobQueue.queue.delete(jobID: firstJob)
try await jobQueue.queue.delete(jobID: secondJob)
try await jobQueue.queue.delete(jobID: thirdJob)
}
}
}
Loading