From f4c480e7ff28699bb72c3fe25416c06a314f00b3 Mon Sep 17 00:00:00 2001 From: Adam Fowler Date: Wed, 19 Feb 2025 07:56:20 +0000 Subject: [PATCH 1/6] Get working with changes in swift-jobs --- Package.swift | 2 +- Sources/JobsPostgres/PostgresJobsQueue.swift | 79 +++++++++++++------- 2 files changed, 51 insertions(+), 30 deletions(-) diff --git a/Package.swift b/Package.swift index 7f6a392..3e6fb2b 100644 --- a/Package.swift +++ b/Package.swift @@ -10,7 +10,7 @@ let package = Package( ], dependencies: [ // TODO: use a released version of swift-jobs - .package(url: "https://github.com/hummingbird-project/swift-jobs.git", branch: "main"), + .package(url: "https://github.com/hummingbird-project/swift-jobs.git", branch: "job-driver-parameters"), .package(url: "https://github.com/hummingbird-project/hummingbird-postgres.git", from: "0.5.0"), .package(url: "https://github.com/vapor/postgres-nio.git", from: "1.25.0"), ], diff --git a/Sources/JobsPostgres/PostgresJobsQueue.swift b/Sources/JobsPostgres/PostgresJobsQueue.swift index d695035..68a073a 100644 --- a/Sources/JobsPostgres/PostgresJobsQueue.swift +++ b/Sources/JobsPostgres/PostgresJobsQueue.swift @@ -99,6 +99,7 @@ public final class PostgresJobQueue: JobQueueDriver { public init(client: PostgresClient, migrations: DatabaseMigrations, configuration: Configuration = .init(), logger: Logger) async { self.client = client self.configuration = configuration + self.jobRegistry = .init() self.logger = logger self.isStopped = .init(false) self.migrations = migrations @@ -162,15 +163,23 @@ public final class PostgresJobQueue: JobQueueDriver { } } + /// Register job + /// - Parameters: + /// - job: Job Definition + public func registerJob(_ job: JobDefinition) { + self.jobRegistry.registerJob(job) + } + /// Push Job onto queue /// - Returns: Identifier of queued job - @discardableResult public func push(_ buffer: ByteBuffer, options: JobOptions) async throws -> JobID { + @discardableResult public func push(_ jobRequest: JobRequest, options: JobOptions) async throws -> JobID { + let buffer = try self.jobRegistry.encode(jobRequest: jobRequest) + let jobID = JobID() try await self.client.withTransaction(logger: self.logger) { connection in - let queuedJob = QueuedJob(id: .init(), jobBuffer: buffer) - try await self.add(queuedJob, connection: connection) - try await self.addToQueue(jobId: queuedJob.id, connection: connection, delayUntil: options.delayUntil) - return queuedJob.id + try await self.add(jobID: jobID, jobBuffer: buffer, connection: connection) + try await self.addToQueue(jobID: jobID, connection: connection, delayUntil: options.delayUntil) } + return jobID } /// Retry a job @@ -184,13 +193,13 @@ public final class PostgresJobQueue: JobQueueDriver { } /// This is called to say job has finished processing and it can be deleted - public func finished(jobId: JobID) async throws { - try await self.delete(jobId: jobId) + public func finished(jobID: JobID) async throws { + try await self.delete(jobID: jobID) } /// This is called to say job has failed to run and should be put aside - public func failed(jobId: JobID, error: Error) async throws { - try await self.setStatus(jobId: jobId, status: .failed) + public func failed(jobID: JobID, error: Error) async throws { + try await self.setStatus(jobID: jobID, status: .failed) } /// stop serving jobs @@ -223,9 +232,9 @@ public final class PostgresJobQueue: JobQueueDriver { ) } - func popFirst() async throws -> QueuedJob? { + func popFirst() async throws -> JobQueueResult? { do { - let result = try await self.client.withTransaction(logger: self.logger) { connection -> Result?, Error> in + let result = try await self.client.withTransaction(logger: self.logger) { connection -> Result?, Error> in while true { try Task.checkCancellation() @@ -248,24 +257,33 @@ public final class PostgresJobQueue: JobQueueDriver { logger: self.logger ) // return nil if nothing in queue - guard let jobId = try await stream.decode(UUID.self, context: .default).first(where: { _ in true }) else { + guard let jobID = try await stream.decode(UUID.self, context: .default).first(where: { _ in true }) else { return Result.success(nil) } // select job from job table let stream2 = try await connection.query( - "SELECT job FROM _hb_pg_jobs WHERE id = \(jobId)", + "SELECT job FROM _hb_pg_jobs WHERE id = \(jobID)", logger: self.logger ) do { - try await self.setStatus(jobId: jobId, status: .processing, connection: connection) + try await self.setStatus(jobID: jobID, status: .processing, connection: connection) // if failed to find a job in the job table try getting another index guard let buffer = try await stream2.decode(ByteBuffer.self, context: .default).first(where: { _ in true }) else { continue } - return Result.success(QueuedJob(id: jobId, jobBuffer: buffer)) + do { + let jobInstance = try self.jobRegistry.decode(buffer) + return Result.success(.init(id: jobID, result: .success(jobInstance))) + } catch let error as JobQueueError { + return Result.success(.init(id: jobID, result: .failure(error))) + } catch { + return Result.success( + .init(id: jobID, result: .failure(JobQueueError(code: .unrecognised, jobName: nil, details: "\(error)"))) + ) + } } catch { - try await self.setStatus(jobId: jobId, status: .failed, connection: connection) + try await self.setStatus(jobID: jobID, status: .failed, connection: connection) return Result.failure( JobQueueError( code: .decodeJobFailed, @@ -296,11 +314,11 @@ public final class PostgresJobQueue: JobQueueDriver { } } - func add(_ job: QueuedJob, connection: PostgresConnection) async throws { + func add(jobID: JobID, jobBuffer: ByteBuffer, connection: PostgresConnection) async throws { try await connection.query( """ INSERT INTO _hb_pg_jobs (id, job, status) - VALUES (\(job.id), \(job.jobBuffer), \(Status.pending)) + VALUES (\(jobID), \(jobBuffer), \(Status.pending)) """, logger: self.logger ) @@ -319,18 +337,19 @@ public final class PostgresJobQueue: JobQueueDriver { ) } - func delete(jobId: JobID) async throws { + func delete(jobID: JobID) async throws { try await self.client.query( - "DELETE FROM _hb_pg_jobs WHERE id = \(jobId)", + "DELETE FROM _hb_pg_jobs WHERE id = \(jobID)", logger: self.logger ) } - func addToQueue(jobId: JobID, connection: PostgresConnection, delayUntil: Date) async throws { + func addToQueue(jobID: JobID, connection: PostgresConnection, delayUntil: Date) async throws { + // TODO: assign Date.now in swift-jobs options? try await connection.query( """ INSERT INTO _hb_pg_job_queue (job_id, createdAt, delayed_until) - VALUES (\(jobId), \(Date.now), \(delayUntil)) + VALUES (\(jobID), \(Date.now), \(delayUntil)) -- We have found an existing job with the same id, SKIP this INSERT ON CONFLICT (job_id) DO NOTHING """, @@ -338,16 +357,16 @@ public final class PostgresJobQueue: JobQueueDriver { ) } - func setStatus(jobId: JobID, status: Status, connection: PostgresConnection) async throws { + func setStatus(jobID: JobID, status: Status, connection: PostgresConnection) async throws { try await connection.query( - "UPDATE _hb_pg_jobs SET status = \(status), lastModified = \(Date.now) WHERE id = \(jobId)", + "UPDATE _hb_pg_jobs SET status = \(status), lastModified = \(Date.now) WHERE id = \(jobID)", logger: self.logger ) } - func setStatus(jobId: JobID, status: Status) async throws { + func setStatus(jobID: JobID, status: Status) async throws { try await self.client.query( - "UPDATE _hb_pg_jobs SET status = \(status), lastModified = \(Date.now) WHERE id = \(jobId)", + "UPDATE _hb_pg_jobs SET status = \(status), lastModified = \(Date.now) WHERE id = \(jobID)", logger: self.logger ) } @@ -375,20 +394,22 @@ public final class PostgresJobQueue: JobQueueDriver { case .rerun: let jobs = try await getJobs(withStatus: status) self.logger.info("Moving \(jobs.count) jobs with status: \(status) to job queue") - for jobId in jobs { - try await self.addToQueue(jobId: jobId, connection: connection, delayUntil: Date.now) + for jobID in jobs { + try await self.addToQueue(jobID: jobID, connection: connection, delayUntil: Date.now) } case .doNothing: break } } + + let jobRegistry: JobRegistry } /// extend PostgresJobQueue to conform to AsyncSequence extension PostgresJobQueue { public struct AsyncIterator: AsyncIteratorProtocol { - public typealias Element = QueuedJob + public typealias Element = JobQueueResult let queue: PostgresJobQueue From 770e2e2f54f42a09208b4faf2f336edf56fcd99b Mon Sep 17 00:00:00 2001 From: Adam Fowler Date: Wed, 19 Feb 2025 14:28:04 +0000 Subject: [PATCH 2/6] Reorder decode and setStatus also print error on failing to find a job --- Sources/JobsPostgres/PostgresJobsQueue.swift | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/Sources/JobsPostgres/PostgresJobsQueue.swift b/Sources/JobsPostgres/PostgresJobsQueue.swift index 68a073a..3269019 100644 --- a/Sources/JobsPostgres/PostgresJobsQueue.swift +++ b/Sources/JobsPostgres/PostgresJobsQueue.swift @@ -267,11 +267,17 @@ public final class PostgresJobQueue: JobQueueDriver { ) do { - try await self.setStatus(jobID: jobID, status: .processing, connection: connection) - // if failed to find a job in the job table try getting another index guard let buffer = try await stream2.decode(ByteBuffer.self, context: .default).first(where: { _ in true }) else { + logger.error( + "Failed to job with id", + metadata: [ + "JobID": "\(jobID)" + ] + ) + // if failed to find a job in the job table try getting another index continue } + try await self.setStatus(jobID: jobID, status: .processing, connection: connection) do { let jobInstance = try self.jobRegistry.decode(buffer) return Result.success(.init(id: jobID, result: .success(jobInstance))) From fa61e36cc186b4368f21f0c9c20b84df21015476 Mon Sep 17 00:00:00 2001 From: Adam Fowler Date: Thu, 20 Feb 2025 11:59:06 +0000 Subject: [PATCH 3/6] Comment, don't return error for unrecognised decode error --- Sources/JobsPostgres/PostgresJobsQueue.swift | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/Sources/JobsPostgres/PostgresJobsQueue.swift b/Sources/JobsPostgres/PostgresJobsQueue.swift index 3269019..6731eaa 100644 --- a/Sources/JobsPostgres/PostgresJobsQueue.swift +++ b/Sources/JobsPostgres/PostgresJobsQueue.swift @@ -234,6 +234,8 @@ public final class PostgresJobQueue: JobQueueDriver { func popFirst() async throws -> JobQueueResult? { do { + // The withTransaction closure returns a Result?, Error> because + // we want to be able to exit the closure without cancelling the transaction let result = try await self.client.withTransaction(logger: self.logger) { connection -> Result?, Error> in while true { try Task.checkCancellation() @@ -269,7 +271,7 @@ public final class PostgresJobQueue: JobQueueDriver { do { guard let buffer = try await stream2.decode(ByteBuffer.self, context: .default).first(where: { _ in true }) else { logger.error( - "Failed to job with id", + "Failed to find job with id", metadata: [ "JobID": "\(jobID)" ] @@ -283,10 +285,6 @@ public final class PostgresJobQueue: JobQueueDriver { return Result.success(.init(id: jobID, result: .success(jobInstance))) } catch let error as JobQueueError { return Result.success(.init(id: jobID, result: .failure(error))) - } catch { - return Result.success( - .init(id: jobID, result: .failure(JobQueueError(code: .unrecognised, jobName: nil, details: "\(error)"))) - ) } } catch { try await self.setStatus(jobID: jobID, status: .failed, connection: connection) From 9c4ae5d7b4a5b3009116a62c9612138bd83c80c4 Mon Sep 17 00:00:00 2001 From: Adam Fowler Date: Thu, 20 Feb 2025 12:14:49 +0000 Subject: [PATCH 4/6] Separate out job decode from database transaction --- Sources/JobsPostgres/PostgresJobsQueue.swift | 110 +++++++++---------- 1 file changed, 50 insertions(+), 60 deletions(-) diff --git a/Sources/JobsPostgres/PostgresJobsQueue.swift b/Sources/JobsPostgres/PostgresJobsQueue.swift index 6731eaa..ede74e8 100644 --- a/Sources/JobsPostgres/PostgresJobsQueue.swift +++ b/Sources/JobsPostgres/PostgresJobsQueue.swift @@ -234,71 +234,61 @@ public final class PostgresJobQueue: JobQueueDriver { func popFirst() async throws -> JobQueueResult? { do { - // The withTransaction closure returns a Result?, Error> because + // The withTransaction closure returns a Result<(ByteBuffer, JobID)?, Error> because // we want to be able to exit the closure without cancelling the transaction - let result = try await self.client.withTransaction(logger: self.logger) { connection -> Result?, Error> in - while true { - try Task.checkCancellation() - - let stream = try await connection.query( - """ - WITH next_job AS ( - SELECT - job_id - FROM _hb_pg_job_queue - WHERE delayed_until <= NOW() - ORDER BY createdAt, delayed_until ASC - FOR UPDATE SKIP LOCKED - LIMIT 1 - ) - DELETE FROM - _hb_pg_job_queue - WHERE job_id = (SELECT job_id FROM next_job) - RETURNING job_id - """, - logger: self.logger - ) - // return nil if nothing in queue - guard let jobID = try await stream.decode(UUID.self, context: .default).first(where: { _ in true }) else { - return Result.success(nil) - } - // select job from job table - let stream2 = try await connection.query( - "SELECT job FROM _hb_pg_jobs WHERE id = \(jobID)", - logger: self.logger + let result = try await self.client.withTransaction(logger: self.logger) { connection -> Result<(ByteBuffer, JobID)?, Error> in + try Task.checkCancellation() + + let stream = try await connection.query( + """ + WITH next_job AS ( + SELECT + job_id + FROM _hb_pg_job_queue + WHERE delayed_until <= NOW() + ORDER BY createdAt, delayed_until ASC + FOR UPDATE SKIP LOCKED + LIMIT 1 ) + DELETE FROM + _hb_pg_job_queue + WHERE job_id = (SELECT job_id FROM next_job) + RETURNING job_id + """, + logger: self.logger + ) + // return nil if nothing in queue + guard let jobID = try await stream.decode(UUID.self, context: .default).first(where: { _ in true }) else { + return Result.success(nil) + } + // set job status to processing + try await self.setStatus(jobID: jobID, status: .processing, connection: connection) - do { - guard let buffer = try await stream2.decode(ByteBuffer.self, context: .default).first(where: { _ in true }) else { - logger.error( - "Failed to find job with id", - metadata: [ - "JobID": "\(jobID)" - ] - ) - // if failed to find a job in the job table try getting another index - continue - } - try await self.setStatus(jobID: jobID, status: .processing, connection: connection) - do { - let jobInstance = try self.jobRegistry.decode(buffer) - return Result.success(.init(id: jobID, result: .success(jobInstance))) - } catch let error as JobQueueError { - return Result.success(.init(id: jobID, result: .failure(error))) - } - } catch { - try await self.setStatus(jobID: jobID, status: .failed, connection: connection) - return Result.failure( - JobQueueError( - code: .decodeJobFailed, - jobName: nil, - details: "\(String(reflecting: error))" - ) - ) - } + // select job from job table + let stream2 = try await connection.query( + "SELECT job FROM _hb_pg_jobs WHERE id = \(jobID)", + logger: self.logger + ) + guard let buffer = try await stream2.decode(ByteBuffer.self, context: .default).first(where: { _ in true }) else { + logger.error( + "Failed to find job with id", + metadata: [ + "JobID": "\(jobID)" + ] + ) + // if failed to find the job in the job table return nil + return .success(nil) } + return .success((buffer, jobID)) + + } + guard let (buffer, jobID) = try result.get() else { return nil } + do { + let jobInstance = try self.jobRegistry.decode(buffer) + return JobQueueResult(id: jobID, result: .success(jobInstance)) + } catch let error as JobQueueError { + return JobQueueResult(id: jobID, result: .failure(error)) } - return try result.get() } catch let error as PSQLError { logger.error( "Failed to get job from queue", From c116ab724c373dbd24b1259418c4e613a9c28267 Mon Sep 17 00:00:00 2001 From: Adam Fowler Date: Fri, 21 Feb 2025 14:46:52 +0000 Subject: [PATCH 5/6] Update for retry --- Sources/JobsPostgres/PostgresJobsQueue.swift | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/Sources/JobsPostgres/PostgresJobsQueue.swift b/Sources/JobsPostgres/PostgresJobsQueue.swift index ede74e8..524c3e4 100644 --- a/Sources/JobsPostgres/PostgresJobsQueue.swift +++ b/Sources/JobsPostgres/PostgresJobsQueue.swift @@ -184,12 +184,12 @@ public final class PostgresJobQueue: JobQueueDriver { /// Retry a job /// - Returns: Bool - @discardableResult public func retry(_ id: JobID, buffer: ByteBuffer, options: JobOptions) async throws -> Bool { + public func retry(_ id: JobID, jobRequest: JobRequest, options: JobOptions) async throws { + let buffer = try self.jobRegistry.encode(jobRequest: jobRequest) try await self.client.withTransaction(logger: self.logger) { connection in try await self.updateJob(id: id, buffer: buffer, connection: connection) - try await self.addToQueue(jobId: id, connection: connection, delayUntil: options.delayUntil) + try await self.addToQueue(jobID: id, connection: connection, delayUntil: options.delayUntil) } - return true } /// This is called to say job has finished processing and it can be deleted From 033086994b4b67b42ce116dd8db0c23a95326d8f Mon Sep 17 00:00:00 2001 From: Adam Fowler Date: Sat, 22 Feb 2025 15:57:11 +0000 Subject: [PATCH 6/6] Use swift-jobs main --- Package.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Package.swift b/Package.swift index 3e6fb2b..7f6a392 100644 --- a/Package.swift +++ b/Package.swift @@ -10,7 +10,7 @@ let package = Package( ], dependencies: [ // TODO: use a released version of swift-jobs - .package(url: "https://github.com/hummingbird-project/swift-jobs.git", branch: "job-driver-parameters"), + .package(url: "https://github.com/hummingbird-project/swift-jobs.git", branch: "main"), .package(url: "https://github.com/hummingbird-project/hummingbird-postgres.git", from: "0.5.0"), .package(url: "https://github.com/vapor/postgres-nio.git", from: "1.25.0"), ],