@@ -112,6 +112,7 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma
112
112
}
113
113
114
114
/// Job Status
115
+ @usableFromInline
115
116
enum Status : Int16 , PostgresCodable {
116
117
case pending = 0
117
118
case processing = 1
@@ -124,11 +125,11 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma
124
125
/// Queue configuration
125
126
public struct Configuration : Sendable {
126
127
/// Queue poll time to wait if queue empties
127
- let pollTime : Duration
128
+ public var pollTime : Duration
128
129
/// Which Queue to push jobs into
129
- let queueName : String
130
+ public var queueName : String
130
131
/// Retention policy for jobs
131
- let retentionPolicy : RetentionPolicy
132
+ public var retentionPolicy : RetentionPolicy
132
133
133
134
/// Initialize configuration
134
135
/// - Parameters
@@ -152,6 +153,7 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma
152
153
/// Logger used by queue
153
154
public let logger : Logger
154
155
let migrations : DatabaseMigrations
156
+ @usableFromInline
155
157
let isStopped : NIOLockedValueBox < Bool >
156
158
157
159
/// Initialize a PostgresJobQueue
@@ -180,11 +182,12 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma
180
182
/// - Parameters:
181
183
/// - jobID: an existing job
182
184
/// - Throws:
185
+ @inlinable
183
186
public func cancel( jobID: JobID ) async throws {
184
187
try await self . client. withTransaction ( logger: logger) { connection in
185
188
try await deleteFromQueue ( jobID: jobID, connection: connection)
186
189
if configuration. retentionPolicy. cancelled == . doNotRetain {
187
- try await delete ( jobID: jobID)
190
+ try await delete ( jobID: jobID, connection : connection )
188
191
} else {
189
192
try await setStatus ( jobID: jobID, status: . cancelled, connection: connection)
190
193
}
@@ -199,6 +202,7 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma
199
202
/// - Parameters:
200
203
/// - jobID: an existing job
201
204
/// - Throws:
205
+ @inlinable
202
206
public func pause( jobID: UUID ) async throws {
203
207
try await self . client. withTransaction ( logger: logger) { connection in
204
208
try await deleteFromQueue ( jobID: jobID, connection: connection)
@@ -214,6 +218,7 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma
214
218
/// - Parameters:
215
219
/// - jobID: an existing job
216
220
/// - Throws:
221
+ @inlinable
217
222
public func resume( jobID: JobID ) async throws {
218
223
try await self . client. withTransaction ( logger: logger) { connection in
219
224
try await setStatus ( jobID: jobID, status: . pending, connection: connection)
@@ -235,7 +240,9 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma
235
240
236
241
/// Push Job onto queue
237
242
/// - Returns: Identifier of queued job
238
- @discardableResult public func push< Parameters> ( _ jobRequest: JobRequest < Parameters > , options: JobOptions ) async throws -> JobID {
243
+ @discardableResult
244
+ @inlinable
245
+ public func push< Parameters> ( _ jobRequest: JobRequest < Parameters > , options: JobOptions ) async throws -> JobID {
239
246
let jobID = JobID ( )
240
247
try await self . client. withTransaction ( logger: self . logger) { connection in
241
248
try await self . add ( jobID: jobID, jobRequest: jobRequest, queueName: configuration. queueName, connection: connection)
@@ -249,10 +256,10 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma
249
256
/// - jobID: Job instance ID
250
257
/// - jobRequest: Job Request
251
258
/// - options: Job retry options
259
+ @inlinable
252
260
public func retry< Parameters> ( _ jobID: JobID , jobRequest: JobRequest < Parameters > , options: JobRetryOptions ) async throws {
253
- let buffer = try self . jobRegistry. encode ( jobRequest: jobRequest)
254
261
try await self . client. withTransaction ( logger: self . logger) { connection in
255
- try await self . updateJob ( jobID: jobID, buffer : buffer , connection: connection)
262
+ try await self . updateJob ( jobID: jobID, jobRequest : jobRequest , connection: connection)
256
263
try await self . addToQueue (
257
264
jobID: jobID,
258
265
queueName: configuration. queueName,
@@ -263,6 +270,7 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma
263
270
}
264
271
265
272
/// This is called to say job has finished processing and it can be deleted
273
+ @inlinable
266
274
public func finished( jobID: JobID ) async throws {
267
275
if configuration. retentionPolicy. completed == . doNotRetain {
268
276
try await self . delete ( jobID: jobID)
@@ -272,6 +280,7 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma
272
280
}
273
281
274
282
/// This is called to say job has failed to run and should be put aside
283
+ @inlinable
275
284
public func failed( jobID: JobID , error: Error ) async throws {
276
285
if configuration. retentionPolicy. failed == . doNotRetain {
277
286
try await self . delete ( jobID: jobID)
@@ -288,6 +297,7 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma
288
297
/// shutdown queue once all active jobs have been processed
289
298
public func shutdownGracefully( ) async { }
290
299
300
+ @inlinable
291
301
public func getMetadata( _ key: String ) async throws -> ByteBuffer ? {
292
302
let stream = try await self . client. query (
293
303
" SELECT value FROM swift_jobs.queues_metadata WHERE key = \( key) AND queue_name = \( configuration. queueName) " ,
@@ -299,6 +309,7 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma
299
309
return nil
300
310
}
301
311
312
+ @inlinable
302
313
public func setMetadata( key: String , value: ByteBuffer ) async throws {
303
314
try await self . client. query (
304
315
"""
@@ -311,6 +322,7 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma
311
322
)
312
323
}
313
324
325
+ @usableFromInline
314
326
func popFirst( ) async throws -> JobQueueResult < JobID > ? {
315
327
enum PopFirstResult {
316
328
case nothing
@@ -406,6 +418,7 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma
406
418
}
407
419
}
408
420
421
+ @usableFromInline
409
422
func add< Parameters> ( jobID: JobID , jobRequest: JobRequest < Parameters > , queueName: String , connection: PostgresConnection ) async throws {
410
423
try await connection. query (
411
424
"""
@@ -416,7 +429,9 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma
416
429
)
417
430
}
418
431
419
- func updateJob( jobID: JobID , buffer: ByteBuffer , connection: PostgresConnection ) async throws {
432
+ @usableFromInline
433
+ func updateJob< Parameters> ( jobID: JobID , jobRequest: JobRequest < Parameters > , connection: PostgresConnection ) async throws {
434
+ let buffer = try self . jobRegistry. encode ( jobRequest: jobRequest)
420
435
try await connection. query (
421
436
"""
422
437
UPDATE swift_jobs.jobs
@@ -429,6 +444,7 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma
429
444
)
430
445
}
431
446
447
+ @usableFromInline
432
448
func delete( jobID: JobID ) async throws {
433
449
try await self . client. query (
434
450
"""
@@ -439,6 +455,18 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma
439
455
)
440
456
}
441
457
458
+ @usableFromInline
459
+ func delete( jobID: JobID , connection: PostgresConnection ) async throws {
460
+ try await connection. query (
461
+ """
462
+ DELETE FROM swift_jobs.jobs
463
+ WHERE id = \( jobID) AND queue_name = \( configuration. queueName)
464
+ """ ,
465
+ logger: self . logger
466
+ )
467
+ }
468
+
469
+ @usableFromInline
442
470
func deleteFromQueue( jobID: JobID , connection: PostgresConnection ) async throws {
443
471
try await connection. query (
444
472
"""
@@ -449,6 +477,7 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma
449
477
)
450
478
}
451
479
480
+ @usableFromInline
452
481
func addToQueue( jobID: JobID , queueName: String , options: JobOptions , connection: PostgresConnection ) async throws {
453
482
try await connection. query (
454
483
"""
@@ -461,6 +490,7 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma
461
490
)
462
491
}
463
492
493
+ @usableFromInline
464
494
func setStatus( jobID: JobID , status: Status , connection: PostgresConnection ) async throws {
465
495
try await connection. query (
466
496
"""
@@ -473,6 +503,7 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma
473
503
)
474
504
}
475
505
506
+ @usableFromInline
476
507
func setStatus( jobID: JobID , status: Status ) async throws {
477
508
try await self . client. query (
478
509
"""
@@ -527,8 +558,10 @@ extension PostgresJobQueue {
527
558
public struct AsyncIterator : AsyncIteratorProtocol {
528
559
public typealias Element = JobQueueResult < JobID >
529
560
561
+ @usableFromInline
530
562
let queue : PostgresJobQueue
531
563
564
+ @inlinable
532
565
public func next( ) async throws -> Element ? {
533
566
while true {
534
567
if self . queue. isStopped. withLockedValue ( { $0 } ) {
0 commit comments