Skip to content

Job retention and cleanup #35

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 21 commits into from
May 19, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
161 changes: 161 additions & 0 deletions Sources/JobsPostgres/PostgresJobsQueue+cleanup.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the Hummingbird server framework project
//
// Copyright (c) 2024-2025 the Hummingbird authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import Foundation
import Jobs
import Logging
import NIOConcurrencyHelpers
import NIOCore
import PostgresMigrations
import PostgresNIO

/// Parameters for the cleanup job.
///
/// Each property selects the ``PostgresJobQueue/JobCleanup`` action that the cleanup job
/// applies to jobs in the matching state. Every action defaults to `.doNothing`.
public struct JobCleanupParameters: Sendable, Codable {
    /// Action applied to jobs tagged as failed.
    let failedJobs: PostgresJobQueue.JobCleanup
    /// Action applied to jobs tagged as completed.
    let completedJobs: PostgresJobQueue.JobCleanup
    /// Action applied to jobs tagged as cancelled.
    let cancelledJobs: PostgresJobQueue.JobCleanup

    /// Create cleanup job parameters.
    /// - Parameters:
    ///   - failedJobs: What to do with jobs tagged as failed
    ///   - completedJobs: What to do with jobs tagged as completed
    ///   - cancelledJobs: What to do with jobs tagged as cancelled
    public init(
        failedJobs: PostgresJobQueue.JobCleanup = .doNothing,
        completedJobs: PostgresJobQueue.JobCleanup = .doNothing,
        cancelledJobs: PostgresJobQueue.JobCleanup = .doNothing
    ) {
        self.cancelledJobs = cancelledJobs
        self.completedJobs = completedJobs
        self.failedJobs = failedJobs
    }
}

extension PostgresJobQueue {
/// Action to apply to jobs in a given state when the queue is cleaned up.
///
/// Values are created through the static members `doNothing`, `rerun`, `remove`
/// and `remove(maxAge:)`.
public struct JobCleanup: Sendable, Codable {
    /// Underlying representation of the cleanup action.
    enum RawValue: Codable {
        /// Leave the jobs untouched.
        case doNothing
        /// Add the jobs back onto the job queue.
        case rerun
        /// Delete the jobs; `maxAge` is `nil` when no age was supplied.
        case remove(maxAge: Duration?)
    }

    /// The action this value stands for.
    let rawValue: RawValue

    /// Leave the jobs as they are.
    public static var doNothing: Self {
        Self(rawValue: .doNothing)
    }

    /// Push the jobs back onto the job queue so they are run again.
    public static var rerun: Self {
        Self(rawValue: .rerun)
    }

    /// Remove the jobs without specifying a maximum age.
    public static var remove: Self {
        Self(rawValue: .remove(maxAge: nil))
    }

    /// Remove the jobs, passing `maxAge` along as the age limit.
    public static func remove(maxAge: Duration) -> Self {
        Self(rawValue: .remove(maxAge: maxAge))
    }
}

/// Name of the cleanup job for this queue.
///
/// Use this with the ``JobSchedule`` to schedule a cleanup of
/// failed, cancelled or completed jobs. The configured queue name is
/// appended to the job name.
public var cleanupJob: JobName<JobCleanupParameters> {
    let name: String = "_Jobs_PostgresCleanup_\(self.configuration.queueName)"
    return .init(name)
}

/// Register the cleanup job on this queue.
///
/// The registered job forwards the failed, completed and cancelled actions from its
/// ``JobCleanupParameters`` to `cleanup`. Processing and pending jobs are always
/// passed as `.doNothing`. The job is registered with the `.dontRetry` strategy.
func registerCleanupJob() {
    let definition = JobDefinition(
        name: self.cleanupJob,
        parameters: JobCleanupParameters.self,
        retryStrategy: .dontRetry
    ) { params, _ in
        try await self.cleanup(
            failedJobs: params.failedJobs,
            processingJobs: .doNothing,
            pendingJobs: .doNothing,
            completedJobs: params.completedJobs,
            cancelledJobs: params.cancelledJobs,
            logger: self.logger
        )
    }
    self.registerJob(definition)
}

/// Cleanup job queues
///
/// This function is used to re-run or delete jobs in a certain state. Failed jobs can be
/// pushed back into the pending queue to be re-run or removed. When called at startup, in
/// theory no job should be set to processing, or set to pending but not in the queue. But if
/// your job server crashes these states are possible, so we also provide options to re-queue
/// these jobs so they are run again.
///
/// The job queue needs to be running when you call cleanup. You can call `cleanup` with
/// `failedJobs` set to whatever you like at any point to re-queue failed jobs. Moving processing
/// or pending jobs should only be done if you are certain there is nothing else processing
/// the job queue.
///
/// - Parameters:
///   - failedJobs: What to do with jobs tagged as failed
///   - processingJobs: What to do with jobs tagged as processing
///   - pendingJobs: What to do with jobs tagged as pending
///   - completedJobs: What to do with jobs tagged as completed
///   - cancelledJobs: What to do with jobs tagged as cancelled
///   - logger: Logger used for the transaction and for messages logged by this function.
///     Defaults to the queue's logger when `nil`.
/// - Throws: Any error raised while waiting for migrations or while running the cleanup
///   transaction. Errors of type `PSQLError` are logged before being rethrown.
public func cleanup(
    failedJobs: JobCleanup = .doNothing,
    processingJobs: JobCleanup = .doNothing,
    pendingJobs: JobCleanup = .doNothing,
    completedJobs: JobCleanup = .doNothing,
    cancelledJobs: JobCleanup = .doNothing,
    logger: Logger? = nil
) async throws {
    // Fall back to the queue's logger when the caller did not supply one.
    let logger = logger ?? self.logger
    do {
        // wait for migrations to complete before running job queue cleanup
        try await self.migrations.waitUntilCompleted()
        // All status updates are issued inside a single `withTransaction` call.
        _ = try await self.client.withTransaction(logger: logger) { connection in
            // Use the resolved logger so a caller-supplied logger receives this message too.
            logger.info("Cleanup Jobs")
            try await self.updateJobsOnInit(withStatus: .pending, onInit: pendingJobs, connection: connection)
            try await self.updateJobsOnInit(withStatus: .processing, onInit: processingJobs, connection: connection)
            try await self.updateJobsOnInit(withStatus: .failed, onInit: failedJobs, connection: connection)
            try await self.updateJobsOnInit(withStatus: .completed, onInit: completedJobs, connection: connection)
            try await self.updateJobsOnInit(withStatus: .cancelled, onInit: cancelledJobs, connection: connection)
        }
    } catch let error as PSQLError {
        // NOTE(review): coverage tooling reported this error path as not covered by tests.
        logger.error(
            "JobQueue cleanup failed",
            metadata: [
                "Error": "\(String(reflecting: error))"
            ]
        )
        throw error
    }
}

/// Apply a cleanup action to every job in this queue that has the given status.
///
/// - `.remove`: deletes jobs whose `last_modified` is earlier than a cutoff date. With a
///   `maxAge` the cutoff is "now minus `maxAge`"; without one it is `Date.distantFuture`.
/// - `.rerun`: fetches the ids of the matching jobs and adds each one to the job queue.
/// - `.doNothing`: leaves the jobs untouched.
///
/// - Parameters:
///   - status: Status of the jobs to act on
///   - onInit: Cleanup action to apply
///   - connection: Connection the queries are run on
/// - Throws: Any error thrown by the underlying queries.
func updateJobsOnInit(withStatus status: Status, onInit: JobCleanup, connection: PostgresConnection) async throws {
    switch onInit.rawValue {
    case .remove(let maxAge):
        let cutoff: Date
        if let maxAge {
            // Include the sub-second part of the duration. Using only `components.seconds`
            // would truncate e.g. 500ms to 0 and delete jobs younger than requested.
            let (seconds, attoseconds) = maxAge.components
            cutoff = Date.now - (Double(seconds) + Double(attoseconds) / 1e18)
        } else {
            cutoff = .distantFuture
        }
        try await connection.query(
            """
            DELETE FROM swift_jobs.jobs
            WHERE status = \(status) AND queue_name = \(configuration.queueName)
            AND last_modified < \(cutoff)
            """,
            logger: self.logger
        )

    case .rerun:
        let jobs = try await getJobs(withStatus: status, connection: connection)
        self.logger.info("Moving \(jobs.count) jobs with status: \(status) to job queue")
        for jobID in jobs {
            try await self.addToQueue(jobID: jobID, queueName: configuration.queueName, options: .init(), connection: connection)
        }

    case .doNothing:
        break
    }
}
}
128 changes: 43 additions & 85 deletions Sources/JobsPostgres/PostgresJobsQueue.swift
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,6 @@
public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, ResumableJobQueue {

public typealias JobID = UUID
/// what to do with failed/processing jobs from last time queue was handled
public enum JobCleanup: Sendable {
case doNothing
case rerun
case remove
}

/// Job priority from lowest to highest
public struct JobPriority: Equatable, Sendable {
Expand Down Expand Up @@ -124,6 +118,7 @@
case failed = 2
case cancelled = 3
case paused = 4
case completed = 5
}

/// Queue configuration
Expand All @@ -132,17 +127,21 @@
let pollTime: Duration
/// Which Queue to push jobs into
let queueName: String
/// Retention policy for jobs
let retentionPolicy: RetentionPolicy

/// Initialize configuration
/// - Parameters:
/// - pollTime: Queue poll time to wait if queue empties
/// - queueName: Name of queue we are handling
/// - retentionPolicy: Retention policy for jobs
public init(
pollTime: Duration = .milliseconds(100),
queueName: String = "default"
queueName: String = "default",
retentionPolicy: RetentionPolicy = .init()
) {
self.pollTime = pollTime
self.queueName = queueName
self.retentionPolicy = retentionPolicy
}
}

Expand All @@ -152,7 +151,6 @@
public let configuration: Configuration
/// Logger used by queue
public let logger: Logger

let migrations: DatabaseMigrations
let isStopped: NIOLockedValueBox<Bool>

Expand All @@ -165,6 +163,7 @@
self.isStopped = .init(false)
self.migrations = migrations
await migrations.add(CreateSwiftJobsMigrations(), skipDuplicates: true)
self.registerCleanupJob()
}

public func onInit() async throws {
Expand All @@ -184,7 +183,11 @@
public func cancel(jobID: JobID) async throws {
try await self.client.withTransaction(logger: logger) { connection in
try await deleteFromQueue(jobID: jobID, connection: connection)
try await delete(jobID: jobID)
if configuration.retentionPolicy.cancelled == .doNotRetain {
try await delete(jobID: jobID)
} else {
try await setStatus(jobID: jobID, status: .cancelled, connection: connection)
}
}
}

Expand Down Expand Up @@ -223,53 +226,6 @@
}
}

/// Cleanup job queues
///
/// This function is used to re-run or delete jobs in a certain state. Failed jobs can be
/// pushed back into the pending queue to be re-run or removed. When called at startup in
/// theory no job should be set to processing, or set to pending but not in the queue. but if
/// your job server crashes these states are possible, so we also provide options to re-queue
/// these jobs so they are run again.
///
/// The job queue needs to be running when you call cleanup. You can call `cleanup` with
/// `failedJobs`` set to whatever you like at any point to re-queue failed jobs. Moving processing
/// or pending jobs should only be done if you are certain there is nothing else processing
/// the job queue.
///
/// - Parameters:
/// - failedJobs: What to do with jobs tagged as failed
/// - processingJobs: What to do with jobs tagged as processing
/// - pendingJobs: What to do with jobs tagged as pending
/// - Throws:
public func cleanup(
failedJobs: JobCleanup = .doNothing,
processingJobs: JobCleanup = .doNothing,
pendingJobs: JobCleanup = .doNothing
) async throws {
do {
/// wait for migrations to complete before running job queue cleanup
try await self.migrations.waitUntilCompleted()
_ = try await self.client.withConnection { connection in
self.logger.info("Update Jobs")
try await self.updateJobsOnInit(withStatus: .pending, onInit: pendingJobs, connection: connection)
try await self.updateJobsOnInit(
withStatus: .processing,
onInit: processingJobs,
connection: connection
)
try await self.updateJobsOnInit(withStatus: .failed, onInit: failedJobs, connection: connection)
}
} catch let error as PSQLError {
logger.error(
"JobQueue initialization failed",
metadata: [
"Error": "\(String(reflecting: error))"
]
)
throw error
}
}

/// Register job
/// - Parameters:
/// - job: Job Definition
Expand All @@ -290,15 +246,15 @@

/// Retry an existing Job
/// - Parameters
/// - id: Job instance ID
/// - jobID: Job instance ID
/// - jobRequest: Job Request
/// - options: Job retry options
public func retry<Parameters>(_ id: JobID, jobRequest: JobRequest<Parameters>, options: JobRetryOptions) async throws {
public func retry<Parameters>(_ jobID: JobID, jobRequest: JobRequest<Parameters>, options: JobRetryOptions) async throws {
let buffer = try self.jobRegistry.encode(jobRequest: jobRequest)
try await self.client.withTransaction(logger: self.logger) { connection in
try await self.updateJob(id: id, buffer: buffer, connection: connection)
try await self.updateJob(jobID: jobID, buffer: buffer, connection: connection)
try await self.addToQueue(
jobID: id,
jobID: jobID,
queueName: configuration.queueName,
options: .init(delayUntil: options.delayUntil),
connection: connection
Expand All @@ -308,12 +264,20 @@

/// This is called to say job has finished processing and it can be deleted
public func finished(jobID: JobID) async throws {
try await self.delete(jobID: jobID)
if configuration.retentionPolicy.completed == .doNotRetain {
try await self.delete(jobID: jobID)
} else {
try await self.setStatus(jobID: jobID, status: .completed)
}
}

/// This is called to say job has failed to run and should be put aside
public func failed(jobID: JobID, error: Error) async throws {
try await self.setStatus(jobID: jobID, status: .failed)
if configuration.retentionPolicy.failed == .doNotRetain {
try await self.delete(jobID: jobID)

Check warning on line 277 in Sources/JobsPostgres/PostgresJobsQueue.swift

View check run for this annotation

Codecov / codecov/patch

Sources/JobsPostgres/PostgresJobsQueue.swift#L277

Added line #L277 was not covered by tests
} else {
try await self.setStatus(jobID: jobID, status: .failed)
}
}

/// stop serving jobs
Expand Down Expand Up @@ -451,15 +415,15 @@
logger: self.logger
)
}
// TODO: maybe add a new column colum for attempt so far after PR https://github.com/hummingbird-project/swift-jobs/pull/63 is merged?
func updateJob(id: JobID, buffer: ByteBuffer, connection: PostgresConnection) async throws {

func updateJob(jobID: JobID, buffer: ByteBuffer, connection: PostgresConnection) async throws {
try await connection.query(
"""
UPDATE swift_jobs.jobs
SET job = \(buffer),
last_modified = \(Date.now),
status = \(Status.failed)
WHERE id = \(id) AND queue_name = \(configuration.queueName)
WHERE id = \(jobID) AND queue_name = \(configuration.queueName)
""",
logger: self.logger
)
Expand Down Expand Up @@ -538,27 +502,21 @@
return jobs
}

func updateJobsOnInit(withStatus status: Status, onInit: JobCleanup, connection: PostgresConnection) async throws {
switch onInit {
case .remove:
try await connection.query(
"""
DELETE FROM swift_jobs.jobs
WHERE status = \(status) AND queue_name = \(configuration.queueName)
""",
logger: self.logger
)

case .rerun:
let jobs = try await getJobs(withStatus: status)
self.logger.info("Moving \(jobs.count) jobs with status: \(status) to job queue")
for jobID in jobs {
try await self.addToQueue(jobID: jobID, queueName: configuration.queueName, options: .init(), connection: connection)
}

case .doNothing:
break
/// Fetch the ids of all jobs in this queue that have the given status.
/// - Parameters:
///   - status: Status the returned jobs must have
///   - connection: Connection the query is run on
/// - Returns: Ids of the matching jobs, in the order the rows are received
/// - Throws: Any error thrown by the query or while decoding a row.
func getJobs(withStatus status: Status, connection: PostgresConnection) async throws -> [JobID] {
    let rows = try await connection.query(
        """
        SELECT
        id
        FROM swift_jobs.jobs
        WHERE status = \(status) AND queue_name = \(configuration.queueName)
        """,
        logger: self.logger
    )
    // Collect every decoded id into an array as the rows arrive.
    return try await rows.decode(JobID.self, context: .default).reduce(into: [JobID]()) { collected, jobID in
        collected.append(jobID)
    }
}

let jobRegistry: JobRegistry
Expand Down
Loading
Loading