Skip to content

Commit 1851298

Browse files
committed
Simplify job query and allow job rerun on start if specified
1 parent 33958a2 commit 1851298

File tree

2 files changed

+18
-14
lines changed

2 files changed

+18
-14
lines changed

Package.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ let package = Package(
99
.library(name: "JobsPostgres", targets: ["JobsPostgres"])
1010
],
1111
dependencies: [
12-
.package(url: "https://github.com/hummingbird-project/swift-jobs.git", from: "1.0.0-beta.4"),
12+
.package(url: "https://github.com/hummingbird-project/swift-jobs.git", from: "1.0.0-beta.7"),
1313
.package(url: "https://github.com/hummingbird-project/hummingbird-postgres.git", from: "0.5.0"),
1414
.package(url: "https://github.com/vapor/postgres-nio.git", from: "1.21.0"),
1515
],

Sources/JobsPostgres/PostgresJobsQueue.swift

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,9 @@ public final class PostgresJobQueue: JobQueueDriver {
130130
try await self.updateJobsOnInit(withStatus: .failed, onInit: self.configuration.failedJobsInitialization, connection: connection)
131131
}
132132
} catch let error as PSQLError {
133-
print("\(String(reflecting: error))")
133+
logger.error("JobQueue initialization failed", metadata: [
134+
"error": "\(String(reflecting: error))"
135+
])
134136
throw error
135137
}
136138
}
@@ -194,17 +196,19 @@ public final class PostgresJobQueue: JobQueueDriver {
194196

195197
let stream = try await connection.query(
196198
"""
197-
DELETE FROM
198-
_hb_pg_job_queue
199-
USING (
200-
SELECT job_id FROM _hb_pg_job_queue
199+
WITH next_job AS (
200+
SELECT
201+
job_id
202+
FROM _hb_pg_job_queue
201203
WHERE (delayed_until IS NULL OR delayed_until <= NOW())
202204
ORDER BY createdAt, delayed_until ASC
203-
LIMIT 1
204205
FOR UPDATE SKIP LOCKED
205-
) queued
206-
WHERE queued.job_id = _hb_pg_job_queue.job_id
207-
RETURNING _hb_pg_job_queue.job_id
206+
LIMIT 1
207+
)
208+
DELETE FROM
209+
_hb_pg_job_queue
210+
WHERE job_id = (SELECT job_id FROM next_job)
211+
RETURNING job_id
208212
""",
209213
logger: self.logger
210214
)
@@ -214,7 +218,7 @@ public final class PostgresJobQueue: JobQueueDriver {
214218
}
215219
// select job from job table
216220
let stream2 = try await connection.query(
217-
"SELECT job FROM _hb_pg_jobs WHERE id = \(jobId) FOR UPDATE SKIP LOCKED",
221+
"SELECT job FROM _hb_pg_jobs WHERE id = \(jobId)",
218222
logger: self.logger
219223
)
220224

@@ -274,7 +278,9 @@ public final class PostgresJobQueue: JobQueueDriver {
274278
INSERT INTO _hb_pg_job_queue (job_id, createdAt, delayed_until)
275279
VALUES (\(jobId), \(Date.now), \(delayUntil))
276280
ON CONFLICT (job_id)
277-
DO UPDATE SET delayed_until = \(delayUntil)
281+
DO UPDATE
282+
SET delayed_until = COALESCE(_hb_pg_job_queue.delayed_until, EXCLUDED.delayed_until),
283+
createdAt = \(Date.now)
278284
""",
279285
logger: self.logger
280286
)
@@ -315,8 +321,6 @@ public final class PostgresJobQueue: JobQueueDriver {
315321
)
316322

317323
case .rerun:
318-
guard status != .pending else { return }
319-
320324
let jobs = try await getJobs(withStatus: status)
321325
self.logger.info("Moving \(jobs.count) jobs with status: \(status) to job queue")
322326
for jobId in jobs {

0 commit comments

Comments
 (0)