Skip to content

Commit 59f992e

Browse files
authored
Decode job straight from row (#18)
1 parent 6d74f49 commit 59f992e

File tree

3 files changed

+123
-21
lines changed

3 files changed

+123
-21
lines changed
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the Hummingbird server framework project
4+
//
5+
// Copyright (c) 2024-2025 the Hummingbird authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
import Jobs
16+
import PostgresNIO
17+
18+
extension AnyDecodableJob {
19+
public static var psqlType: PostgresDataType {
20+
.bytea
21+
}
22+
public static var psqlFormat: PostgresFormat {
23+
.binary
24+
}
25+
26+
public init<JSONDecoder: PostgresJSONDecoder>(
27+
from buffer: inout ByteBuffer,
28+
type: PostgresDataType,
29+
format: PostgresFormat,
30+
context: PostgresDecodingContext<JSONDecoder>
31+
) throws {
32+
switch (format, type) {
33+
case (.binary, .bytea):
34+
self = try context.jsonDecoder.decode(Self.self, from: buffer)
35+
default:
36+
throw PostgresDecodingError.Code.typeMismatch
37+
}
38+
}
39+
}
40+
41+
#if hasAttribute(retroactive)
42+
extension AnyDecodableJob: @retroactive PostgresDecodable {}
43+
#else
44+
extension AnyDecodableJob: PostgresDecodable {}
45+
#endif
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the Hummingbird server framework project
4+
//
5+
// Copyright (c) 2024-2025 the Hummingbird authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
import Jobs
16+
import PostgresNIO
17+
18+
extension JobRequest {
19+
public static var psqlType: PostgresDataType {
20+
.bytea
21+
}
22+
public static var psqlFormat: PostgresFormat {
23+
.binary
24+
}
25+
26+
@inlinable
27+
public func encode<JSONEncoder: PostgresJSONEncoder>(
28+
into byteBuffer: inout ByteBuffer,
29+
context: PostgresEncodingContext<JSONEncoder>
30+
) throws {
31+
try context.jsonEncoder.encode(self, into: &byteBuffer)
32+
}
33+
}
34+
35+
#if hasAttribute(retroactive)
36+
extension JobRequest: @retroactive PostgresEncodable where Parameters: Encodable {}
37+
#else
38+
extension JobRequest: PostgresEncodable where Parameters: Encodable {}
39+
#endif

Sources/JobsPostgres/PostgresJobsQueue.swift

Lines changed: 39 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
//
33
// This source file is part of the Hummingbird server framework project
44
//
5-
// Copyright (c) 2024 the Hummingbird authors
5+
// Copyright (c) 2024-2025 the Hummingbird authors
66
// Licensed under Apache License v2.0
77
//
88
// See LICENSE.txt for license information
@@ -173,10 +173,9 @@ public final class PostgresJobQueue: JobQueueDriver {
173173
/// Push Job onto queue
174174
/// - Returns: Identifier of queued job
175175
@discardableResult public func push<Parameters>(_ jobRequest: JobRequest<Parameters>, options: JobOptions) async throws -> JobID {
176-
let buffer = try self.jobRegistry.encode(jobRequest: jobRequest)
177176
let jobID = JobID()
178177
try await self.client.withTransaction(logger: self.logger) { connection in
179-
try await self.add(jobID: jobID, jobBuffer: buffer, connection: connection)
178+
try await self.add(jobID: jobID, jobRequest: jobRequest, connection: connection)
180179
try await self.addToQueue(jobID: jobID, connection: connection, delayUntil: options.delayUntil)
181180
}
182181
return jobID
@@ -233,10 +232,15 @@ public final class PostgresJobQueue: JobQueueDriver {
233232
}
234233

235234
func popFirst() async throws -> JobQueueResult<JobID>? {
235+
enum PopFirstResult {
236+
case nothing
237+
case result(Result<PostgresRow, Error>, jobID: JobID)
238+
}
236239
do {
237240
// The withTransaction closure returns a Result<(ByteBuffer, JobID)?, Error> because
238241
// we want to be able to exit the closure without cancelling the transaction
239-
let result = try await self.client.withTransaction(logger: self.logger) { connection -> Result<(ByteBuffer, JobID)?, Error> in
242+
let popFirstResult = try await self.client.withTransaction(logger: self.logger) {
243+
connection -> PopFirstResult in
240244
try Task.checkCancellation()
241245

242246
let stream = try await connection.query(
@@ -259,7 +263,7 @@ public final class PostgresJobQueue: JobQueueDriver {
259263
)
260264
// return nil if nothing in queue
261265
guard let jobID = try await stream.decode(UUID.self, context: .default).first(where: { _ in true }) else {
262-
return Result.success(nil)
266+
return .nothing
263267
}
264268
// set job status to processing
265269
try await self.setStatus(jobID: jobID, status: .processing, connection: connection)
@@ -269,36 +273,41 @@ public final class PostgresJobQueue: JobQueueDriver {
269273
"SELECT job FROM _hb_pg_jobs WHERE id = \(jobID)",
270274
logger: self.logger
271275
)
272-
guard let buffer = try await stream2.decode(ByteBuffer.self, context: .default).first(where: { _ in true }) else {
273-
logger.error(
276+
guard let row = try await stream2.first(where: { _ in true }) else {
277+
logger.info(
274278
"Failed to find job with id",
275279
metadata: [
276280
"JobID": "\(jobID)"
277281
]
278282
)
279-
// if failed to find the job in the job table return nil
280-
return .success(nil)
283+
// if failed to find the job in the job table return error
284+
return .result(.failure(JobQueueError(code: .unrecognisedJobId, jobName: nil)), jobID: jobID)
281285
}
282-
return .success((buffer, jobID))
283-
286+
return .result(.success(row), jobID: jobID)
284287
}
285-
guard let (buffer, jobID) = try result.get() else { return nil }
286-
do {
287-
let jobInstance = try self.jobRegistry.decode(buffer)
288-
return JobQueueResult(id: jobID, result: .success(jobInstance))
289-
} catch let error as JobQueueError {
290-
return JobQueueResult(id: jobID, result: .failure(error))
288+
289+
switch popFirstResult {
290+
case .nothing:
291+
return nil
292+
case .result(let result, let jobID):
293+
do {
294+
let row = try result.get()
295+
let jobInstance = try row.decode(AnyDecodableJob.self, context: .withJobRegistry(self.jobRegistry)).job
296+
return JobQueueResult(id: jobID, result: .success(jobInstance))
297+
} catch let error as JobQueueError {
298+
return JobQueueResult(id: jobID, result: .failure(error))
299+
}
291300
}
292301
} catch let error as PSQLError {
293-
logger.error(
302+
logger.info(
294303
"Failed to get job from queue",
295304
metadata: [
296305
"error": "\(String(reflecting: error))"
297306
]
298307
)
299308
throw error
300309
} catch let error as JobQueueError {
301-
logger.error(
310+
logger.info(
302311
"Job failed",
303312
metadata: [
304313
"error": "\(String(reflecting: error))"
@@ -308,11 +317,11 @@ public final class PostgresJobQueue: JobQueueDriver {
308317
}
309318
}
310319

311-
func add(jobID: JobID, jobBuffer: ByteBuffer, connection: PostgresConnection) async throws {
320+
func add<Parameters>(jobID: JobID, jobRequest: JobRequest<Parameters>, connection: PostgresConnection) async throws {
312321
try await connection.query(
313322
"""
314323
INSERT INTO _hb_pg_jobs (id, job, status)
315-
VALUES (\(jobID), \(jobBuffer), \(Status.pending))
324+
VALUES (\(jobID), \(jobRequest), \(Status.pending))
316325
""",
317326
logger: self.logger
318327
)
@@ -443,3 +452,12 @@ extension JobQueueDriver where Self == PostgresJobQueue {
443452
await Self(client: client, migrations: migrations, configuration: configuration, logger: logger)
444453
}
445454
}
455+
456+
extension PostgresDecodingContext where JSONDecoder == Foundation.JSONDecoder {
457+
/// A ``PostgresDecodingContext`` that uses a Foundation `JSONDecoder` with job registry attached as userInfo.
458+
public static func withJobRegistry(_ jobRegistry: JobRegistry) -> PostgresDecodingContext {
459+
let jsonDecoder = JSONDecoder()
460+
jsonDecoder.userInfo[._jobConfiguration] = jobRegistry
461+
return PostgresDecodingContext(jsonDecoder: jsonDecoder)
462+
}
463+
}

0 commit comments

Comments
 (0)