diff --git a/Sources/JobsPostgres/PostgresJobsQueue.swift b/Sources/JobsPostgres/PostgresJobsQueue.swift index d695035..524c3e4 100644 --- a/Sources/JobsPostgres/PostgresJobsQueue.swift +++ b/Sources/JobsPostgres/PostgresJobsQueue.swift @@ -99,6 +99,7 @@ public final class PostgresJobQueue: JobQueueDriver { public init(client: PostgresClient, migrations: DatabaseMigrations, configuration: Configuration = .init(), logger: Logger) async { self.client = client self.configuration = configuration + self.jobRegistry = .init() self.logger = logger self.isStopped = .init(false) self.migrations = migrations @@ -162,35 +163,43 @@ public final class PostgresJobQueue: JobQueueDriver { } } + /// Register job + /// - Parameters: + /// - job: Job Definition + public func registerJob(_ job: JobDefinition) { + self.jobRegistry.registerJob(job) + } + /// Push Job onto queue /// - Returns: Identifier of queued job - @discardableResult public func push(_ buffer: ByteBuffer, options: JobOptions) async throws -> JobID { + @discardableResult public func push(_ jobRequest: JobRequest, options: JobOptions) async throws -> JobID { + let buffer = try self.jobRegistry.encode(jobRequest: jobRequest) + let jobID = JobID() try await self.client.withTransaction(logger: self.logger) { connection in - let queuedJob = QueuedJob(id: .init(), jobBuffer: buffer) - try await self.add(queuedJob, connection: connection) - try await self.addToQueue(jobId: queuedJob.id, connection: connection, delayUntil: options.delayUntil) - return queuedJob.id + try await self.add(jobID: jobID, jobBuffer: buffer, connection: connection) + try await self.addToQueue(jobID: jobID, connection: connection, delayUntil: options.delayUntil) } + return jobID } /// Retry a job /// - Returns: Bool - @discardableResult public func retry(_ id: JobID, buffer: ByteBuffer, options: JobOptions) async throws -> Bool { + public func retry(_ id: JobID, jobRequest: JobRequest, options: JobOptions) async throws { + let buffer = try self.jobRegistry.encode(jobRequest: jobRequest) try await self.client.withTransaction(logger: self.logger) { connection in try await self.updateJob(id: id, buffer: buffer, connection: connection) - try await self.addToQueue(jobId: id, connection: connection, delayUntil: options.delayUntil) + try await self.addToQueue(jobID: id, connection: connection, delayUntil: options.delayUntil) } - return true } /// This is called to say job has finished processing and it can be deleted - public func finished(jobId: JobID) async throws { - try await self.delete(jobId: jobId) + public func finished(jobID: JobID) async throws { + try await self.delete(jobID: jobID) } /// This is called to say job has failed to run and should be put aside - public func failed(jobId: JobID, error: Error) async throws { - try await self.setStatus(jobId: jobId, status: .failed) + public func failed(jobID: JobID, error: Error) async throws { + try await self.setStatus(jobID: jobID, status: .failed) } /// stop serving jobs @@ -223,60 +232,63 @@ public final class PostgresJobQueue: JobQueueDriver { ) } - func popFirst() async throws -> QueuedJob? { + func popFirst() async throws -> JobQueueResult? { do { - let result = try await self.client.withTransaction(logger: self.logger) { connection -> Result?, Error> in - while true { - try Task.checkCancellation() - - let stream = try await connection.query( - """ - WITH next_job AS ( - SELECT - job_id - FROM _hb_pg_job_queue - WHERE delayed_until <= NOW() - ORDER BY createdAt, delayed_until ASC - FOR UPDATE SKIP LOCKED - LIMIT 1 - ) - DELETE FROM - _hb_pg_job_queue - WHERE job_id = (SELECT job_id FROM next_job) - RETURNING job_id - """, - logger: self.logger - ) - // return nil if nothing in queue - guard let jobId = try await stream.decode(UUID.self, context: .default).first(where: { _ in true }) else { - return Result.success(nil) - } - // select job from job table - let stream2 = try await connection.query( - "SELECT job FROM _hb_pg_jobs WHERE id = \(jobId)", - logger: self.logger + // The withTransaction closure returns a Result<(ByteBuffer, JobID)?, Error> because + // we want to be able to exit the closure without cancelling the transaction + let result = try await self.client.withTransaction(logger: self.logger) { connection -> Result<(ByteBuffer, JobID)?, Error> in + try Task.checkCancellation() + + let stream = try await connection.query( + """ + WITH next_job AS ( + SELECT + job_id + FROM _hb_pg_job_queue + WHERE delayed_until <= NOW() + ORDER BY createdAt, delayed_until ASC + FOR UPDATE SKIP LOCKED + LIMIT 1 ) + DELETE FROM + _hb_pg_job_queue + WHERE job_id = (SELECT job_id FROM next_job) + RETURNING job_id + """, + logger: self.logger + ) + // return nil if nothing in queue + guard let jobID = try await stream.decode(UUID.self, context: .default).first(where: { _ in true }) else { + return Result.success(nil) + } + // set job status to processing + try await self.setStatus(jobID: jobID, status: .processing, connection: connection) - do { - try await self.setStatus(jobId: jobId, status: .processing, connection: connection) - // if failed to find a job in the job table try getting another index - guard let buffer = try await stream2.decode(ByteBuffer.self, context: .default).first(where: { _ in true }) else { - continue - } - return Result.success(QueuedJob(id: jobId, jobBuffer: buffer)) - } catch { - try await self.setStatus(jobId: jobId, status: .failed, connection: connection) - return Result.failure( - JobQueueError( - code: .decodeJobFailed, - jobName: nil, - details: "\(String(reflecting: error))" - ) - ) - } + // select job from job table + let stream2 = try await connection.query( + "SELECT job FROM _hb_pg_jobs WHERE id = \(jobID)", + logger: self.logger + ) + guard let buffer = try await stream2.decode(ByteBuffer.self, context: .default).first(where: { _ in true }) else { + logger.error( + "Failed to find job with id", + metadata: [ + "JobID": "\(jobID)" + ] + ) + // if failed to find the job in the job table return nil + return .success(nil) } + return .success((buffer, jobID)) + + } + guard let (buffer, jobID) = try result.get() else { return nil } + do { + let jobInstance = try self.jobRegistry.decode(buffer) + return JobQueueResult(id: jobID, result: .success(jobInstance)) + } catch let error as JobQueueError { + return JobQueueResult(id: jobID, result: .failure(error)) } - return try result.get() } catch let error as PSQLError { logger.error( "Failed to get job from queue", @@ -296,11 +308,11 @@ public final class PostgresJobQueue: JobQueueDriver { } } - func add(_ job: QueuedJob, connection: PostgresConnection) async throws { + func add(jobID: JobID, jobBuffer: ByteBuffer, connection: PostgresConnection) async throws { try await connection.query( """ INSERT INTO _hb_pg_jobs (id, job, status) - VALUES (\(job.id), \(job.jobBuffer), \(Status.pending)) + VALUES (\(jobID), \(jobBuffer), \(Status.pending)) """, logger: self.logger ) @@ -319,18 +331,19 @@ public final class PostgresJobQueue: JobQueueDriver { ) } - func delete(jobId: JobID) async throws { + func delete(jobID: JobID) async throws { try await self.client.query( - "DELETE FROM _hb_pg_jobs WHERE id = \(jobId)", + "DELETE FROM _hb_pg_jobs WHERE id = \(jobID)", logger: self.logger ) } - func addToQueue(jobId: JobID, connection: PostgresConnection, delayUntil: Date) async throws { + func addToQueue(jobID: JobID, connection: PostgresConnection, delayUntil: Date) async throws { + // TODO: assign Date.now in swift-jobs options? try await connection.query( """ INSERT INTO _hb_pg_job_queue (job_id, createdAt, delayed_until) - VALUES (\(jobId), \(Date.now), \(delayUntil)) + VALUES (\(jobID), \(Date.now), \(delayUntil)) -- We have found an existing job with the same id, SKIP this INSERT ON CONFLICT (job_id) DO NOTHING """, @@ -338,16 +351,16 @@ public final class PostgresJobQueue: JobQueueDriver { ) } - func setStatus(jobId: JobID, status: Status, connection: PostgresConnection) async throws { + func setStatus(jobID: JobID, status: Status, connection: PostgresConnection) async throws { try await connection.query( - "UPDATE _hb_pg_jobs SET status = \(status), lastModified = \(Date.now) WHERE id = \(jobId)", + "UPDATE _hb_pg_jobs SET status = \(status), lastModified = \(Date.now) WHERE id = \(jobID)", logger: self.logger ) } - func setStatus(jobId: JobID, status: Status) async throws { + func setStatus(jobID: JobID, status: Status) async throws { try await self.client.query( - "UPDATE _hb_pg_jobs SET status = \(status), lastModified = \(Date.now) WHERE id = \(jobId)", + "UPDATE _hb_pg_jobs SET status = \(status), lastModified = \(Date.now) WHERE id = \(jobID)", logger: self.logger ) } @@ -375,20 +388,22 @@ public final class PostgresJobQueue: JobQueueDriver { case .rerun: let jobs = try await getJobs(withStatus: status) self.logger.info("Moving \(jobs.count) jobs with status: \(status) to job queue") - for jobId in jobs { - try await self.addToQueue(jobId: jobId, connection: connection, delayUntil: Date.now) + for jobID in jobs { + try await self.addToQueue(jobID: jobID, connection: connection, delayUntil: Date.now) } case .doNothing: break } } + + let jobRegistry: JobRegistry } /// extend PostgresJobQueue to conform to AsyncSequence extension PostgresJobQueue { public struct AsyncIterator: AsyncIteratorProtocol { - public typealias Element = QueuedJob + public typealias Element = JobQueueResult let queue: PostgresJobQueue