diff --git a/Sources/JobsPostgres/AnyDecodableJob+postgres.swift b/Sources/JobsPostgres/AnyDecodableJob+postgres.swift new file mode 100644 index 0000000..b25e5f2 --- /dev/null +++ b/Sources/JobsPostgres/AnyDecodableJob+postgres.swift @@ -0,0 +1,45 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Hummingbird server framework project +// +// Copyright (c) 2024-2025 the Hummingbird authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import Jobs +import PostgresNIO + +extension AnyDecodableJob { + public static var psqlType: PostgresDataType { + .bytea + } + public static var psqlFormat: PostgresFormat { + .binary + } + + public init( + from buffer: inout ByteBuffer, + type: PostgresDataType, + format: PostgresFormat, + context: PostgresDecodingContext + ) throws { + switch (format, type) { + case (.binary, .bytea): + self = try context.jsonDecoder.decode(Self.self, from: buffer) + default: + throw PostgresDecodingError.Code.typeMismatch + } + } +} + +#if hasAttribute(retroactive) +extension AnyDecodableJob: @retroactive PostgresDecodable {} +#else +extension AnyDecodableJob: PostgresDecodable {} +#endif diff --git a/Sources/JobsPostgres/JobRequest+postgres.swift b/Sources/JobsPostgres/JobRequest+postgres.swift new file mode 100644 index 0000000..2b1a608 --- /dev/null +++ b/Sources/JobsPostgres/JobRequest+postgres.swift @@ -0,0 +1,39 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Hummingbird server framework project +// +// Copyright (c) 2024-2025 the Hummingbird authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import Jobs +import PostgresNIO + +extension JobRequest { + public static var psqlType: PostgresDataType { + .bytea + } + public static var psqlFormat: PostgresFormat { + .binary + } + + @inlinable + public func encode( + into byteBuffer: inout ByteBuffer, + context: PostgresEncodingContext + ) throws { + try context.jsonEncoder.encode(self, into: &byteBuffer) + } +} + +#if hasAttribute(retroactive) +extension JobRequest: @retroactive PostgresEncodable where Parameters: Encodable {} +#else +extension JobRequest: PostgresEncodable where Parameters: Encodable {} +#endif diff --git a/Sources/JobsPostgres/PostgresJobsQueue.swift b/Sources/JobsPostgres/PostgresJobsQueue.swift index 524c3e4..869f06a 100644 --- a/Sources/JobsPostgres/PostgresJobsQueue.swift +++ b/Sources/JobsPostgres/PostgresJobsQueue.swift @@ -2,7 +2,7 @@ // // This source file is part of the Hummingbird server framework project // -// Copyright (c) 2024 the Hummingbird authors +// Copyright (c) 2024-2025 the Hummingbird authors // Licensed under Apache License v2.0 // // See LICENSE.txt for license information @@ -173,10 +173,9 @@ public final class PostgresJobQueue: JobQueueDriver { /// Push Job onto queue /// - Returns: Identifier of queued job @discardableResult public func push(_ jobRequest: JobRequest, options: JobOptions) async throws -> JobID { - let buffer = try self.jobRegistry.encode(jobRequest: jobRequest) let jobID = JobID() try await self.client.withTransaction(logger: self.logger) { connection in - try await self.add(jobID: jobID, jobBuffer: buffer, connection: connection) + try await self.add(jobID: jobID, jobRequest: jobRequest, connection: connection) try await self.addToQueue(jobID: jobID, connection: connection, delayUntil: options.delayUntil) } return jobID @@ -233,10 +232,15 @@ public final class PostgresJobQueue: JobQueueDriver { } func popFirst() async throws -> JobQueueResult? { + enum PopFirstResult { + case nothing + case result(Result, jobID: JobID) + } do { // The withTransaction closure returns a Result<(ByteBuffer, JobID)?, Error> because // we want to be able to exit the closure without cancelling the transaction - let result = try await self.client.withTransaction(logger: self.logger) { connection -> Result<(ByteBuffer, JobID)?, Error> in + let popFirstResult = try await self.client.withTransaction(logger: self.logger) { + connection -> PopFirstResult in try Task.checkCancellation() let stream = try await connection.query( @@ -259,7 +263,7 @@ public final class PostgresJobQueue: JobQueueDriver { ) // return nil if nothing in queue guard let jobID = try await stream.decode(UUID.self, context: .default).first(where: { _ in true }) else { - return Result.success(nil) + return .nothing } // set job status to processing try await self.setStatus(jobID: jobID, status: .processing, connection: connection) @@ -269,28 +273,33 @@ public final class PostgresJobQueue: JobQueueDriver { "SELECT job FROM _hb_pg_jobs WHERE id = \(jobID)", logger: self.logger ) - guard let buffer = try await stream2.decode(ByteBuffer.self, context: .default).first(where: { _ in true }) else { - logger.error( + guard let row = try await stream2.first(where: { _ in true }) else { + logger.info( "Failed to find job with id", metadata: [ "JobID": "\(jobID)" ] ) - // if failed to find the job in the job table return nil - return .success(nil) + // if failed to find the job in the job table return error + return .result(.failure(JobQueueError(code: .unrecognisedJobId, jobName: nil)), jobID: jobID) } - return .success((buffer, jobID)) - + return .result(.success(row), jobID: jobID) } - guard let (buffer, jobID) = try result.get() else { return nil } - do { - let jobInstance = try self.jobRegistry.decode(buffer) - return JobQueueResult(id: jobID, result: .success(jobInstance)) - } catch let error as JobQueueError { - return JobQueueResult(id: jobID, result: .failure(error)) + + switch popFirstResult { + case .nothing: + return nil + case .result(let result, let jobID): + do { + let row = try result.get() + let jobInstance = try row.decode(AnyDecodableJob.self, context: .withJobRegistry(self.jobRegistry)).job + return JobQueueResult(id: jobID, result: .success(jobInstance)) + } catch let error as JobQueueError { + return JobQueueResult(id: jobID, result: .failure(error)) + } } } catch let error as PSQLError { - logger.error( + logger.info( "Failed to get job from queue", metadata: [ "error": "\(String(reflecting: error))" @@ -298,7 +307,7 @@ public final class PostgresJobQueue: JobQueueDriver { ) throw error } catch let error as JobQueueError { - logger.error( + logger.info( "Job failed", metadata: [ "error": "\(String(reflecting: error))" @@ -308,11 +317,11 @@ public final class PostgresJobQueue: JobQueueDriver { } } - func add(jobID: JobID, jobBuffer: ByteBuffer, connection: PostgresConnection) async throws { + func add(jobID: JobID, jobRequest: JobRequest, connection: PostgresConnection) async throws { try await connection.query( """ INSERT INTO _hb_pg_jobs (id, job, status) - VALUES (\(jobID), \(jobBuffer), \(Status.pending)) + VALUES (\(jobID), \(jobRequest), \(Status.pending)) """, logger: self.logger ) @@ -443,3 +452,12 @@ extension JobQueueDriver where Self == PostgresJobQueue { await Self(client: client, migrations: migrations, configuration: configuration, logger: logger) } } + +extension PostgresDecodingContext where JSONDecoder == Foundation.JSONDecoder { + /// A ``PostgresDecodingContext`` that uses a Foundation `JSONDecoder` with job registry attached as userInfo. + public static func withJobRegistry(_ jobRegistry: JobRegistry) -> PostgresDecodingContext { + let jsonDecoder = JSONDecoder() + jsonDecoder.userInfo[._jobConfiguration] = jobRegistry + return PostgresDecodingContext(jsonDecoder: jsonDecoder) + } +}