Skip to content

Commit f4c480e

Browse files
committed
Get working with changes in swift-jobs
1 parent 0b71707 commit f4c480e

File tree

2 files changed

+51
-30
lines changed

2 files changed

+51
-30
lines changed

Package.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ let package = Package(
1010
],
1111
dependencies: [
1212
// TODO: use a released version of swift-jobs
13-
.package(url: "https://github.com/hummingbird-project/swift-jobs.git", branch: "main"),
13+
.package(url: "https://github.com/hummingbird-project/swift-jobs.git", branch: "job-driver-parameters"),
1414
.package(url: "https://github.com/hummingbird-project/hummingbird-postgres.git", from: "0.5.0"),
1515
.package(url: "https://github.com/vapor/postgres-nio.git", from: "1.25.0"),
1616
],

Sources/JobsPostgres/PostgresJobsQueue.swift

Lines changed: 50 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@ public final class PostgresJobQueue: JobQueueDriver {
9999
public init(client: PostgresClient, migrations: DatabaseMigrations, configuration: Configuration = .init(), logger: Logger) async {
100100
self.client = client
101101
self.configuration = configuration
102+
self.jobRegistry = .init()
102103
self.logger = logger
103104
self.isStopped = .init(false)
104105
self.migrations = migrations
@@ -162,15 +163,23 @@ public final class PostgresJobQueue: JobQueueDriver {
162163
}
163164
}
164165

166+
/// Register job
167+
/// - Parameters:
168+
/// - job: Job Definition
169+
public func registerJob<Parameters: Codable & Sendable>(_ job: JobDefinition<Parameters>) {
170+
self.jobRegistry.registerJob(job)
171+
}
172+
165173
/// Push Job onto queue
166174
/// - Returns: Identifier of queued job
167-
@discardableResult public func push(_ buffer: ByteBuffer, options: JobOptions) async throws -> JobID {
175+
@discardableResult public func push<Parameters>(_ jobRequest: JobRequest<Parameters>, options: JobOptions) async throws -> JobID {
176+
let buffer = try self.jobRegistry.encode(jobRequest: jobRequest)
177+
let jobID = JobID()
168178
try await self.client.withTransaction(logger: self.logger) { connection in
169-
let queuedJob = QueuedJob<JobID>(id: .init(), jobBuffer: buffer)
170-
try await self.add(queuedJob, connection: connection)
171-
try await self.addToQueue(jobId: queuedJob.id, connection: connection, delayUntil: options.delayUntil)
172-
return queuedJob.id
179+
try await self.add(jobID: jobID, jobBuffer: buffer, connection: connection)
180+
try await self.addToQueue(jobID: jobID, connection: connection, delayUntil: options.delayUntil)
173181
}
182+
return jobID
174183
}
175184

176185
/// Retry a job
@@ -184,13 +193,13 @@ public final class PostgresJobQueue: JobQueueDriver {
184193
}
185194

186195
/// This is called to say job has finished processing and it can be deleted
187-
public func finished(jobId: JobID) async throws {
188-
try await self.delete(jobId: jobId)
196+
public func finished(jobID: JobID) async throws {
197+
try await self.delete(jobID: jobID)
189198
}
190199

191200
/// This is called to say job has failed to run and should be put aside
192-
public func failed(jobId: JobID, error: Error) async throws {
193-
try await self.setStatus(jobId: jobId, status: .failed)
201+
public func failed(jobID: JobID, error: Error) async throws {
202+
try await self.setStatus(jobID: jobID, status: .failed)
194203
}
195204

196205
/// stop serving jobs
@@ -223,9 +232,9 @@ public final class PostgresJobQueue: JobQueueDriver {
223232
)
224233
}
225234

226-
func popFirst() async throws -> QueuedJob<JobID>? {
235+
func popFirst() async throws -> JobQueueResult<JobID>? {
227236
do {
228-
let result = try await self.client.withTransaction(logger: self.logger) { connection -> Result<QueuedJob<JobID>?, Error> in
237+
let result = try await self.client.withTransaction(logger: self.logger) { connection -> Result<JobQueueResult<JobID>?, Error> in
229238
while true {
230239
try Task.checkCancellation()
231240

@@ -248,24 +257,33 @@ public final class PostgresJobQueue: JobQueueDriver {
248257
logger: self.logger
249258
)
250259
// return nil if nothing in queue
251-
guard let jobId = try await stream.decode(UUID.self, context: .default).first(where: { _ in true }) else {
260+
guard let jobID = try await stream.decode(UUID.self, context: .default).first(where: { _ in true }) else {
252261
return Result.success(nil)
253262
}
254263
// select job from job table
255264
let stream2 = try await connection.query(
256-
"SELECT job FROM _hb_pg_jobs WHERE id = \(jobId)",
265+
"SELECT job FROM _hb_pg_jobs WHERE id = \(jobID)",
257266
logger: self.logger
258267
)
259268

260269
do {
261-
try await self.setStatus(jobId: jobId, status: .processing, connection: connection)
270+
try await self.setStatus(jobID: jobID, status: .processing, connection: connection)
262271
// if failed to find a job in the job table try getting another index
263272
guard let buffer = try await stream2.decode(ByteBuffer.self, context: .default).first(where: { _ in true }) else {
264273
continue
265274
}
266-
return Result.success(QueuedJob(id: jobId, jobBuffer: buffer))
275+
do {
276+
let jobInstance = try self.jobRegistry.decode(buffer)
277+
return Result.success(.init(id: jobID, result: .success(jobInstance)))
278+
} catch let error as JobQueueError {
279+
return Result.success(.init(id: jobID, result: .failure(error)))
280+
} catch {
281+
return Result.success(
282+
.init(id: jobID, result: .failure(JobQueueError(code: .unrecognised, jobName: nil, details: "\(error)")))
283+
)
284+
}
267285
} catch {
268-
try await self.setStatus(jobId: jobId, status: .failed, connection: connection)
286+
try await self.setStatus(jobID: jobID, status: .failed, connection: connection)
269287
return Result.failure(
270288
JobQueueError(
271289
code: .decodeJobFailed,
@@ -296,11 +314,11 @@ public final class PostgresJobQueue: JobQueueDriver {
296314
}
297315
}
298316

299-
func add(_ job: QueuedJob<JobID>, connection: PostgresConnection) async throws {
317+
func add(jobID: JobID, jobBuffer: ByteBuffer, connection: PostgresConnection) async throws {
300318
try await connection.query(
301319
"""
302320
INSERT INTO _hb_pg_jobs (id, job, status)
303-
VALUES (\(job.id), \(job.jobBuffer), \(Status.pending))
321+
VALUES (\(jobID), \(jobBuffer), \(Status.pending))
304322
""",
305323
logger: self.logger
306324
)
@@ -319,35 +337,36 @@ public final class PostgresJobQueue: JobQueueDriver {
319337
)
320338
}
321339

322-
func delete(jobId: JobID) async throws {
340+
func delete(jobID: JobID) async throws {
323341
try await self.client.query(
324-
"DELETE FROM _hb_pg_jobs WHERE id = \(jobId)",
342+
"DELETE FROM _hb_pg_jobs WHERE id = \(jobID)",
325343
logger: self.logger
326344
)
327345
}
328346

329-
func addToQueue(jobId: JobID, connection: PostgresConnection, delayUntil: Date) async throws {
347+
func addToQueue(jobID: JobID, connection: PostgresConnection, delayUntil: Date) async throws {
348+
// TODO: assign Date.now in swift-jobs options?
330349
try await connection.query(
331350
"""
332351
INSERT INTO _hb_pg_job_queue (job_id, createdAt, delayed_until)
333-
VALUES (\(jobId), \(Date.now), \(delayUntil))
352+
VALUES (\(jobID), \(Date.now), \(delayUntil))
334353
-- We have found an existing job with the same id, SKIP this INSERT
335354
ON CONFLICT (job_id) DO NOTHING
336355
""",
337356
logger: self.logger
338357
)
339358
}
340359

341-
func setStatus(jobId: JobID, status: Status, connection: PostgresConnection) async throws {
360+
func setStatus(jobID: JobID, status: Status, connection: PostgresConnection) async throws {
342361
try await connection.query(
343-
"UPDATE _hb_pg_jobs SET status = \(status), lastModified = \(Date.now) WHERE id = \(jobId)",
362+
"UPDATE _hb_pg_jobs SET status = \(status), lastModified = \(Date.now) WHERE id = \(jobID)",
344363
logger: self.logger
345364
)
346365
}
347366

348-
func setStatus(jobId: JobID, status: Status) async throws {
367+
func setStatus(jobID: JobID, status: Status) async throws {
349368
try await self.client.query(
350-
"UPDATE _hb_pg_jobs SET status = \(status), lastModified = \(Date.now) WHERE id = \(jobId)",
369+
"UPDATE _hb_pg_jobs SET status = \(status), lastModified = \(Date.now) WHERE id = \(jobID)",
351370
logger: self.logger
352371
)
353372
}
@@ -375,20 +394,22 @@ public final class PostgresJobQueue: JobQueueDriver {
375394
case .rerun:
376395
let jobs = try await getJobs(withStatus: status)
377396
self.logger.info("Moving \(jobs.count) jobs with status: \(status) to job queue")
378-
for jobId in jobs {
379-
try await self.addToQueue(jobId: jobId, connection: connection, delayUntil: Date.now)
397+
for jobID in jobs {
398+
try await self.addToQueue(jobID: jobID, connection: connection, delayUntil: Date.now)
380399
}
381400

382401
case .doNothing:
383402
break
384403
}
385404
}
405+
406+
let jobRegistry: JobRegistry
386407
}
387408

388409
/// extend PostgresJobQueue to conform to AsyncSequence
389410
extension PostgresJobQueue {
390411
public struct AsyncIterator: AsyncIteratorProtocol {
391-
public typealias Element = QueuedJob<JobID>
412+
public typealias Element = JobQueueResult<JobID>
392413

393414
let queue: PostgresJobQueue
394415

0 commit comments

Comments
 (0)