Skip to content

Commit 1476993

Browse files
authored
Fix up after requiring all jobs to conform to JobParameters (#22)
* Fix up after requiring all jobs to comform to JobParameters * Fix up documentation * Use swift-jobs main branch
1 parent d6a90fc commit 1476993

File tree

2 files changed

+105
-86
lines changed

2 files changed

+105
-86
lines changed

Sources/JobsPostgres/PostgresJobsQueue.swift

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -165,13 +165,13 @@ public final class PostgresJobQueue: JobQueueDriver {
165165
/// Register job
166166
/// - Parameters:
167167
/// - job: Job Definition
168-
public func registerJob<Parameters: Codable & Sendable>(_ job: JobDefinition<Parameters>) {
168+
public func registerJob<Parameters: JobParameters>(_ job: JobDefinition<Parameters>) {
169169
self.jobRegistry.registerJob(job)
170170
}
171171

172172
/// Push Job onto queue
173173
/// - Returns: Identifier of queued job
174-
@discardableResult public func push<Parameters>(_ jobRequest: JobRequest<Parameters>, options: JobOptions) async throws -> JobID {
174+
@discardableResult public func push<Parameters: JobParameters>(_ jobRequest: JobRequest<Parameters>, options: JobOptions) async throws -> JobID {
175175
let jobID = JobID()
176176
try await self.client.withTransaction(logger: self.logger) { connection in
177177
try await self.add(jobID: jobID, jobRequest: jobRequest, queueName: configuration.queueName, connection: connection)
@@ -180,9 +180,12 @@ public final class PostgresJobQueue: JobQueueDriver {
180180
return jobID
181181
}
182182

183-
/// Retry a job
184-
/// - Returns: Bool
185-
public func retry<Parameters>(_ id: JobID, jobRequest: JobRequest<Parameters>, options: JobOptions) async throws {
183+
/// Retry an existing Job
184+
/// - Parameters
185+
/// - id: Job instance ID
186+
/// - jobRequest: Job Request
187+
/// - options: JobOptions
188+
public func retry<Parameters: JobParameters>(_ id: JobID, jobRequest: JobRequest<Parameters>, options: JobOptions) async throws {
186189
let buffer = try self.jobRegistry.encode(jobRequest: jobRequest)
187190
try await self.client.withTransaction(logger: self.logger) { connection in
188191
try await self.updateJob(id: id, buffer: buffer, connection: connection)

Tests/JobsPostgresTests/JobsTests.swift

Lines changed: 97 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -41,14 +41,6 @@ extension XCTestExpectation {
4141
}
4242

4343
final class JobsTests: XCTestCase {
44-
func wait(for expectations: [XCTestExpectation], timeout: TimeInterval) async {
45-
#if (os(Linux) && swift(<5.9)) || swift(<5.8)
46-
super.wait(for: expectations, timeout: timeout)
47-
#else
48-
await fulfillment(of: expectations, timeout: timeout)
49-
#endif
50-
}
51-
5244
func createJobQueue(
5345
numWorkers: Int,
5446
configuration: PostgresJobQueue.Configuration = .init(),
@@ -157,57 +149,61 @@ final class JobsTests: XCTestCase {
157149
}
158150

159151
func testBasic() async throws {
152+
struct TestParameters: JobParameters {
153+
static let jobName = "testBasic"
154+
let value: Int
155+
}
160156
let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 10)
161-
let jobIdentifer = JobIdentifier<Int>(#function)
162157
try await self.testJobQueue(numWorkers: 1) { jobQueue in
163-
jobQueue.registerJob(id: jobIdentifer) { parameters, context in
164-
context.logger.info("Parameters=\(parameters)")
158+
jobQueue.registerJob(parameters: TestParameters.self) { parameters, context in
159+
context.logger.info("Parameters=\(parameters.value)")
165160
try await Task.sleep(for: .milliseconds(Int.random(in: 10..<50)))
166161
expectation.fulfill()
167162
}
168-
try await jobQueue.push(id: jobIdentifer, parameters: 1)
169-
try await jobQueue.push(id: jobIdentifer, parameters: 2)
170-
try await jobQueue.push(id: jobIdentifer, parameters: 3)
171-
try await jobQueue.push(id: jobIdentifer, parameters: 4)
172-
try await jobQueue.push(id: jobIdentifer, parameters: 5)
173-
try await jobQueue.push(id: jobIdentifer, parameters: 6)
174-
try await jobQueue.push(id: jobIdentifer, parameters: 7)
175-
try await jobQueue.push(id: jobIdentifer, parameters: 8)
176-
try await jobQueue.push(id: jobIdentifer, parameters: 9)
177-
try await jobQueue.push(id: jobIdentifer, parameters: 10)
178-
179-
await self.wait(for: [expectation], timeout: 5)
163+
try await jobQueue.push(TestParameters(value: 1))
164+
try await jobQueue.push(TestParameters(value: 2))
165+
try await jobQueue.push(TestParameters(value: 3))
166+
try await jobQueue.push(TestParameters(value: 4))
167+
try await jobQueue.push(TestParameters(value: 5))
168+
try await jobQueue.push(TestParameters(value: 6))
169+
try await jobQueue.push(TestParameters(value: 7))
170+
try await jobQueue.push(TestParameters(value: 8))
171+
try await jobQueue.push(TestParameters(value: 9))
172+
try await jobQueue.push(TestParameters(value: 10))
173+
174+
await fulfillment(of: [expectation], timeout: 5)
180175
}
181176
}
182177

183178
func testDelayedJobs() async throws {
184-
let jobIdentifer = JobIdentifier<Int>(#function)
185-
let jobIdentifer2 = JobIdentifier<Int>(#function)
179+
struct TestParameters: JobParameters {
180+
static let jobName = "testDelayedJobs"
181+
let value: Int
182+
}
186183
let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 2)
187184
let jobExecutionSequence: NIOLockedValueBox<[Int]> = .init([])
188185

189186
try await self.testJobQueue(numWorkers: 1) { jobQueue in
190-
jobQueue.registerJob(id: jobIdentifer) { parameters, context in
191-
context.logger.info("Parameters=\(parameters)")
187+
jobQueue.registerJob(parameters: TestParameters.self) { parameters, context in
188+
context.logger.info("Parameters=\(parameters.value)")
192189
jobExecutionSequence.withLockedValue {
193-
$0.append(parameters)
190+
$0.append(parameters.value)
194191
}
195192
try await Task.sleep(for: .milliseconds(Int.random(in: 10..<50)))
196193
expectation.fulfill()
197194
}
198195
try await jobQueue.push(
199-
id: jobIdentifer,
200-
parameters: 1,
196+
TestParameters(value: 1),
201197
options: .init(
202198
delayUntil: Date.now.addingTimeInterval(1)
203199
)
204200
)
205-
try await jobQueue.push(id: jobIdentifer2, parameters: 5)
201+
try await jobQueue.push(TestParameters(value: 5))
206202

207203
let processingJobs = try await jobQueue.queue.getJobs(withStatus: .pending)
208204
XCTAssertEqual(processingJobs.count, 2)
209205

210-
await self.wait(for: [expectation], timeout: 10)
206+
await fulfillment(of: [expectation], timeout: 10)
211207

212208
let pendingJobs = try await jobQueue.queue.getJobs(withStatus: .pending)
213209
XCTAssertEqual(pendingJobs.count, 0)
@@ -216,13 +212,16 @@ final class JobsTests: XCTestCase {
216212
}
217213

218214
func testMultipleWorkers() async throws {
219-
let jobIdentifer = JobIdentifier<Int>(#function)
215+
struct TestParameters: JobParameters {
216+
static let jobName = "testMultipleWorkers"
217+
let value: Int
218+
}
220219
let runningJobCounter = ManagedAtomic(0)
221220
let maxRunningJobCounter = ManagedAtomic(0)
222221
let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 10)
223222

224223
try await self.testJobQueue(numWorkers: 4) { jobQueue in
225-
jobQueue.registerJob(id: jobIdentifer) { parameters, context in
224+
jobQueue.registerJob(parameters: TestParameters.self) { parameters, context in
226225
let runningJobs = runningJobCounter.wrappingIncrementThenLoad(by: 1, ordering: .relaxed)
227226
if runningJobs > maxRunningJobCounter.load(ordering: .relaxed) {
228227
maxRunningJobCounter.store(runningJobs, ordering: .relaxed)
@@ -233,36 +232,38 @@ final class JobsTests: XCTestCase {
233232
runningJobCounter.wrappingDecrement(by: 1, ordering: .relaxed)
234233
}
235234

236-
try await jobQueue.push(id: jobIdentifer, parameters: 1)
237-
try await jobQueue.push(id: jobIdentifer, parameters: 2)
238-
try await jobQueue.push(id: jobIdentifer, parameters: 3)
239-
try await jobQueue.push(id: jobIdentifer, parameters: 4)
240-
try await jobQueue.push(id: jobIdentifer, parameters: 5)
241-
try await jobQueue.push(id: jobIdentifer, parameters: 6)
242-
try await jobQueue.push(id: jobIdentifer, parameters: 7)
243-
try await jobQueue.push(id: jobIdentifer, parameters: 8)
244-
try await jobQueue.push(id: jobIdentifer, parameters: 9)
245-
try await jobQueue.push(id: jobIdentifer, parameters: 10)
235+
try await jobQueue.push(TestParameters(value: 1))
236+
try await jobQueue.push(TestParameters(value: 2))
237+
try await jobQueue.push(TestParameters(value: 3))
238+
try await jobQueue.push(TestParameters(value: 4))
239+
try await jobQueue.push(TestParameters(value: 5))
240+
try await jobQueue.push(TestParameters(value: 6))
241+
try await jobQueue.push(TestParameters(value: 7))
242+
try await jobQueue.push(TestParameters(value: 8))
243+
try await jobQueue.push(TestParameters(value: 9))
244+
try await jobQueue.push(TestParameters(value: 10))
246245

247-
await self.wait(for: [expectation], timeout: 5)
246+
await fulfillment(of: [expectation], timeout: 5)
248247

249248
XCTAssertGreaterThan(maxRunningJobCounter.load(ordering: .relaxed), 1)
250249
XCTAssertLessThanOrEqual(maxRunningJobCounter.load(ordering: .relaxed), 4)
251250
}
252251
}
253252

254253
func testErrorRetryCount() async throws {
255-
let jobIdentifer = JobIdentifier<Int>(#function)
254+
struct TestParameters: JobParameters {
255+
static let jobName = "testErrorRetryCount"
256+
}
256257
let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 4)
257258
struct FailedError: Error {}
258259
try await self.testJobQueue(numWorkers: 1) { jobQueue in
259-
jobQueue.registerJob(id: jobIdentifer, maxRetryCount: 3) { _, _ in
260+
jobQueue.registerJob(parameters: TestParameters.self, maxRetryCount: 3) { _, _ in
260261
expectation.fulfill()
261262
throw FailedError()
262263
}
263-
try await jobQueue.push(id: jobIdentifer, parameters: 0)
264+
try await jobQueue.push(TestParameters())
264265

265-
await self.wait(for: [expectation], timeout: 5)
266+
await fulfillment(of: [expectation], timeout: 5)
266267
try await Task.sleep(for: .milliseconds(200))
267268

268269
let failedJobs = try await jobQueue.queue.getJobs(withStatus: .failed)
@@ -273,12 +274,14 @@ final class JobsTests: XCTestCase {
273274
}
274275

275276
func testErrorRetryAndThenSucceed() async throws {
276-
let jobIdentifer = JobIdentifier<Int>(#function)
277+
struct TestParameters: JobParameters {
278+
static let jobName = "testErrorRetryAndThenSucceed"
279+
}
277280
let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 2)
278281
let currentJobTryCount: NIOLockedValueBox<Int> = .init(0)
279282
struct FailedError: Error {}
280283
try await self.testJobQueue(numWorkers: 1) { jobQueue in
281-
jobQueue.registerJob(id: jobIdentifer, maxRetryCount: 3) { _, _ in
284+
jobQueue.registerJob(parameters: TestParameters.self, maxRetryCount: 3) { _, _ in
282285
defer {
283286
currentJobTryCount.withLockedValue {
284287
$0 += 1
@@ -289,9 +292,9 @@ final class JobsTests: XCTestCase {
289292
throw FailedError()
290293
}
291294
}
292-
try await jobQueue.push(id: jobIdentifer, parameters: 0)
295+
try await jobQueue.push(TestParameters())
293296

294-
await self.wait(for: [expectation], timeout: 5)
297+
await fulfillment(of: [expectation], timeout: 5)
295298
try await Task.sleep(for: .milliseconds(200))
296299

297300
let failedJobs = try await jobQueue.queue.getJobs(withStatus: .failed)
@@ -303,36 +306,38 @@ final class JobsTests: XCTestCase {
303306
}
304307

305308
func testJobSerialization() async throws {
306-
struct TestJobParameters: Codable {
309+
struct TestJobParameters: JobParameters {
310+
static let jobName = "testJobSerialization"
307311
let id: Int
308312
let message: String
309313
}
310314
let expectation = XCTestExpectation(description: "TestJob.execute was called")
311-
let jobIdentifer = JobIdentifier<TestJobParameters>(#function)
312315
try await self.testJobQueue(numWorkers: 1) { jobQueue in
313-
jobQueue.registerJob(id: jobIdentifer) { parameters, _ in
316+
jobQueue.registerJob(parameters: TestJobParameters.self) { parameters, _ in
314317
XCTAssertEqual(parameters.id, 23)
315318
XCTAssertEqual(parameters.message, "Hello!")
316319
expectation.fulfill()
317320
}
318-
try await jobQueue.push(id: jobIdentifer, parameters: .init(id: 23, message: "Hello!"))
321+
try await jobQueue.push(TestJobParameters(id: 23, message: "Hello!"))
319322

320-
await self.wait(for: [expectation], timeout: 5)
323+
await fulfillment(of: [expectation], timeout: 5)
321324
}
322325
}
323326

324327
/// Test job is cancelled on shutdown
325328
func testShutdownJob() async throws {
326-
let jobIdentifer = JobIdentifier<Int>(#function)
329+
struct TestParameters: JobParameters {
330+
static let jobName = "testShutdownJob"
331+
}
327332
let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 1)
328333

329334
try await self.testJobQueue(numWorkers: 4) { jobQueue in
330-
jobQueue.registerJob(id: jobIdentifer) { _, _ in
335+
jobQueue.registerJob(parameters: TestParameters.self) { _, _ in
331336
expectation.fulfill()
332337
try await Task.sleep(for: .milliseconds(1000))
333338
}
334-
try await jobQueue.push(id: jobIdentifer, parameters: 0)
335-
await self.wait(for: [expectation], timeout: 5)
339+
try await jobQueue.push(TestParameters())
340+
await fulfillment(of: [expectation], timeout: 5)
336341

337342
let processingJobs = try await jobQueue.queue.getJobs(withStatus: .processing)
338343
XCTAssertEqual(processingJobs.count, 1)
@@ -344,19 +349,25 @@ final class JobsTests: XCTestCase {
344349

345350
/// test job fails to decode but queue continues to process
346351
func testFailToDecode() async throws {
352+
struct TestIntParameter: JobParameters {
353+
static let jobName = "testFailToDecode"
354+
let value: Int
355+
}
356+
struct TestStringParameter: JobParameters {
357+
static let jobName = "testFailToDecode"
358+
let value: String
359+
}
347360
let string: NIOLockedValueBox<String> = .init("")
348-
let jobIdentifer1 = JobIdentifier<Int>(#function)
349-
let jobIdentifer2 = JobIdentifier<String>(#function)
350361
let expectation = XCTestExpectation(description: "job was called", expectedFulfillmentCount: 1)
351362

352363
try await self.testJobQueue(numWorkers: 4) { jobQueue in
353-
jobQueue.registerJob(id: jobIdentifer2) { parameters, _ in
354-
string.withLockedValue { $0 = parameters }
364+
jobQueue.registerJob(parameters: TestStringParameter.self) { parameters, _ in
365+
string.withLockedValue { $0 = parameters.value }
355366
expectation.fulfill()
356367
}
357-
try await jobQueue.push(id: jobIdentifer1, parameters: 2)
358-
try await jobQueue.push(id: jobIdentifer2, parameters: "test")
359-
await self.wait(for: [expectation], timeout: 5)
368+
try await jobQueue.push(TestIntParameter(value: 2))
369+
try await jobQueue.push(TestStringParameter(value: "test"))
370+
await fulfillment(of: [expectation], timeout: 5)
360371
}
361372
string.withLockedValue {
362373
XCTAssertEqual($0, "test")
@@ -366,13 +377,15 @@ final class JobsTests: XCTestCase {
366377
/// creates job that errors on first attempt, and is left on processing queue and
367378
/// is then rerun on startup of new server
368379
func testRerunAtStartup() async throws {
380+
struct TestParameters: JobParameters {
381+
static let jobName = "testRerunAtStartup"
382+
}
369383
struct RetryError: Error {}
370-
let jobIdentifer = JobIdentifier<Int>(#function)
371384
let firstTime = ManagedAtomic(true)
372385
let finished = ManagedAtomic(false)
373386
let failedExpectation = XCTestExpectation(description: "TestJob failed", expectedFulfillmentCount: 1)
374387
let succeededExpectation = XCTestExpectation(description: "TestJob2 succeeded", expectedFulfillmentCount: 1)
375-
let job = JobDefinition(id: jobIdentifer) { _, _ in
388+
let job = JobDefinition(parameters: TestParameters.self) { _, _ in
376389
if firstTime.compareExchange(expected: true, desired: false, ordering: .relaxed).original {
377390
failedExpectation.fulfill()
378391
throw RetryError()
@@ -388,9 +401,9 @@ final class JobsTests: XCTestCase {
388401
jobQueue: jobQueue,
389402
revertMigrations: true
390403
) { jobQueue in
391-
try await jobQueue.push(id: jobIdentifer, parameters: 0)
404+
try await jobQueue.push(TestParameters())
392405

393-
await self.wait(for: [failedExpectation], timeout: 10)
406+
await fulfillment(of: [failedExpectation], timeout: 10)
394407

395408
XCTAssertFalse(firstTime.load(ordering: .relaxed))
396409
XCTAssertFalse(finished.load(ordering: .relaxed))
@@ -399,21 +412,24 @@ final class JobsTests: XCTestCase {
399412
let jobQueue2 = try await createJobQueue(numWorkers: 1)
400413
jobQueue2.registerJob(job)
401414
try await self.testJobQueue(jobQueue: jobQueue2, failedJobsInitialization: .rerun) { _ in
402-
await self.wait(for: [succeededExpectation], timeout: 10)
415+
await fulfillment(of: [succeededExpectation], timeout: 10)
403416
XCTAssertTrue(finished.load(ordering: .relaxed))
404417
}
405418
}
406419

407420
func testMultipleJobQueueHandlers() async throws {
408-
let jobIdentifer = JobIdentifier<Int>(#function)
421+
struct TestParameters: JobParameters {
422+
static let jobName = "testMultipleJobQueueHandlers"
423+
let value: Int
424+
}
409425
let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 200)
410426
let logger = {
411427
var logger = Logger(label: "testMultipleJobQueueHandlers")
412428
logger.logLevel = .debug
413429
return logger
414430
}()
415-
let job = JobDefinition(id: jobIdentifer) { parameters, context in
416-
context.logger.info("Parameters=\(parameters)")
431+
let job = JobDefinition(parameters: TestParameters.self) { parameters, context in
432+
context.logger.info("Parameters=\(parameters.value)")
417433
try await Task.sleep(for: .milliseconds(Int.random(in: 10..<50)))
418434
expectation.fulfill()
419435
}
@@ -463,11 +479,11 @@ final class JobsTests: XCTestCase {
463479
try await jobQueue.queue.cleanup(failedJobs: .remove, processingJobs: .remove)
464480
try await jobQueue2.queue.cleanup(failedJobs: .remove, processingJobs: .remove)
465481
do {
466-
for i in 0..<100 {
467-
try await jobQueue2.push(id: jobIdentifer, parameters: i)
468-
try await jobQueue.push(id: jobIdentifer, parameters: i)
482+
for i in 0..<200 {
483+
try await jobQueue.push(TestParameters(value: i))
484+
try await jobQueue2.push(TestParameters(value: i))
469485
}
470-
await self.wait(for: [expectation], timeout: 5)
486+
await fulfillment(of: [expectation], timeout: 5)
471487
await serviceGroup.triggerGracefulShutdown()
472488
} catch {
473489
XCTFail("\(String(reflecting: error))")

0 commit comments

Comments
 (0)