@@ -120,6 +120,54 @@ final class JobsTests: XCTestCase {
120
120
}
121
121
}
122
122
123
+ /// Helper for testing job priority
124
+ @discardableResult public func testPriorityJobQueue< T> (
125
+ jobQueue: JobQueue < PostgresJobQueue > ,
126
+ failedJobsInitialization: PostgresJobQueue . JobCleanup = . remove,
127
+ processingJobsInitialization: PostgresJobQueue . JobCleanup = . remove,
128
+ pendingJobsInitialization: PostgresJobQueue . JobCleanup = . remove,
129
+ revertMigrations: Bool = false ,
130
+ test: ( JobQueue < PostgresJobQueue > ) async throws -> T
131
+ ) async throws -> T {
132
+ do {
133
+ return try await withThrowingTaskGroup ( of: Void . self) { group in
134
+ let serviceGroup = ServiceGroup (
135
+ configuration: . init(
136
+ services: [ jobQueue. queue. client] ,
137
+ gracefulShutdownSignals: [ . sigterm, . sigint] ,
138
+ logger: jobQueue. queue. logger
139
+ )
140
+ )
141
+ group. addTask {
142
+ try await serviceGroup. run ( )
143
+ }
144
+ do {
145
+ let migrations = jobQueue. queue. migrations
146
+ let client = jobQueue. queue. client
147
+ let logger = jobQueue. queue. logger
148
+ if revertMigrations {
149
+ try await migrations. revert ( client: client, groups: [ . jobQueue] , logger: logger, dryRun: false )
150
+ }
151
+ try await migrations. apply ( client: client, groups: [ . jobQueue] , logger: logger, dryRun: false )
152
+ try await jobQueue. queue. cleanup ( failedJobs: failedJobsInitialization, processingJobs: processingJobsInitialization)
153
+ let value = try await test ( jobQueue)
154
+ await serviceGroup. triggerGracefulShutdown ( )
155
+ return value
156
+ } catch let error as PSQLError {
157
+ XCTFail ( " \( String ( reflecting: error) ) " )
158
+ await serviceGroup. triggerGracefulShutdown ( )
159
+ throw error
160
+ } catch {
161
+ await serviceGroup. triggerGracefulShutdown ( )
162
+ throw error
163
+ }
164
+ }
165
+ } catch let error as PSQLError {
166
+ XCTFail ( " \( String ( reflecting: error) ) " )
167
+ throw error
168
+ }
169
+ }
170
+
123
171
/// Helper function for test a server
124
172
///
125
173
/// Creates test client, runs test function abd ensures everything is
@@ -215,8 +263,10 @@ final class JobsTests: XCTestCase {
215
263
let expectation = XCTestExpectation ( description: " TestJob.execute was called " , expectedFulfillmentCount: 2 )
216
264
let jobExecutionSequence : NIOLockedValueBox < [ Int ] > = . init( [ ] )
217
265
218
- try await self . testJobQueue ( numWorkers: 1 ) { jobQueue in
219
- jobQueue. registerJob ( parameters: TestParameters . self) { parameters, context in
266
+ let jobQueue = try await self . createJobQueue ( numWorkers: 1 , configuration: . init( ) , function: #function)
267
+
268
+ try await testPriorityJobQueue ( jobQueue: jobQueue) { queue in
269
+ queue. registerJob ( parameters: TestParameters . self) { parameters, context in
220
270
context. logger. info ( " Parameters= \( parameters. value) " )
221
271
jobExecutionSequence. withLockedValue {
222
272
$0. append ( parameters. value)
@@ -225,29 +275,93 @@ final class JobsTests: XCTestCase {
225
275
expectation. fulfill ( )
226
276
}
227
277
228
- await withThrowingTaskGroup ( of: Void . self) { group in
229
- for i in 0 ..< 2 {
230
- group. addTask {
231
- try await jobQueue. push (
232
- TestParameters ( value: 20 + i) ,
233
- options: . init(
234
- priority: Int16 . random ( in: 0 ..< 9 )
235
- )
236
- )
237
- }
278
+ try await queue. push (
279
+ TestParameters ( value: 20 ) ,
280
+ options: . init(
281
+ priority: 9
282
+ )
283
+ )
284
+
285
+ try await queue. push (
286
+ TestParameters ( value: 2025 ) ,
287
+ options: . init(
288
+ priority: 2
289
+ )
290
+ )
291
+
292
+ try await withThrowingTaskGroup ( of: Void . self) { group in
293
+ let serviceGroup = ServiceGroup ( services: [ queue] , logger: queue. logger)
294
+
295
+ group. addTask {
296
+ try await serviceGroup. run ( )
238
297
}
298
+
299
+ let processingJobs = try await jobQueue. queue. getJobs ( withStatus: . pending)
300
+ XCTAssertEqual ( processingJobs. count, 2 )
301
+
302
+ await fulfillment ( of: [ expectation] , timeout: 10 )
303
+
304
+ let pendingJobs = try await jobQueue. queue. getJobs ( withStatus: . pending)
305
+ XCTAssertEqual ( pendingJobs. count, 0 )
306
+ await serviceGroup. triggerGracefulShutdown ( )
239
307
}
308
+ }
309
+ XCTAssertEqual ( jobExecutionSequence. withLockedValue { $0 } , [ 2025 , 20 ] )
310
+ }
311
+
312
+ func testJobPrioritiesWithDelay( ) async throws {
313
+ struct TestParameters : JobParameters {
314
+ static let jobName = " testPriorityJobsWithDelay "
315
+ let value : Int
316
+ }
317
+ let expectation = XCTestExpectation ( description: " TestJob.execute was called " , expectedFulfillmentCount: 2 )
318
+ let jobExecutionSequence : NIOLockedValueBox < [ Int ] > = . init( [ ] )
240
319
241
- let processingJobs = try await jobQueue. queue. getJobs ( withStatus: . pending)
242
- XCTAssertEqual ( processingJobs. count, 2 )
320
+ let jobQueue = try await self . createJobQueue ( numWorkers: 1 , configuration: . init( ) , function: #function)
243
321
244
- await fulfillment ( of: [ expectation] , timeout: 10 )
322
+ try await testPriorityJobQueue ( jobQueue: jobQueue) { queue in
323
+ queue. registerJob ( parameters: TestParameters . self) { parameters, context in
324
+ context. logger. info ( " Parameters= \( parameters. value) " )
325
+ jobExecutionSequence. withLockedValue {
326
+ $0. append ( parameters. value)
327
+ }
328
+ try await Task . sleep ( for: . milliseconds( Int . random ( in: 10 ..< 50 ) ) )
329
+ expectation. fulfill ( )
330
+ }
245
331
246
- let pendingJobs = try await jobQueue. queue. getJobs ( withStatus: . pending)
247
- XCTAssertEqual ( pendingJobs. count, 0 )
332
+ try await queue. push (
333
+ TestParameters ( value: 20 ) ,
334
+ options: . init(
335
+ priority: 9
336
+ )
337
+ )
338
+
339
+ try await queue. push (
340
+ TestParameters ( value: 2025 ) ,
341
+ options: . init(
342
+ delayUntil: Date . now. addingTimeInterval ( 1 ) ,
343
+ priority: 2
344
+ )
345
+ )
346
+
347
+ try await withThrowingTaskGroup ( of: Void . self) { group in
348
+ let serviceGroup = ServiceGroup ( services: [ queue] , logger: queue. logger)
349
+
350
+ group. addTask {
351
+ try await serviceGroup. run ( )
352
+ }
353
+
354
+ let processingJobs = try await jobQueue. queue. getJobs ( withStatus: . pending)
355
+ XCTAssertEqual ( processingJobs. count, 2 )
356
+
357
+ await fulfillment ( of: [ expectation] , timeout: 10 )
358
+
359
+ let pendingJobs = try await jobQueue. queue. getJobs ( withStatus: . pending)
360
+ XCTAssertEqual ( pendingJobs. count, 0 )
361
+ await serviceGroup. triggerGracefulShutdown ( )
362
+ }
248
363
}
249
- // TODO: need to figure out ordering here
250
- //XCTAssertEqual(jobExecutionSequence.withLockedValue { $0 }, [20, 21])
364
+ XCTAssertEqual ( jobExecutionSequence. withLockedValue { $0 } , [ 20 , 2025 ] )
251
365
}
252
366
253
367
func testMultipleWorkers( ) async throws {
0 commit comments