Skip to content

Commit 0b71707

Browse files
Keep current job id (#15)
* Keep current job id * Do not modify existing job id on retry * use retry func from swift-jobs * Addressing PR comment * Use swift-jobs branch main --------- Co-authored-by: Adam Fowler <adamfowler71@gmail.com>
1 parent 3c4b16c commit 0b71707

File tree

1 file changed

+23
-1
lines changed

1 file changed

+23
-1
lines changed

Sources/JobsPostgres/PostgresJobsQueue.swift

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,16 @@ public final class PostgresJobQueue: JobQueueDriver {
173173
}
174174
}
175175

176+
/// Retry a job
177+
/// - Returns: Bool
178+
@discardableResult public func retry(_ id: JobID, buffer: ByteBuffer, options: JobOptions) async throws -> Bool {
179+
try await self.client.withTransaction(logger: self.logger) { connection in
180+
try await self.updateJob(id: id, buffer: buffer, connection: connection)
181+
try await self.addToQueue(jobId: id, connection: connection, delayUntil: options.delayUntil)
182+
}
183+
return true
184+
}
185+
176186
/// This is called to say job has finished processing and it can be deleted
177187
public func finished(jobId: JobID) async throws {
178188
try await self.delete(jobId: jobId)
@@ -295,6 +305,19 @@ public final class PostgresJobQueue: JobQueueDriver {
295305
logger: self.logger
296306
)
297307
}
308+
// TODO: maybe add a new column colum for attempt so far after PR https://github.com/hummingbird-project/swift-jobs/pull/63 is merged?
309+
func updateJob(id: JobID, buffer: ByteBuffer, connection: PostgresConnection) async throws {
310+
try await connection.query(
311+
"""
312+
UPDATE _hb_pg_jobs
313+
SET job = \(buffer),
314+
lastModified = \(Date.now),
315+
status = \(Status.failed)
316+
WHERE id = \(id)
317+
""",
318+
logger: self.logger
319+
)
320+
}
298321

299322
func delete(jobId: JobID) async throws {
300323
try await self.client.query(
@@ -304,7 +327,6 @@ public final class PostgresJobQueue: JobQueueDriver {
304327
}
305328

306329
func addToQueue(jobId: JobID, connection: PostgresConnection, delayUntil: Date) async throws {
307-
// TODO: assign Date.now in swift-jobs options?
308330
try await connection.query(
309331
"""
310332
INSERT INTO _hb_pg_job_queue (job_id, createdAt, delayed_until)

0 commit comments

Comments
 (0)