Skip to content

Commit e6386ba

Browse files
committed
Decode job from Postgres JSON
1 parent c116ab7 commit e6386ba

File tree

3 files changed

+109
-18
lines changed

3 files changed

+109
-18
lines changed
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the Hummingbird server framework project
4+
//
5+
// Copyright (c) 2024-2025 the Hummingbird authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
import Jobs
16+
import PostgresNIO
17+
18+
extension AnyDecodableJob: @retroactive PostgresDecodable {
19+
public static var psqlType: PostgresDataType {
20+
.bytea
21+
}
22+
public static var psqlFormat: PostgresFormat {
23+
.binary
24+
}
25+
26+
public init<JSONDecoder: PostgresJSONDecoder>(
27+
from buffer: inout ByteBuffer,
28+
type: PostgresDataType,
29+
format: PostgresFormat,
30+
context: PostgresDecodingContext<JSONDecoder>
31+
) throws {
32+
switch (format, type) {
33+
case (.binary, .bytea):
34+
self = try context.jsonDecoder.decode(Self.self, from: buffer)
35+
default:
36+
throw PostgresDecodingError.Code.typeMismatch
37+
}
38+
}
39+
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the Hummingbird server framework project
4+
//
5+
// Copyright (c) 2024-2025 the Hummingbird authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
import Jobs
16+
import PostgresNIO
17+
18+
extension JobRequest: @retroactive PostgresEncodable where Parameters: Encodable {
19+
public static var psqlType: PostgresDataType {
20+
.bytea
21+
}
22+
public static var psqlFormat: PostgresFormat {
23+
.binary
24+
}
25+
26+
@inlinable
27+
public func encode<JSONEncoder: PostgresJSONEncoder>(
28+
into byteBuffer: inout ByteBuffer,
29+
context: PostgresEncodingContext<JSONEncoder>
30+
) throws {
31+
try context.jsonEncoder.encode(self, into: &byteBuffer)
32+
}
33+
}

Sources/JobsPostgres/PostgresJobsQueue.swift

Lines changed: 37 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
//
33
// This source file is part of the Hummingbird server framework project
44
//
5-
// Copyright (c) 2024 the Hummingbird authors
5+
// Copyright (c) 2024-2025 the Hummingbird authors
66
// Licensed under Apache License v2.0
77
//
88
// See LICENSE.txt for license information
@@ -173,10 +173,10 @@ public final class PostgresJobQueue: JobQueueDriver {
173173
/// Push Job onto queue
174174
/// - Returns: Identifier of queued job
175175
@discardableResult public func push<Parameters>(_ jobRequest: JobRequest<Parameters>, options: JobOptions) async throws -> JobID {
176-
let buffer = try self.jobRegistry.encode(jobRequest: jobRequest)
176+
//.let buffer = try self.jobRegistry.encode(jobRequest: jobRequest)
177177
let jobID = JobID()
178178
try await self.client.withTransaction(logger: self.logger) { connection in
179-
try await self.add(jobID: jobID, jobBuffer: buffer, connection: connection)
179+
try await self.add(jobID: jobID, jobRequest: jobRequest, connection: connection)
180180
try await self.addToQueue(jobID: jobID, connection: connection, delayUntil: options.delayUntil)
181181
}
182182
return jobID
@@ -233,10 +233,15 @@ public final class PostgresJobQueue: JobQueueDriver {
233233
}
234234

235235
func popFirst() async throws -> JobQueueResult<JobID>? {
236+
enum PopFirstResult {
237+
case nothing
238+
case result(Result<PostgresRow, Error>, jobID: JobID)
239+
}
236240
do {
237241
// The withTransaction closure returns a Result<(ByteBuffer, JobID)?, Error> because
238242
// we want to be able to exit the closure without cancelling the transaction
239-
let result = try await self.client.withTransaction(logger: self.logger) { connection -> Result<(ByteBuffer, JobID)?, Error> in
243+
let popFirstResult = try await self.client.withTransaction(logger: self.logger) {
244+
connection -> PopFirstResult in
240245
try Task.checkCancellation()
241246

242247
let stream = try await connection.query(
@@ -259,7 +264,7 @@ public final class PostgresJobQueue: JobQueueDriver {
259264
)
260265
// return nil if nothing in queue
261266
guard let jobID = try await stream.decode(UUID.self, context: .default).first(where: { _ in true }) else {
262-
return Result.success(nil)
267+
return .nothing
263268
}
264269
// set job status to processing
265270
try await self.setStatus(jobID: jobID, status: .processing, connection: connection)
@@ -269,25 +274,30 @@ public final class PostgresJobQueue: JobQueueDriver {
269274
"SELECT job FROM _hb_pg_jobs WHERE id = \(jobID)",
270275
logger: self.logger
271276
)
272-
guard let buffer = try await stream2.decode(ByteBuffer.self, context: .default).first(where: { _ in true }) else {
277+
guard let row = try await stream2.first(where: { _ in true }) else {
273278
logger.error(
274279
"Failed to find job with id",
275280
metadata: [
276281
"JobID": "\(jobID)"
277282
]
278283
)
279-
// if failed to find the job in the job table return nil
280-
return .success(nil)
284+
// if failed to find the job in the job table return error
285+
return .result(.failure(JobQueueError(code: .unrecognisedJobId, jobName: nil)), jobID: jobID)
281286
}
282-
return .success((buffer, jobID))
283-
287+
return .result(.success(row), jobID: jobID)
284288
}
285-
guard let (buffer, jobID) = try result.get() else { return nil }
286-
do {
287-
let jobInstance = try self.jobRegistry.decode(buffer)
288-
return JobQueueResult(id: jobID, result: .success(jobInstance))
289-
} catch let error as JobQueueError {
290-
return JobQueueResult(id: jobID, result: .failure(error))
289+
290+
switch popFirstResult {
291+
case .nothing:
292+
return nil
293+
case .result(let result, let jobID):
294+
do {
295+
let row = try result.get()
296+
let jobInstance = try row.decode(AnyDecodableJob.self, context: .withJobRegistry(self.jobRegistry)).job
297+
return JobQueueResult(id: jobID, result: .success(jobInstance))
298+
} catch let error as JobQueueError {
299+
return JobQueueResult(id: jobID, result: .failure(error))
300+
}
291301
}
292302
} catch let error as PSQLError {
293303
logger.error(
@@ -308,11 +318,11 @@ public final class PostgresJobQueue: JobQueueDriver {
308318
}
309319
}
310320

311-
func add(jobID: JobID, jobBuffer: ByteBuffer, connection: PostgresConnection) async throws {
321+
func add<Parameters>(jobID: JobID, jobRequest: JobRequest<Parameters>, connection: PostgresConnection) async throws {
312322
try await connection.query(
313323
"""
314324
INSERT INTO _hb_pg_jobs (id, job, status)
315-
VALUES (\(jobID), \(jobBuffer), \(Status.pending))
325+
VALUES (\(jobID), \(jobRequest), \(Status.pending))
316326
""",
317327
logger: self.logger
318328
)
@@ -443,3 +453,12 @@ extension JobQueueDriver where Self == PostgresJobQueue {
443453
await Self(client: client, migrations: migrations, configuration: configuration, logger: logger)
444454
}
445455
}
456+
457+
extension PostgresDecodingContext where JSONDecoder == Foundation.JSONDecoder {
458+
/// A ``PostgresDecodingContext`` that uses a Foundation `JSONDecoder` with job registry attached as userInfo.
459+
public static func withJobRegistry(_ jobRegistry: JobRegistry) -> PostgresDecodingContext {
460+
let jsonDecoder = JSONDecoder()
461+
jsonDecoder.userInfo[.configuration] = jobRegistry
462+
return PostgresDecodingContext(jsonDecoder: jsonDecoder)
463+
}
464+
}

0 commit comments

Comments
 (0)