Skip to content

Decode job straight from row #18

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Feb 22, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions Sources/JobsPostgres/AnyDecodableJob+postgres.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the Hummingbird server framework project
//
// Copyright (c) 2024-2025 the Hummingbird authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import Jobs
import PostgresNIO

extension AnyDecodableJob {
public static var psqlType: PostgresDataType {
.bytea
}
public static var psqlFormat: PostgresFormat {
.binary
}

Check warning on line 24 in Sources/JobsPostgres/AnyDecodableJob+postgres.swift

View check run for this annotation

Codecov / codecov/patch

Sources/JobsPostgres/AnyDecodableJob+postgres.swift#L19-L24

Added lines #L19 - L24 were not covered by tests

public init<JSONDecoder: PostgresJSONDecoder>(
from buffer: inout ByteBuffer,
type: PostgresDataType,
format: PostgresFormat,
context: PostgresDecodingContext<JSONDecoder>
) throws {
switch (format, type) {
case (.binary, .bytea):
self = try context.jsonDecoder.decode(Self.self, from: buffer)
default:
throw PostgresDecodingError.Code.typeMismatch

Check warning on line 36 in Sources/JobsPostgres/AnyDecodableJob+postgres.swift

View check run for this annotation

Codecov / codecov/patch

Sources/JobsPostgres/AnyDecodableJob+postgres.swift#L36

Added line #L36 was not covered by tests
}
}
}

#if hasAttribute(retroactive)
extension AnyDecodableJob: @retroactive PostgresDecodable {}
#else
extension AnyDecodableJob: PostgresDecodable {}
#endif
39 changes: 39 additions & 0 deletions Sources/JobsPostgres/JobRequest+postgres.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the Hummingbird server framework project
//
// Copyright (c) 2024-2025 the Hummingbird authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import Jobs
import PostgresNIO

extension JobRequest {
public static var psqlType: PostgresDataType {
.bytea
}
public static var psqlFormat: PostgresFormat {
.binary
}

@inlinable
public func encode<JSONEncoder: PostgresJSONEncoder>(
into byteBuffer: inout ByteBuffer,
context: PostgresEncodingContext<JSONEncoder>
) throws {
try context.jsonEncoder.encode(self, into: &byteBuffer)
}
}

#if hasAttribute(retroactive)
extension JobRequest: @retroactive PostgresEncodable where Parameters: Encodable {}
#else
extension JobRequest: PostgresEncodable where Parameters: Encodable {}
#endif
60 changes: 39 additions & 21 deletions Sources/JobsPostgres/PostgresJobsQueue.swift
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
//
// This source file is part of the Hummingbird server framework project
//
// Copyright (c) 2024 the Hummingbird authors
// Copyright (c) 2024-2025 the Hummingbird authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
Expand Down Expand Up @@ -173,10 +173,9 @@
/// Push Job onto queue
/// - Returns: Identifier of queued job
@discardableResult public func push<Parameters>(_ jobRequest: JobRequest<Parameters>, options: JobOptions) async throws -> JobID {
let buffer = try self.jobRegistry.encode(jobRequest: jobRequest)
let jobID = JobID()
try await self.client.withTransaction(logger: self.logger) { connection in
try await self.add(jobID: jobID, jobBuffer: buffer, connection: connection)
try await self.add(jobID: jobID, jobRequest: jobRequest, connection: connection)
try await self.addToQueue(jobID: jobID, connection: connection, delayUntil: options.delayUntil)
}
return jobID
Expand Down Expand Up @@ -233,10 +232,15 @@
}

func popFirst() async throws -> JobQueueResult<JobID>? {
enum PopFirstResult {
case nothing
case result(Result<PostgresRow, Error>, jobID: JobID)
}
do {
// The withTransaction closure returns a Result<(ByteBuffer, JobID)?, Error> because
// we want to be able to exit the closure without cancelling the transaction
let result = try await self.client.withTransaction(logger: self.logger) { connection -> Result<(ByteBuffer, JobID)?, Error> in
let popFirstResult = try await self.client.withTransaction(logger: self.logger) {
connection -> PopFirstResult in
try Task.checkCancellation()

let stream = try await connection.query(
Expand All @@ -259,7 +263,7 @@
)
// return nil if nothing in queue
guard let jobID = try await stream.decode(UUID.self, context: .default).first(where: { _ in true }) else {
return Result.success(nil)
return .nothing
}
// set job status to processing
try await self.setStatus(jobID: jobID, status: .processing, connection: connection)
Expand All @@ -269,36 +273,41 @@
"SELECT job FROM _hb_pg_jobs WHERE id = \(jobID)",
logger: self.logger
)
guard let buffer = try await stream2.decode(ByteBuffer.self, context: .default).first(where: { _ in true }) else {
logger.error(
guard let row = try await stream2.first(where: { _ in true }) else {
logger.info(

Check warning on line 277 in Sources/JobsPostgres/PostgresJobsQueue.swift

View check run for this annotation

Codecov / codecov/patch

Sources/JobsPostgres/PostgresJobsQueue.swift#L277

Added line #L277 was not covered by tests
"Failed to find job with id",
metadata: [
"JobID": "\(jobID)"
]
)
// if failed to find the job in the job table return nil
return .success(nil)
// if failed to find the job in the job table return error
return .result(.failure(JobQueueError(code: .unrecognisedJobId, jobName: nil)), jobID: jobID)

Check warning on line 284 in Sources/JobsPostgres/PostgresJobsQueue.swift

View check run for this annotation

Codecov / codecov/patch

Sources/JobsPostgres/PostgresJobsQueue.swift#L283-L284

Added lines #L283 - L284 were not covered by tests
}
return .success((buffer, jobID))

return .result(.success(row), jobID: jobID)
}
guard let (buffer, jobID) = try result.get() else { return nil }
do {
let jobInstance = try self.jobRegistry.decode(buffer)
return JobQueueResult(id: jobID, result: .success(jobInstance))
} catch let error as JobQueueError {
return JobQueueResult(id: jobID, result: .failure(error))

switch popFirstResult {
case .nothing:
return nil
case .result(let result, let jobID):
do {
let row = try result.get()
let jobInstance = try row.decode(AnyDecodableJob.self, context: .withJobRegistry(self.jobRegistry)).job
return JobQueueResult(id: jobID, result: .success(jobInstance))
} catch let error as JobQueueError {
return JobQueueResult(id: jobID, result: .failure(error))
}
}
} catch let error as PSQLError {
logger.error(
logger.info(

Check warning on line 302 in Sources/JobsPostgres/PostgresJobsQueue.swift

View check run for this annotation

Codecov / codecov/patch

Sources/JobsPostgres/PostgresJobsQueue.swift#L302

Added line #L302 was not covered by tests
"Failed to get job from queue",
metadata: [
"error": "\(String(reflecting: error))"
]
)
throw error
} catch let error as JobQueueError {
logger.error(
logger.info(

Check warning on line 310 in Sources/JobsPostgres/PostgresJobsQueue.swift

View check run for this annotation

Codecov / codecov/patch

Sources/JobsPostgres/PostgresJobsQueue.swift#L310

Added line #L310 was not covered by tests
"Job failed",
metadata: [
"error": "\(String(reflecting: error))"
Expand All @@ -308,11 +317,11 @@
}
}

func add(jobID: JobID, jobBuffer: ByteBuffer, connection: PostgresConnection) async throws {
func add<Parameters>(jobID: JobID, jobRequest: JobRequest<Parameters>, connection: PostgresConnection) async throws {
try await connection.query(
"""
INSERT INTO _hb_pg_jobs (id, job, status)
VALUES (\(jobID), \(jobBuffer), \(Status.pending))
VALUES (\(jobID), \(jobRequest), \(Status.pending))
""",
logger: self.logger
)
Expand Down Expand Up @@ -443,3 +452,12 @@
await Self(client: client, migrations: migrations, configuration: configuration, logger: logger)
}
}

extension PostgresDecodingContext where JSONDecoder == Foundation.JSONDecoder {
/// A ``PostgresDecodingContext`` that uses a Foundation `JSONDecoder` with job registry attached as userInfo.
public static func withJobRegistry(_ jobRegistry: JobRegistry) -> PostgresDecodingContext {
let jsonDecoder = JSONDecoder()
jsonDecoder.userInfo[._jobConfiguration] = jobRegistry
return PostgresDecodingContext(jsonDecoder: jsonDecoder)
}
}
Loading