Skip to content

Commit 9c4ae5d

Browse files
committed
Separate out job decode from database transaction
1 parent fa61e36 commit 9c4ae5d

File tree

1 file changed

+50
-60
lines changed

1 file changed

+50
-60
lines changed

Sources/JobsPostgres/PostgresJobsQueue.swift

Lines changed: 50 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -234,71 +234,61 @@ public final class PostgresJobQueue: JobQueueDriver {
234234

235235
func popFirst() async throws -> JobQueueResult<JobID>? {
236236
do {
237-
// The withTransaction closure returns a Result<JobQueueResult<JobID>?, Error> because
237+
// The withTransaction closure returns a Result<(ByteBuffer, JobID)?, Error> because
238238
// we want to be able to exit the closure without cancelling the transaction
239-
let result = try await self.client.withTransaction(logger: self.logger) { connection -> Result<JobQueueResult<JobID>?, Error> in
240-
while true {
241-
try Task.checkCancellation()
242-
243-
let stream = try await connection.query(
244-
"""
245-
WITH next_job AS (
246-
SELECT
247-
job_id
248-
FROM _hb_pg_job_queue
249-
WHERE delayed_until <= NOW()
250-
ORDER BY createdAt, delayed_until ASC
251-
FOR UPDATE SKIP LOCKED
252-
LIMIT 1
253-
)
254-
DELETE FROM
255-
_hb_pg_job_queue
256-
WHERE job_id = (SELECT job_id FROM next_job)
257-
RETURNING job_id
258-
""",
259-
logger: self.logger
260-
)
261-
// return nil if nothing in queue
262-
guard let jobID = try await stream.decode(UUID.self, context: .default).first(where: { _ in true }) else {
263-
return Result.success(nil)
264-
}
265-
// select job from job table
266-
let stream2 = try await connection.query(
267-
"SELECT job FROM _hb_pg_jobs WHERE id = \(jobID)",
268-
logger: self.logger
239+
let result = try await self.client.withTransaction(logger: self.logger) { connection -> Result<(ByteBuffer, JobID)?, Error> in
240+
try Task.checkCancellation()
241+
242+
let stream = try await connection.query(
243+
"""
244+
WITH next_job AS (
245+
SELECT
246+
job_id
247+
FROM _hb_pg_job_queue
248+
WHERE delayed_until <= NOW()
249+
ORDER BY createdAt, delayed_until ASC
250+
FOR UPDATE SKIP LOCKED
251+
LIMIT 1
269252
)
253+
DELETE FROM
254+
_hb_pg_job_queue
255+
WHERE job_id = (SELECT job_id FROM next_job)
256+
RETURNING job_id
257+
""",
258+
logger: self.logger
259+
)
260+
// return nil if nothing in queue
261+
guard let jobID = try await stream.decode(UUID.self, context: .default).first(where: { _ in true }) else {
262+
return Result.success(nil)
263+
}
264+
// set job status to processing
265+
try await self.setStatus(jobID: jobID, status: .processing, connection: connection)
270266

271-
do {
272-
guard let buffer = try await stream2.decode(ByteBuffer.self, context: .default).first(where: { _ in true }) else {
273-
logger.error(
274-
"Failed to find job with id",
275-
metadata: [
276-
"JobID": "\(jobID)"
277-
]
278-
)
279-
// if failed to find a job in the job table try getting another index
280-
continue
281-
}
282-
try await self.setStatus(jobID: jobID, status: .processing, connection: connection)
283-
do {
284-
let jobInstance = try self.jobRegistry.decode(buffer)
285-
return Result.success(.init(id: jobID, result: .success(jobInstance)))
286-
} catch let error as JobQueueError {
287-
return Result.success(.init(id: jobID, result: .failure(error)))
288-
}
289-
} catch {
290-
try await self.setStatus(jobID: jobID, status: .failed, connection: connection)
291-
return Result.failure(
292-
JobQueueError(
293-
code: .decodeJobFailed,
294-
jobName: nil,
295-
details: "\(String(reflecting: error))"
296-
)
297-
)
298-
}
267+
// select job from job table
268+
let stream2 = try await connection.query(
269+
"SELECT job FROM _hb_pg_jobs WHERE id = \(jobID)",
270+
logger: self.logger
271+
)
272+
guard let buffer = try await stream2.decode(ByteBuffer.self, context: .default).first(where: { _ in true }) else {
273+
logger.error(
274+
"Failed to find job with id",
275+
metadata: [
276+
"JobID": "\(jobID)"
277+
]
278+
)
279+
// if failed to find the job in the job table return nil
280+
return .success(nil)
299281
}
282+
return .success((buffer, jobID))
283+
284+
}
285+
guard let (buffer, jobID) = try result.get() else { return nil }
286+
do {
287+
let jobInstance = try self.jobRegistry.decode(buffer)
288+
return JobQueueResult(id: jobID, result: .success(jobInstance))
289+
} catch let error as JobQueueError {
290+
return JobQueueResult(id: jobID, result: .failure(error))
300291
}
301-
return try result.get()
302292
} catch let error as PSQLError {
303293
logger.error(
304294
"Failed to get job from queue",

0 commit comments

Comments
 (0)