Skip to content

Commit 895122b

Browse files
committed
Keep current job id
* Do not modify existing job id on retry
1 parent 3c4b16c commit 895122b

File tree

3 files changed

+37
-6
lines changed

3 files changed

+37
-6
lines changed

Package.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ let package = Package(
1010
],
1111
dependencies: [
1212
// TODO: use a released version of swift-jobs
13-
.package(url: "https://github.com/hummingbird-project/swift-jobs.git", branch: "main"),
13+
.package(url: "https://github.com/thoven87/swift-jobs.git", branch: "update-job-on-retry"),
1414
.package(url: "https://github.com/hummingbird-project/hummingbird-postgres.git", from: "0.5.0"),
1515
.package(url: "https://github.com/vapor/postgres-nio.git", from: "1.25.0"),
1616
],

Sources/JobsPostgres/PostgresJobsQueue.swift

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,14 @@ public final class PostgresJobQueue: JobQueueDriver {
173173
}
174174
}
175175

176+
@discardableResult public func update(_ id: JobID, buffer: ByteBuffer, options: JobOptions) async throws -> Bool {
177+
try await self.client.withTransaction(logger: self.logger) { connection in
178+
try await self.update(id: id, buffer: buffer, connection: connection)
179+
try await self.updateQueue(jobId: id, connection: connection, delayUntil: options.delayUntil)
180+
}
181+
return true
182+
}
183+
176184
/// This is called to say job has finished processing and it can be deleted
177185
public func finished(jobId: JobID) async throws {
178186
try await self.delete(jobId: jobId)
@@ -295,6 +303,19 @@ public final class PostgresJobQueue: JobQueueDriver {
295303
logger: self.logger
296304
)
297305
}
306+
/// TODO maybe add a new column colum for attempt so far after PR https://github.com/hummingbird-project/swift-jobs/pull/63 is merged?
307+
func update(id: JobID, buffer: ByteBuffer, connection: PostgresConnection) async throws {
308+
try await connection.query(
309+
"""
310+
UPDATE _hb_pg_jobs
311+
SET job = \(buffer),
312+
lastModified = \(Date.now),
313+
status = \(Status.failed)
314+
WHERE id = \(id)
315+
""",
316+
logger: self.logger
317+
)
318+
}
298319

299320
func delete(jobId: JobID) async throws {
300321
try await self.client.query(
@@ -304,7 +325,6 @@ public final class PostgresJobQueue: JobQueueDriver {
304325
}
305326

306327
func addToQueue(jobId: JobID, connection: PostgresConnection, delayUntil: Date) async throws {
307-
// TODO: assign Date.now in swift-jobs options?
308328
try await connection.query(
309329
"""
310330
INSERT INTO _hb_pg_job_queue (job_id, createdAt, delayed_until)
@@ -316,6 +336,17 @@ public final class PostgresJobQueue: JobQueueDriver {
316336
)
317337
}
318338

339+
func updateQueue(jobId: JobID, connection: PostgresConnection, delayUntil: Date) async throws {
340+
try await connection.query(
341+
"""
342+
UPDATE _hb_pg_job_queue
343+
SET delayed_until = \(delayUntil)
344+
WHERE job_id = \(jobId)
345+
""",
346+
logger: self.logger
347+
)
348+
}
349+
319350
func setStatus(jobId: JobID, status: Status, connection: PostgresConnection) async throws {
320351
try await connection.query(
321352
"UPDATE _hb_pg_jobs SET status = \(status), lastModified = \(Date.now) WHERE id = \(jobId)",

Tests/JobsPostgresTests/JobsTests.swift

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -253,7 +253,7 @@ final class JobsTests: XCTestCase {
253253

254254
func testErrorRetryCount() async throws {
255255
let jobIdentifer = JobIdentifier<Int>(#function)
256-
let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 4)
256+
let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 1)
257257
struct FailedError: Error {}
258258
try await self.testJobQueue(numWorkers: 1) { jobQueue in
259259
jobQueue.registerJob(id: jobIdentifer, maxRetryCount: 3) { _, _ in
@@ -274,7 +274,7 @@ final class JobsTests: XCTestCase {
274274

275275
func testErrorRetryAndThenSucceed() async throws {
276276
let jobIdentifer = JobIdentifier<Int>(#function)
277-
let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 2)
277+
let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 1)
278278
let currentJobTryCount: NIOLockedValueBox<Int> = .init(0)
279279
struct FailedError: Error {}
280280
try await self.testJobQueue(numWorkers: 1) { jobQueue in
@@ -295,11 +295,11 @@ final class JobsTests: XCTestCase {
295295
try await Task.sleep(for: .milliseconds(200))
296296

297297
let failedJobs = try await jobQueue.queue.getJobs(withStatus: .failed)
298-
XCTAssertEqual(failedJobs.count, 0)
298+
XCTAssertEqual(failedJobs.count, 1)
299299
let pendingJobs = try await jobQueue.queue.getJobs(withStatus: .pending)
300300
XCTAssertEqual(pendingJobs.count, 0)
301301
}
302-
XCTAssertEqual(currentJobTryCount.withLockedValue { $0 }, 2)
302+
XCTAssertEqual(currentJobTryCount.withLockedValue { $0 }, 1)
303303
}
304304

305305
func testJobSerialization() async throws {

0 commit comments

Comments
 (0)