Skip to content

Commit fa6ba76

Browse files
committed
Adding Job priority
* Job priority
1 parent 7885074 commit fa6ba76

File tree

3 files changed

+75
-11
lines changed

3 files changed

+75
-11
lines changed

Sources/JobsPostgres/Migrations/CreateSwiftJobsMigrations.swift

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,16 +41,17 @@ struct CreateSwiftJobsMigrations: DatabaseMigration {
4141
job_id uuid PRIMARY KEY,
4242
created_at TIMESTAMPTZ NOT NULL,
4343
delayed_until TIMESTAMPTZ NOT NULL DEFAULT now(),
44-
queue_name TEXT NOT NULL DEFAULT 'default'
44+
queue_name TEXT NOT NULL DEFAULT 'default',
45+
priority SMALLINT NOT NULL DEFAULT 0
4546
);
4647
""",
4748
logger: logger
4849
)
4950

5051
try await connection.query(
5152
"""
52-
CREATE INDEX CONCURRENTLY IF NOT EXISTS queues_delayed_until_queue_name_idx
53-
ON swift_jobs.queues(delayed_until, queue_name)
53+
CREATE INDEX CONCURRENTLY IF NOT EXISTS queues_delayed_until_priority_queue_name_idx
54+
ON swift_jobs.queues(delayed_until, queue_name, priority ASC)
5455
""",
5556
logger: logger
5657
)

Sources/JobsPostgres/PostgresJobsQueue.swift

Lines changed: 28 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -55,16 +55,36 @@ public final class PostgresJobQueue: JobQueueDriver {
5555
public struct JobOptions: JobOptionsProtocol {
5656
/// Delay running job until
5757
public var delayUntil: Date
58+
/// Priority for this jobs highest priority 0 to 9, lowest priority
59+
public var priority: Int16
5860

5961
/// Default initializer for JobOptions
6062
public init() {
6163
self.delayUntil = .now
64+
self.priority = 0
6265
}
63-
66+
6467
/// Initializer for JobOptions
6568
/// - Parameter delayUntil: Whether job execution should be delayed until a later date
6669
public init(delayUntil: Date?) {
6770
self.delayUntil = delayUntil ?? .now
71+
self.priority = 0
72+
}
73+
74+
/// Initializer for JobOptions
75+
/// - Parameter delayUntil: Whether job execution should be delayed until a later date
76+
/// - Parameter priority: The priority for a job
77+
public init(delayUntil: Date = .now, priority: Int16 = 0) {
78+
self.delayUntil = delayUntil
79+
self.priority = priority
80+
}
81+
82+
/// Initializer for JobOptions
83+
/// - Parameter delayUntil: Whether job execution should be delayed until a later date
84+
/// - Parameter priority: The priority for a job
85+
public init(delayUntil: Date?, priority: Int16 = 0) {
86+
self.delayUntil = delayUntil ?? .now
87+
self.priority = priority
6888
}
6989
}
7090

@@ -194,7 +214,7 @@ public final class PostgresJobQueue: JobQueueDriver {
194214
let jobID = JobID()
195215
try await self.client.withTransaction(logger: self.logger) { connection in
196216
try await self.add(jobID: jobID, jobRequest: jobRequest, queueName: configuration.queueName, connection: connection)
197-
try await self.addToQueue(jobID: jobID, queueName: configuration.queueName, delayUntil: options.delayUntil, connection: connection)
217+
try await self.addToQueue(jobID: jobID, queueName: configuration.queueName, options: options, connection: connection)
198218
}
199219
return jobID
200220
}
@@ -208,7 +228,7 @@ public final class PostgresJobQueue: JobQueueDriver {
208228
let buffer = try self.jobRegistry.encode(jobRequest: jobRequest)
209229
try await self.client.withTransaction(logger: self.logger) { connection in
210230
try await self.updateJob(id: id, buffer: buffer, connection: connection)
211-
try await self.addToQueue(jobID: id, queueName: configuration.queueName, delayUntil: options.delayUntil, connection: connection)
231+
try await self.addToQueue(jobID: id, queueName: configuration.queueName, options: .init(), connection: connection)
212232
}
213233
}
214234

@@ -273,7 +293,7 @@ public final class PostgresJobQueue: JobQueueDriver {
273293
FROM swift_jobs.queues
274294
WHERE delayed_until <= NOW()
275295
AND queue_name = \(configuration.queueName)
276-
ORDER BY created_at, delayed_until ASC
296+
ORDER BY created_at ASC, delayed_until ASC, priority ASC
277297
FOR UPDATE SKIP LOCKED
278298
LIMIT 1
279299
)
@@ -381,11 +401,11 @@ public final class PostgresJobQueue: JobQueueDriver {
381401
)
382402
}
383403

384-
func addToQueue(jobID: JobID, queueName: String, delayUntil: Date, connection: PostgresConnection) async throws {
404+
func addToQueue(jobID: JobID, queueName: String, options: JobOptions, connection: PostgresConnection) async throws {
385405
try await connection.query(
386406
"""
387-
INSERT INTO swift_jobs.queues (job_id, created_at, delayed_until, queue_name)
388-
VALUES (\(jobID), \(Date.now), \(delayUntil), \(queueName))
407+
INSERT INTO swift_jobs.queues (job_id, created_at, delayed_until, queue_name, priority)
408+
VALUES (\(jobID), \(Date.now), \(options.delayUntil), \(queueName), \(options.priority))
389409
-- We have found an existing job with the same id, SKIP this INSERT
390410
ON CONFLICT (job_id) DO NOTHING
391411
""",
@@ -449,7 +469,7 @@ public final class PostgresJobQueue: JobQueueDriver {
449469
let jobs = try await getJobs(withStatus: status)
450470
self.logger.info("Moving \(jobs.count) jobs with status: \(status) to job queue")
451471
for jobID in jobs {
452-
try await self.addToQueue(jobID: jobID, queueName: configuration.queueName, delayUntil: Date.now, connection: connection)
472+
try await self.addToQueue(jobID: jobID, queueName: configuration.queueName, options: .init(), connection: connection)
453473
}
454474

455475
case .doNothing:

Tests/JobsPostgresTests/JobsTests.swift

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,49 @@ final class JobsTests: XCTestCase {
206206
}
207207
XCTAssertEqual(jobExecutionSequence.withLockedValue { $0 }, [5, 1])
208208
}
209+
210+
func testJobPriorities() async throws {
211+
struct TestParameters: JobParameters {
212+
static let jobName = "testPriorityJobs"
213+
let value: Int
214+
}
215+
let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 2)
216+
let jobExecutionSequence: NIOLockedValueBox<[Int]> = .init([])
217+
218+
try await self.testJobQueue(numWorkers: 1) { jobQueue in
219+
jobQueue.registerJob(parameters: TestParameters.self) { parameters, context in
220+
context.logger.info("Parameters=\(parameters.value)")
221+
jobExecutionSequence.withLockedValue {
222+
$0.append(parameters.value)
223+
}
224+
try await Task.sleep(for: .milliseconds(Int.random(in: 10..<50)))
225+
expectation.fulfill()
226+
}
227+
228+
await withThrowingTaskGroup(of: Void.self) { group in
229+
for i in 0..<2 {
230+
group.addTask {
231+
try await jobQueue.push(
232+
TestParameters(value: 20 + i),
233+
options: .init(
234+
priority: Int16.random(in: 0..<9)
235+
)
236+
)
237+
}
238+
}
239+
}
240+
241+
let processingJobs = try await jobQueue.queue.getJobs(withStatus: .pending)
242+
XCTAssertEqual(processingJobs.count, 2)
243+
244+
await fulfillment(of: [expectation], timeout: 10)
245+
246+
let pendingJobs = try await jobQueue.queue.getJobs(withStatus: .pending)
247+
XCTAssertEqual(pendingJobs.count, 0)
248+
}
249+
// TODO: need to figure out ordering here
250+
//XCTAssertEqual(jobExecutionSequence.withLockedValue { $0 }, [20, 21])
251+
}
209252

210253
func testMultipleWorkers() async throws {
211254
struct TestParameters: JobParameters {

0 commit comments

Comments
 (0)