Skip to content

Commit 366fd33

Browse files
authored
Simplify job query and allow job rerun on start if specified (#9)
* Simplify job query and allow job rerun on start if specified * chore(format): code format * do not modify existing jobs values if found in _hb_jobs * Simplify delayed_until check and set default to NOW() if no value is passed * Adding missing type to postgres column * Fixed delayed_until migration * Move the SQL queries in multiple function calls * Fixing job tests
1 parent 33958a2 commit 366fd33

File tree

3 files changed

+72
-17
lines changed

3 files changed

+72
-17
lines changed

Package.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ let package = Package(
99
.library(name: "JobsPostgres", targets: ["JobsPostgres"])
1010
],
1111
dependencies: [
12-
.package(url: "https://github.com/hummingbird-project/swift-jobs.git", from: "1.0.0-beta.4"),
12+
.package(url: "https://github.com/hummingbird-project/swift-jobs.git", from: "1.0.0-beta.7"),
1313
.package(url: "https://github.com/hummingbird-project/hummingbird-postgres.git", from: "0.5.0"),
1414
.package(url: "https://github.com/vapor/postgres-nio.git", from: "1.21.0"),
1515
],
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
//
2+
// UpdateJobDelay.swift
3+
// swift-jobs-postgres
4+
//
5+
// Created by Stevenson Michel on 2/17/25.
6+
//
7+
8+
//===----------------------------------------------------------------------===//
9+
//
10+
// This source file is part of the Hummingbird server framework project
11+
//
12+
// Copyright (c) 2024 the Hummingbird authors
13+
// Licensed under Apache License v2.0
14+
//
15+
// See LICENSE.txt for license information
16+
// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors
17+
//
18+
// SPDX-License-Identifier: Apache-2.0
19+
//
20+
//===----------------------------------------------------------------------===//
21+
22+
import Logging
23+
import PostgresMigrations
24+
import PostgresNIO
25+
26+
struct UpdateJobDelay: DatabaseMigration {
27+
func apply(connection: PostgresConnection, logger: Logger) async throws {
28+
try await connection.query(
29+
"""
30+
ALTER TABLE _hb_pg_job_queue ALTER COLUMN delayed_until TYPE TIMESTAMPTZ USING COALESCE(delayed_until, NOW()),
31+
ALTER COLUMN delayed_until SET DEFAULT NOW(),
32+
ALTER COLUMN delayed_until SET NOT NULL
33+
34+
""",
35+
logger: logger
36+
)
37+
}
38+
39+
func revert(connection: PostgresConnection, logger: Logger) async throws {
40+
try await connection.query(
41+
"ALTER TABLE _hb_pg_job_queue ALTER COLUMN delayed_until DROP NOT NULL",
42+
logger: logger
43+
)
44+
}
45+
46+
var name: String { "_Update_JobQueueDelay_Table_" }
47+
var group: DatabaseMigrationGroup { .jobQueue }
48+
}

Sources/JobsPostgres/PostgresJobsQueue.swift

Lines changed: 23 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,7 @@ public final class PostgresJobQueue: JobQueueDriver {
112112
await migrations.add(CreateJobQueue())
113113
await migrations.add(CreateJobQueueMetadata())
114114
await migrations.add(CreateJobDelay())
115+
await migrations.add(UpdateJobDelay())
115116
}
116117

117118
/// Run on initialization of the job queue
@@ -130,7 +131,12 @@ public final class PostgresJobQueue: JobQueueDriver {
130131
try await self.updateJobsOnInit(withStatus: .failed, onInit: self.configuration.failedJobsInitialization, connection: connection)
131132
}
132133
} catch let error as PSQLError {
133-
print("\(String(reflecting: error))")
134+
logger.error(
135+
"JobQueue initialization failed",
136+
metadata: [
137+
"error": "\(String(reflecting: error))"
138+
]
139+
)
134140
throw error
135141
}
136142
}
@@ -194,17 +200,19 @@ public final class PostgresJobQueue: JobQueueDriver {
194200

195201
let stream = try await connection.query(
196202
"""
197-
DELETE FROM
198-
_hb_pg_job_queue
199-
USING (
200-
SELECT job_id FROM _hb_pg_job_queue
201-
WHERE (delayed_until IS NULL OR delayed_until <= NOW())
203+
WITH next_job AS (
204+
SELECT
205+
job_id
206+
FROM _hb_pg_job_queue
207+
WHERE delayed_until <= NOW()
202208
ORDER BY createdAt, delayed_until ASC
203-
LIMIT 1
204209
FOR UPDATE SKIP LOCKED
205-
) queued
206-
WHERE queued.job_id = _hb_pg_job_queue.job_id
207-
RETURNING _hb_pg_job_queue.job_id
210+
LIMIT 1
211+
)
212+
DELETE FROM
213+
_hb_pg_job_queue
214+
WHERE job_id = (SELECT job_id FROM next_job)
215+
RETURNING job_id
208216
""",
209217
logger: self.logger
210218
)
@@ -214,7 +222,7 @@ public final class PostgresJobQueue: JobQueueDriver {
214222
}
215223
// select job from job table
216224
let stream2 = try await connection.query(
217-
"SELECT job FROM _hb_pg_jobs WHERE id = \(jobId) FOR UPDATE SKIP LOCKED",
225+
"SELECT job FROM _hb_pg_jobs WHERE id = \(jobId)",
218226
logger: self.logger
219227
)
220228

@@ -269,12 +277,13 @@ public final class PostgresJobQueue: JobQueueDriver {
269277
}
270278

271279
func addToQueue(jobId: JobID, connection: PostgresConnection, delayUntil: Date?) async throws {
280+
// TODO: assign Date.now in swift-jobs options?
272281
try await connection.query(
273282
"""
274283
INSERT INTO _hb_pg_job_queue (job_id, createdAt, delayed_until)
275-
VALUES (\(jobId), \(Date.now), \(delayUntil))
276-
ON CONFLICT (job_id)
277-
DO UPDATE SET delayed_until = \(delayUntil)
284+
VALUES (\(jobId), \(Date.now), \(delayUntil ?? Date.now))
285+
-- We have found an existing job with the same id, SKIP this INSERT
286+
ON CONFLICT (job_id) DO NOTHING
278287
""",
279288
logger: self.logger
280289
)
@@ -315,8 +324,6 @@ public final class PostgresJobQueue: JobQueueDriver {
315324
)
316325

317326
case .rerun:
318-
guard status != .pending else { return }
319-
320327
let jobs = try await getJobs(withStatus: status)
321328
self.logger.info("Moving \(jobs.count) jobs with status: \(status) to job queue")
322329
for jobId in jobs {

0 commit comments

Comments
 (0)