Skip to content

Commit cc39800

Browse files
authored
Merge pull request #24 from thoven87/job-priority
Adding Job priority
2 parents 7885074 + 60e4fca commit cc39800

File tree

3 files changed

+219
-10
lines changed

3 files changed

+219
-10
lines changed

Sources/JobsPostgres/Migrations/CreateSwiftJobsMigrations.swift

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,16 +41,17 @@ struct CreateSwiftJobsMigrations: DatabaseMigration {
4141
job_id uuid PRIMARY KEY,
4242
created_at TIMESTAMPTZ NOT NULL,
4343
delayed_until TIMESTAMPTZ NOT NULL DEFAULT now(),
44-
queue_name TEXT NOT NULL DEFAULT 'default'
44+
queue_name TEXT NOT NULL DEFAULT 'default',
45+
priority SMALLINT NOT NULL DEFAULT 0
4546
);
4647
""",
4748
logger: logger
4849
)
4950

5051
try await connection.query(
5152
"""
52-
CREATE INDEX CONCURRENTLY IF NOT EXISTS queues_delayed_until_queue_name_idx
53-
ON swift_jobs.queues(delayed_until, queue_name)
53+
CREATE INDEX CONCURRENTLY IF NOT EXISTS queues_delayed_until_priority_queue_name_idx
54+
ON swift_jobs.queues(priority DESC, delayed_until ASC, queue_name ASC)
5455
""",
5556
logger: logger
5657
)

Sources/JobsPostgres/PostgresJobsQueue.swift

Lines changed: 58 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -51,20 +51,66 @@ public final class PostgresJobQueue: JobQueueDriver {
5151
case remove
5252
}
5353

54+
/// Job priority from lowest to highest
55+
public struct JobPriority: Equatable, Sendable {
56+
let rawValue: Priority
57+
58+
// Job priority
59+
enum Priority: Int16, Sendable, PostgresCodable {
60+
case lowest = 0
61+
case lower = 1
62+
case normal = 2
63+
case higher = 3
64+
case highest = 4
65+
}
66+
/// Lowest priority
67+
public static func lowest() -> JobPriority {
68+
JobPriority(rawValue: .lowest)
69+
}
70+
/// Lower priority
71+
public static func lower() -> JobPriority {
72+
JobPriority(rawValue: .lower)
73+
}
74+
/// Normal is the default priority
75+
public static func normal() -> JobPriority {
76+
JobPriority(rawValue: .normal)
77+
}
78+
/// Higher priority
79+
public static func higher() -> JobPriority {
80+
JobPriority(rawValue: .higher)
81+
}
82+
/// Higgest priority
83+
public static func highest() -> JobPriority {
84+
JobPriority(rawValue: .highest)
85+
}
86+
}
87+
5488
/// Options for job pushed to queue
5589
public struct JobOptions: JobOptionsProtocol {
5690
/// Delay running job until
5791
public var delayUntil: Date
92+
/// Priority for this job
93+
public var priority: JobPriority
5894

5995
/// Default initializer for JobOptions
6096
public init() {
6197
self.delayUntil = .now
98+
self.priority = .normal()
6299
}
63100

64101
/// Initializer for JobOptions
65102
/// - Parameter delayUntil: Whether job execution should be delayed until a later date
66103
public init(delayUntil: Date?) {
67104
self.delayUntil = delayUntil ?? .now
105+
self.priority = .normal()
106+
}
107+
108+
/// Initializer for JobOptions
109+
/// - Parameter delayUntil: Whether job execution should be delayed until a later date
110+
/// - Parameter priority: The priority for a job
111+
public init(delayUntil: Date = .now, priority: JobPriority = .normal()) {
112+
self.delayUntil = delayUntil
113+
self.priority = priority
68114
}
69115
}
70116

@@ -194,7 +240,7 @@ public final class PostgresJobQueue: JobQueueDriver {
194240
let jobID = JobID()
195241
try await self.client.withTransaction(logger: self.logger) { connection in
196242
try await self.add(jobID: jobID, jobRequest: jobRequest, queueName: configuration.queueName, connection: connection)
197-
try await self.addToQueue(jobID: jobID, queueName: configuration.queueName, delayUntil: options.delayUntil, connection: connection)
243+
try await self.addToQueue(jobID: jobID, queueName: configuration.queueName, options: options, connection: connection)
198244
}
199245
return jobID
200246
}
@@ -208,7 +254,12 @@ public final class PostgresJobQueue: JobQueueDriver {
208254
let buffer = try self.jobRegistry.encode(jobRequest: jobRequest)
209255
try await self.client.withTransaction(logger: self.logger) { connection in
210256
try await self.updateJob(id: id, buffer: buffer, connection: connection)
211-
try await self.addToQueue(jobID: id, queueName: configuration.queueName, delayUntil: options.delayUntil, connection: connection)
257+
try await self.addToQueue(
258+
jobID: id,
259+
queueName: configuration.queueName,
260+
options: .init(delayUntil: options.delayUntil),
261+
connection: connection
262+
)
212263
}
213264
}
214265

@@ -273,7 +324,7 @@ public final class PostgresJobQueue: JobQueueDriver {
273324
FROM swift_jobs.queues
274325
WHERE delayed_until <= NOW()
275326
AND queue_name = \(configuration.queueName)
276-
ORDER BY created_at, delayed_until ASC
327+
ORDER BY priority DESC, delayed_until ASC, created_at ASC
277328
FOR UPDATE SKIP LOCKED
278329
LIMIT 1
279330
)
@@ -381,11 +432,11 @@ public final class PostgresJobQueue: JobQueueDriver {
381432
)
382433
}
383434

384-
func addToQueue(jobID: JobID, queueName: String, delayUntil: Date, connection: PostgresConnection) async throws {
435+
func addToQueue(jobID: JobID, queueName: String, options: JobOptions, connection: PostgresConnection) async throws {
385436
try await connection.query(
386437
"""
387-
INSERT INTO swift_jobs.queues (job_id, created_at, delayed_until, queue_name)
388-
VALUES (\(jobID), \(Date.now), \(delayUntil), \(queueName))
438+
INSERT INTO swift_jobs.queues (job_id, created_at, delayed_until, queue_name, priority)
439+
VALUES (\(jobID), \(Date.now), \(options.delayUntil), \(queueName), \(options.priority.rawValue))
389440
-- We have found an existing job with the same id, SKIP this INSERT
390441
ON CONFLICT (job_id) DO NOTHING
391442
""",
@@ -449,7 +500,7 @@ public final class PostgresJobQueue: JobQueueDriver {
449500
let jobs = try await getJobs(withStatus: status)
450501
self.logger.info("Moving \(jobs.count) jobs with status: \(status) to job queue")
451502
for jobID in jobs {
452-
try await self.addToQueue(jobID: jobID, queueName: configuration.queueName, delayUntil: Date.now, connection: connection)
503+
try await self.addToQueue(jobID: jobID, queueName: configuration.queueName, options: .init(), connection: connection)
453504
}
454505

455506
case .doNothing:

Tests/JobsPostgresTests/JobsTests.swift

Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,54 @@ final class JobsTests: XCTestCase {
120120
}
121121
}
122122

123+
/// Helper for testing job priority
124+
@discardableResult public func testPriorityJobQueue<T>(
125+
jobQueue: JobQueue<PostgresJobQueue>,
126+
failedJobsInitialization: PostgresJobQueue.JobCleanup = .remove,
127+
processingJobsInitialization: PostgresJobQueue.JobCleanup = .remove,
128+
pendingJobsInitialization: PostgresJobQueue.JobCleanup = .remove,
129+
revertMigrations: Bool = false,
130+
test: (JobQueue<PostgresJobQueue>) async throws -> T
131+
) async throws -> T {
132+
do {
133+
return try await withThrowingTaskGroup(of: Void.self) { group in
134+
let serviceGroup = ServiceGroup(
135+
configuration: .init(
136+
services: [jobQueue.queue.client],
137+
gracefulShutdownSignals: [.sigterm, .sigint],
138+
logger: jobQueue.queue.logger
139+
)
140+
)
141+
group.addTask {
142+
try await serviceGroup.run()
143+
}
144+
do {
145+
let migrations = jobQueue.queue.migrations
146+
let client = jobQueue.queue.client
147+
let logger = jobQueue.queue.logger
148+
if revertMigrations {
149+
try await migrations.revert(client: client, groups: [.jobQueue], logger: logger, dryRun: false)
150+
}
151+
try await migrations.apply(client: client, groups: [.jobQueue], logger: logger, dryRun: false)
152+
try await jobQueue.queue.cleanup(failedJobs: failedJobsInitialization, processingJobs: processingJobsInitialization)
153+
let value = try await test(jobQueue)
154+
await serviceGroup.triggerGracefulShutdown()
155+
return value
156+
} catch let error as PSQLError {
157+
XCTFail("\(String(reflecting: error))")
158+
await serviceGroup.triggerGracefulShutdown()
159+
throw error
160+
} catch {
161+
await serviceGroup.triggerGracefulShutdown()
162+
throw error
163+
}
164+
}
165+
} catch let error as PSQLError {
166+
XCTFail("\(String(reflecting: error))")
167+
throw error
168+
}
169+
}
170+
123171
/// Helper function for test a server
124172
///
125173
/// Creates test client, runs test function abd ensures everything is
@@ -207,6 +255,115 @@ final class JobsTests: XCTestCase {
207255
XCTAssertEqual(jobExecutionSequence.withLockedValue { $0 }, [5, 1])
208256
}
209257

258+
func testJobPriorities() async throws {
259+
struct TestParameters: JobParameters {
260+
static let jobName = "testPriorityJobs"
261+
let value: Int
262+
}
263+
let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 2)
264+
let jobExecutionSequence: NIOLockedValueBox<[Int]> = .init([])
265+
266+
let jobQueue = try await self.createJobQueue(numWorkers: 1, configuration: .init(), function: #function)
267+
268+
try await testPriorityJobQueue(jobQueue: jobQueue) { queue in
269+
queue.registerJob(parameters: TestParameters.self) { parameters, context in
270+
context.logger.info("Parameters=\(parameters.value)")
271+
jobExecutionSequence.withLockedValue {
272+
$0.append(parameters.value)
273+
}
274+
try await Task.sleep(for: .milliseconds(Int.random(in: 10..<50)))
275+
expectation.fulfill()
276+
}
277+
278+
try await queue.push(
279+
TestParameters(value: 20),
280+
options: .init(
281+
priority: .lowest()
282+
)
283+
)
284+
285+
try await queue.push(
286+
TestParameters(value: 2025),
287+
options: .init(
288+
priority: .highest()
289+
)
290+
)
291+
292+
try await withThrowingTaskGroup(of: Void.self) { group in
293+
let serviceGroup = ServiceGroup(services: [queue], logger: queue.logger)
294+
295+
let processingJobs = try await jobQueue.queue.getJobs(withStatus: .pending)
296+
XCTAssertEqual(processingJobs.count, 2)
297+
298+
group.addTask {
299+
try await serviceGroup.run()
300+
}
301+
302+
await fulfillment(of: [expectation], timeout: 10)
303+
304+
let pendingJobs = try await jobQueue.queue.getJobs(withStatus: .pending)
305+
XCTAssertEqual(pendingJobs.count, 0)
306+
await serviceGroup.triggerGracefulShutdown()
307+
}
308+
}
309+
XCTAssertEqual(jobExecutionSequence.withLockedValue { $0 }, [2025, 20])
310+
}
311+
312+
func testJobPrioritiesWithDelay() async throws {
313+
struct TestParameters: JobParameters {
314+
static let jobName = "testPriorityJobsWithDelay"
315+
let value: Int
316+
}
317+
let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 2)
318+
let jobExecutionSequence: NIOLockedValueBox<[Int]> = .init([])
319+
320+
let jobQueue = try await self.createJobQueue(numWorkers: 1, configuration: .init(), function: #function)
321+
322+
try await testPriorityJobQueue(jobQueue: jobQueue) { queue in
323+
queue.registerJob(parameters: TestParameters.self) { parameters, context in
324+
context.logger.info("Parameters=\(parameters.value)")
325+
jobExecutionSequence.withLockedValue {
326+
$0.append(parameters.value)
327+
}
328+
try await Task.sleep(for: .milliseconds(Int.random(in: 10..<50)))
329+
expectation.fulfill()
330+
}
331+
332+
try await queue.push(
333+
TestParameters(value: 20),
334+
options: .init(
335+
priority: .lower()
336+
)
337+
)
338+
339+
try await queue.push(
340+
TestParameters(value: 2025),
341+
options: .init(
342+
delayUntil: Date.now.addingTimeInterval(1),
343+
priority: .higher()
344+
)
345+
)
346+
347+
try await withThrowingTaskGroup(of: Void.self) { group in
348+
let serviceGroup = ServiceGroup(services: [queue], logger: queue.logger)
349+
350+
let processingJobs = try await jobQueue.queue.getJobs(withStatus: .pending)
351+
XCTAssertEqual(processingJobs.count, 2)
352+
353+
group.addTask {
354+
try await serviceGroup.run()
355+
}
356+
357+
await fulfillment(of: [expectation], timeout: 10)
358+
359+
let pendingJobs = try await jobQueue.queue.getJobs(withStatus: .pending)
360+
XCTAssertEqual(pendingJobs.count, 0)
361+
await serviceGroup.triggerGracefulShutdown()
362+
}
363+
}
364+
XCTAssertEqual(jobExecutionSequence.withLockedValue { $0 }, [20, 2025])
365+
}
366+
210367
func testMultipleWorkers() async throws {
211368
struct TestParameters: JobParameters {
212369
static let jobName = "testMultipleWorkers"

0 commit comments

Comments
 (0)