Skip to content

Commit 6d74f49

Browse files
authored
Move job serialisation to driver (#16)
* Get working with changes in swift-jobs * Reorder decode and setStatus also print error on failing to find a job * Comment, don't return error for unrecognised decode error * Separate out job decode from database transaction * Update for retry * Use swift-jobs main
1 parent 0b71707 commit 6d74f49

File tree

1 file changed

+89
-74
lines changed

1 file changed

+89
-74
lines changed

Sources/JobsPostgres/PostgresJobsQueue.swift

Lines changed: 89 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@ public final class PostgresJobQueue: JobQueueDriver {
9999
public init(client: PostgresClient, migrations: DatabaseMigrations, configuration: Configuration = .init(), logger: Logger) async {
100100
self.client = client
101101
self.configuration = configuration
102+
self.jobRegistry = .init()
102103
self.logger = logger
103104
self.isStopped = .init(false)
104105
self.migrations = migrations
@@ -162,35 +163,43 @@ public final class PostgresJobQueue: JobQueueDriver {
162163
}
163164
}
164165

166+
/// Register job
167+
/// - Parameters:
168+
/// - job: Job Definition
169+
public func registerJob<Parameters: Codable & Sendable>(_ job: JobDefinition<Parameters>) {
170+
self.jobRegistry.registerJob(job)
171+
}
172+
165173
/// Push Job onto queue
166174
/// - Returns: Identifier of queued job
167-
@discardableResult public func push(_ buffer: ByteBuffer, options: JobOptions) async throws -> JobID {
175+
@discardableResult public func push<Parameters>(_ jobRequest: JobRequest<Parameters>, options: JobOptions) async throws -> JobID {
176+
let buffer = try self.jobRegistry.encode(jobRequest: jobRequest)
177+
let jobID = JobID()
168178
try await self.client.withTransaction(logger: self.logger) { connection in
169-
let queuedJob = QueuedJob<JobID>(id: .init(), jobBuffer: buffer)
170-
try await self.add(queuedJob, connection: connection)
171-
try await self.addToQueue(jobId: queuedJob.id, connection: connection, delayUntil: options.delayUntil)
172-
return queuedJob.id
179+
try await self.add(jobID: jobID, jobBuffer: buffer, connection: connection)
180+
try await self.addToQueue(jobID: jobID, connection: connection, delayUntil: options.delayUntil)
173181
}
182+
return jobID
174183
}
175184

176185
/// Retry a job
177186
/// - Returns: Bool
178-
@discardableResult public func retry(_ id: JobID, buffer: ByteBuffer, options: JobOptions) async throws -> Bool {
187+
public func retry<Parameters>(_ id: JobID, jobRequest: JobRequest<Parameters>, options: JobOptions) async throws {
188+
let buffer = try self.jobRegistry.encode(jobRequest: jobRequest)
179189
try await self.client.withTransaction(logger: self.logger) { connection in
180190
try await self.updateJob(id: id, buffer: buffer, connection: connection)
181-
try await self.addToQueue(jobId: id, connection: connection, delayUntil: options.delayUntil)
191+
try await self.addToQueue(jobID: id, connection: connection, delayUntil: options.delayUntil)
182192
}
183-
return true
184193
}
185194

186195
/// This is called to say job has finished processing and it can be deleted
187-
public func finished(jobId: JobID) async throws {
188-
try await self.delete(jobId: jobId)
196+
public func finished(jobID: JobID) async throws {
197+
try await self.delete(jobID: jobID)
189198
}
190199

191200
/// This is called to say job has failed to run and should be put aside
192-
public func failed(jobId: JobID, error: Error) async throws {
193-
try await self.setStatus(jobId: jobId, status: .failed)
201+
public func failed(jobID: JobID, error: Error) async throws {
202+
try await self.setStatus(jobID: jobID, status: .failed)
194203
}
195204

196205
/// stop serving jobs
@@ -223,60 +232,63 @@ public final class PostgresJobQueue: JobQueueDriver {
223232
)
224233
}
225234

226-
func popFirst() async throws -> QueuedJob<JobID>? {
235+
func popFirst() async throws -> JobQueueResult<JobID>? {
227236
do {
228-
let result = try await self.client.withTransaction(logger: self.logger) { connection -> Result<QueuedJob<JobID>?, Error> in
229-
while true {
230-
try Task.checkCancellation()
231-
232-
let stream = try await connection.query(
233-
"""
234-
WITH next_job AS (
235-
SELECT
236-
job_id
237-
FROM _hb_pg_job_queue
238-
WHERE delayed_until <= NOW()
239-
ORDER BY createdAt, delayed_until ASC
240-
FOR UPDATE SKIP LOCKED
241-
LIMIT 1
242-
)
243-
DELETE FROM
244-
_hb_pg_job_queue
245-
WHERE job_id = (SELECT job_id FROM next_job)
246-
RETURNING job_id
247-
""",
248-
logger: self.logger
249-
)
250-
// return nil if nothing in queue
251-
guard let jobId = try await stream.decode(UUID.self, context: .default).first(where: { _ in true }) else {
252-
return Result.success(nil)
253-
}
254-
// select job from job table
255-
let stream2 = try await connection.query(
256-
"SELECT job FROM _hb_pg_jobs WHERE id = \(jobId)",
257-
logger: self.logger
237+
// The withTransaction closure returns a Result<(ByteBuffer, JobID)?, Error> because
238+
// we want to be able to exit the closure without cancelling the transaction
239+
let result = try await self.client.withTransaction(logger: self.logger) { connection -> Result<(ByteBuffer, JobID)?, Error> in
240+
try Task.checkCancellation()
241+
242+
let stream = try await connection.query(
243+
"""
244+
WITH next_job AS (
245+
SELECT
246+
job_id
247+
FROM _hb_pg_job_queue
248+
WHERE delayed_until <= NOW()
249+
ORDER BY createdAt, delayed_until ASC
250+
FOR UPDATE SKIP LOCKED
251+
LIMIT 1
258252
)
253+
DELETE FROM
254+
_hb_pg_job_queue
255+
WHERE job_id = (SELECT job_id FROM next_job)
256+
RETURNING job_id
257+
""",
258+
logger: self.logger
259+
)
260+
// return nil if nothing in queue
261+
guard let jobID = try await stream.decode(UUID.self, context: .default).first(where: { _ in true }) else {
262+
return Result.success(nil)
263+
}
264+
// set job status to processing
265+
try await self.setStatus(jobID: jobID, status: .processing, connection: connection)
259266

260-
do {
261-
try await self.setStatus(jobId: jobId, status: .processing, connection: connection)
262-
// if failed to find a job in the job table try getting another index
263-
guard let buffer = try await stream2.decode(ByteBuffer.self, context: .default).first(where: { _ in true }) else {
264-
continue
265-
}
266-
return Result.success(QueuedJob(id: jobId, jobBuffer: buffer))
267-
} catch {
268-
try await self.setStatus(jobId: jobId, status: .failed, connection: connection)
269-
return Result.failure(
270-
JobQueueError(
271-
code: .decodeJobFailed,
272-
jobName: nil,
273-
details: "\(String(reflecting: error))"
274-
)
275-
)
276-
}
267+
// select job from job table
268+
let stream2 = try await connection.query(
269+
"SELECT job FROM _hb_pg_jobs WHERE id = \(jobID)",
270+
logger: self.logger
271+
)
272+
guard let buffer = try await stream2.decode(ByteBuffer.self, context: .default).first(where: { _ in true }) else {
273+
logger.error(
274+
"Failed to find job with id",
275+
metadata: [
276+
"JobID": "\(jobID)"
277+
]
278+
)
279+
// if failed to find the job in the job table return nil
280+
return .success(nil)
277281
}
282+
return .success((buffer, jobID))
283+
284+
}
285+
guard let (buffer, jobID) = try result.get() else { return nil }
286+
do {
287+
let jobInstance = try self.jobRegistry.decode(buffer)
288+
return JobQueueResult(id: jobID, result: .success(jobInstance))
289+
} catch let error as JobQueueError {
290+
return JobQueueResult(id: jobID, result: .failure(error))
278291
}
279-
return try result.get()
280292
} catch let error as PSQLError {
281293
logger.error(
282294
"Failed to get job from queue",
@@ -296,11 +308,11 @@ public final class PostgresJobQueue: JobQueueDriver {
296308
}
297309
}
298310

299-
func add(_ job: QueuedJob<JobID>, connection: PostgresConnection) async throws {
311+
func add(jobID: JobID, jobBuffer: ByteBuffer, connection: PostgresConnection) async throws {
300312
try await connection.query(
301313
"""
302314
INSERT INTO _hb_pg_jobs (id, job, status)
303-
VALUES (\(job.id), \(job.jobBuffer), \(Status.pending))
315+
VALUES (\(jobID), \(jobBuffer), \(Status.pending))
304316
""",
305317
logger: self.logger
306318
)
@@ -319,35 +331,36 @@ public final class PostgresJobQueue: JobQueueDriver {
319331
)
320332
}
321333

322-
func delete(jobId: JobID) async throws {
334+
func delete(jobID: JobID) async throws {
323335
try await self.client.query(
324-
"DELETE FROM _hb_pg_jobs WHERE id = \(jobId)",
336+
"DELETE FROM _hb_pg_jobs WHERE id = \(jobID)",
325337
logger: self.logger
326338
)
327339
}
328340

329-
func addToQueue(jobId: JobID, connection: PostgresConnection, delayUntil: Date) async throws {
341+
func addToQueue(jobID: JobID, connection: PostgresConnection, delayUntil: Date) async throws {
342+
// TODO: assign Date.now in swift-jobs options?
330343
try await connection.query(
331344
"""
332345
INSERT INTO _hb_pg_job_queue (job_id, createdAt, delayed_until)
333-
VALUES (\(jobId), \(Date.now), \(delayUntil))
346+
VALUES (\(jobID), \(Date.now), \(delayUntil))
334347
-- We have found an existing job with the same id, SKIP this INSERT
335348
ON CONFLICT (job_id) DO NOTHING
336349
""",
337350
logger: self.logger
338351
)
339352
}
340353

341-
func setStatus(jobId: JobID, status: Status, connection: PostgresConnection) async throws {
354+
func setStatus(jobID: JobID, status: Status, connection: PostgresConnection) async throws {
342355
try await connection.query(
343-
"UPDATE _hb_pg_jobs SET status = \(status), lastModified = \(Date.now) WHERE id = \(jobId)",
356+
"UPDATE _hb_pg_jobs SET status = \(status), lastModified = \(Date.now) WHERE id = \(jobID)",
344357
logger: self.logger
345358
)
346359
}
347360

348-
func setStatus(jobId: JobID, status: Status) async throws {
361+
func setStatus(jobID: JobID, status: Status) async throws {
349362
try await self.client.query(
350-
"UPDATE _hb_pg_jobs SET status = \(status), lastModified = \(Date.now) WHERE id = \(jobId)",
363+
"UPDATE _hb_pg_jobs SET status = \(status), lastModified = \(Date.now) WHERE id = \(jobID)",
351364
logger: self.logger
352365
)
353366
}
@@ -375,20 +388,22 @@ public final class PostgresJobQueue: JobQueueDriver {
375388
case .rerun:
376389
let jobs = try await getJobs(withStatus: status)
377390
self.logger.info("Moving \(jobs.count) jobs with status: \(status) to job queue")
378-
for jobId in jobs {
379-
try await self.addToQueue(jobId: jobId, connection: connection, delayUntil: Date.now)
391+
for jobID in jobs {
392+
try await self.addToQueue(jobID: jobID, connection: connection, delayUntil: Date.now)
380393
}
381394

382395
case .doNothing:
383396
break
384397
}
385398
}
399+
400+
let jobRegistry: JobRegistry
386401
}
387402

388403
/// extend PostgresJobQueue to conform to AsyncSequence
389404
extension PostgresJobQueue {
390405
public struct AsyncIterator: AsyncIteratorProtocol {
391-
public typealias Element = QueuedJob<JobID>
406+
public typealias Element = JobQueueResult<JobID>
392407

393408
let queue: PostgresJobQueue
394409

0 commit comments

Comments
 (0)