Skip to content

Commit 45c3376

Browse files
committed
Adding option to create unique jobs
* Updated the job priority struct to use static variables instead of static func * Add options to create unique jobs
1 parent 81119b9 commit 45c3376

File tree

3 files changed

+122
-34
lines changed

3 files changed

+122
-34
lines changed

Sources/JobsPostgres/Migrations/CreateSwiftJobsMigrations.swift

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,9 @@ struct CreateSwiftJobsMigrations: DatabaseMigration {
2929
job BYTEA NOT NULL,
3030
last_modified TIMESTAMPTZ NOT NULL DEFAULT now(),
3131
queue_name TEXT NOT NULL DEFAULT 'default',
32-
status SMALLINT NOT NULL
32+
status SMALLINT NOT NULL,
33+
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
34+
unique_key TEXT NOT NULL UNIQUE
3335
);
3436
""",
3537
logger: logger

Sources/JobsPostgres/PostgresJobsQueue.swift

Lines changed: 52 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -65,25 +65,15 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma
6565
case highest = 4
6666
}
6767
/// Lowest priority
68-
public static func lowest() -> JobPriority {
69-
JobPriority(rawValue: .lowest)
70-
}
68+
public static let lowest: JobPriority = JobPriority(rawValue: .lowest)
7169
/// Lower priority
72-
public static func lower() -> JobPriority {
73-
JobPriority(rawValue: .lower)
74-
}
70+
public static let lower: JobPriority = JobPriority(rawValue: .lower)
7571
/// Normal is the default priority
76-
public static func normal() -> JobPriority {
77-
JobPriority(rawValue: .normal)
78-
}
72+
public static let normal: JobPriority = JobPriority(rawValue: .normal)
7973
/// Higher priority
80-
public static func higher() -> JobPriority {
81-
JobPriority(rawValue: .higher)
82-
}
74+
public static let higher: JobPriority = JobPriority(rawValue: .higher)
8375
/// Higgest priority
84-
public static func highest() -> JobPriority {
85-
JobPriority(rawValue: .highest)
86-
}
76+
public static let highest: JobPriority = JobPriority(rawValue: .highest)
8777
}
8878

8979
/// Options for job pushed to queue
@@ -92,26 +82,35 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma
9282
public var delayUntil: Date
9383
/// Priority for this job
9484
public var priority: JobPriority
85+
/// Deduplication Key
86+
public var deduplicationKey: String
9587

9688
/// Default initializer for JobOptions
9789
public init() {
9890
self.delayUntil = .now
99-
self.priority = .normal()
91+
self.priority = .normal
92+
self.deduplicationKey = UUID().uuidString
10093
}
10194

10295
/// Initializer for JobOptions
10396
/// - Parameter delayUntil: Whether job execution should be delayed until a later date
10497
public init(delayUntil: Date?) {
10598
self.delayUntil = delayUntil ?? .now
106-
self.priority = .normal()
99+
self.priority = .normal
100+
self.deduplicationKey = UUID().uuidString
107101
}
108102

109103
/// Initializer for JobOptions
110104
/// - Parameter delayUntil: Whether job execution should be delayed until a later date
111105
/// - Parameter priority: The priority for a job
112-
public init(delayUntil: Date = .now, priority: JobPriority = .normal()) {
106+
public init(
107+
delayUntil: Date = .now,
108+
priority: JobPriority = .normal,
109+
deduplicationKey: String = UUID().uuidString
110+
) {
113111
self.delayUntil = delayUntil
114112
self.priority = priority
113+
self.deduplicationKey = deduplicationKey
115114
}
116115
}
117116

@@ -291,9 +290,28 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma
291290
/// - Returns: Identifier of queued job
292291
@discardableResult public func push<Parameters: JobParameters>(_ jobRequest: JobRequest<Parameters>, options: JobOptions) async throws -> JobID {
293292
let jobID = JobID()
294-
try await self.client.withTransaction(logger: self.logger) { connection in
295-
try await self.add(jobID: jobID, jobRequest: jobRequest, queueName: configuration.queueName, connection: connection)
296-
try await self.addToQueue(jobID: jobID, queueName: configuration.queueName, options: options, connection: connection)
293+
let insertedJob = try await self.client.withTransaction(logger: self.logger) { connection in
294+
let id = try await self.add(
295+
jobID: jobID,
296+
jobRequest: jobRequest,
297+
queueName: configuration.queueName,
298+
deduplicationKey: options.deduplicationKey,
299+
connection: connection
300+
)
301+
// We should never have and empty job id
302+
let insertedJobID = id ?? jobID
303+
try await self.addToQueue(
304+
jobID: insertedJobID,
305+
queueName: configuration.queueName,
306+
options: options,
307+
connection: connection
308+
)
309+
return insertedJobID
310+
}
311+
312+
if insertedJob != jobID {
313+
// TODO: introduce a duplicate jobID error
314+
throw JobQueueError(code: .unrecognisedJobId, jobName: Parameters.jobName)
297315
}
298316
return jobID
299317
}
@@ -452,14 +470,23 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma
452470
}
453471
}
454472

455-
func add<Parameters>(jobID: JobID, jobRequest: JobRequest<Parameters>, queueName: String, connection: PostgresConnection) async throws {
456-
try await connection.query(
473+
func add<Parameters>(
474+
jobID: JobID,
475+
jobRequest: JobRequest<Parameters>,
476+
queueName: String,
477+
deduplicationKey: String,
478+
connection: PostgresConnection
479+
) async throws -> JobID? {
480+
let stream = try await connection.query(
457481
"""
458-
INSERT INTO swift_jobs.jobs (id, job, status, queue_name)
459-
VALUES (\(jobID), \(jobRequest), \(Status.pending), \(queueName))
482+
INSERT INTO swift_jobs.jobs (id, job, status, queue_name, unique_key)
483+
VALUES (\(jobID), \(jobRequest), \(Status.pending), \(queueName), \(deduplicationKey))
484+
ON CONFLICT DO NOTHING
485+
RETURNING id
460486
""",
461487
logger: self.logger
462488
)
489+
return try await stream.decode(JobID.self).first(where: { _ in true })
463490
}
464491
// TODO: maybe add a new column colum for attempt so far after PR https://github.com/hummingbird-project/swift-jobs/pull/63 is merged?
465492
func updateJob(id: JobID, buffer: ByteBuffer, connection: PostgresConnection) async throws {

Tests/JobsPostgresTests/JobsTests.swift

Lines changed: 67 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -278,14 +278,14 @@ final class JobsTests: XCTestCase {
278278
try await queue.push(
279279
TestParameters(value: 20),
280280
options: .init(
281-
priority: .lowest()
281+
priority: .lowest
282282
)
283283
)
284284

285285
try await queue.push(
286286
TestParameters(value: 2025),
287287
options: .init(
288-
priority: .highest()
288+
priority: .highest
289289
)
290290
)
291291

@@ -332,15 +332,15 @@ final class JobsTests: XCTestCase {
332332
try await queue.push(
333333
TestParameters(value: 20),
334334
options: .init(
335-
priority: .lower()
335+
priority: .lower
336336
)
337337
)
338338

339339
try await queue.push(
340340
TestParameters(value: 2025),
341341
options: .init(
342342
delayUntil: Date.now.addingTimeInterval(1),
343-
priority: .higher()
343+
priority: .higher
344344
)
345345
)
346346

@@ -717,14 +717,14 @@ final class JobsTests: XCTestCase {
717717
let resumableJob = try await queue.push(
718718
ResumableJob(),
719719
options: .init(
720-
priority: .lowest()
720+
priority: .lowest
721721
)
722722
)
723723

724724
try await queue.push(
725725
TestParameters(),
726726
options: .init(
727-
priority: .normal()
727+
priority: .normal
728728
)
729729
)
730730

@@ -793,14 +793,14 @@ final class JobsTests: XCTestCase {
793793
let cancellableJob = try await queue.push(
794794
TestParameters(value: 42),
795795
options: .init(
796-
priority: .lower()
796+
priority: .lower
797797
)
798798
)
799799

800800
try await queue.push(
801801
NoneCancelledJobParameters(value: 2025),
802802
options: .init(
803-
priority: .highest()
803+
priority: .highest
804804
)
805805
)
806806

@@ -827,4 +827,63 @@ final class JobsTests: XCTestCase {
827827
XCTAssertEqual(didRunCancelledJob.withLockedValue { $0 }, false)
828828
XCTAssertEqual(didRunNoneCancelledJob.withLockedValue { $0 }, true)
829829
}
830+
831+
func testUniqueJobs() async throws {
832+
struct TestParameters: JobParameters {
833+
static let jobName = "testUniqueJobs"
834+
let value: Int
835+
}
836+
let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 1)
837+
let jobValues: NIOLockedValueBox<[Int]> = .init([])
838+
839+
let jobQueue = try await self.createJobQueue(numWorkers: 1, configuration: .init(), function: #function)
840+
841+
try await testPriorityJobQueue(jobQueue: jobQueue) { queue in
842+
queue.registerJob(parameters: TestParameters.self) { parameters, context in
843+
context.logger.info("Parameters=\(parameters.value)")
844+
jobValues.withLockedValue {
845+
$0.append(parameters.value)
846+
}
847+
try await Task.sleep(for: .milliseconds(Int.random(in: 10..<50)))
848+
expectation.fulfill()
849+
}
850+
851+
do {
852+
try await queue.push(
853+
TestParameters(value: 42),
854+
options: .init(
855+
priority: .lower,
856+
deduplicationKey: "unique-key"
857+
)
858+
)
859+
try await queue.push(
860+
TestParameters(value: 100),
861+
options: .init(
862+
priority: .lower,
863+
deduplicationKey: "unique-key"
864+
)
865+
)
866+
} catch let error as JobQueueError {
867+
XCTAssertEqual(error.code, .unrecognisedJobId)
868+
}
869+
870+
try await withThrowingTaskGroup(of: Void.self) { group in
871+
let serviceGroup = ServiceGroup(services: [queue], logger: queue.logger)
872+
873+
let processingJobs = try await jobQueue.queue.getJobs(withStatus: .pending)
874+
XCTAssertEqual(processingJobs.count, 1)
875+
876+
group.addTask {
877+
try await serviceGroup.run()
878+
}
879+
880+
await fulfillment(of: [expectation], timeout: 10)
881+
let pendingJobs = try await jobQueue.queue.getJobs(withStatus: .pending)
882+
XCTAssertEqual(pendingJobs.count, 0)
883+
884+
await serviceGroup.triggerGracefulShutdown()
885+
}
886+
}
887+
XCTAssertEqual(jobValues.withLockedValue { $0 }, [42])
888+
}
830889
}

0 commit comments

Comments
 (0)