Skip to content

Commit 1cc2292

Browse files
authored
Add the current job attempt to the context (#94)
* Add the current job attempt to the context * cleaned up * Updated job attempt tests and use 1 as default value * Clean up shouldRetry and fixed a bug that may cause a job to try more than the allowed limit * Revert "Clean up shouldRetry and fixed a bug that may cause a job to try more than the allowed limit" This reverts commit c51ea7f. * Renamed job attempts to attempt
1 parent e638c67 commit 1cc2292

File tree

10 files changed

+45
-33
lines changed

10 files changed

+45
-33
lines changed

Sources/Jobs/JobContext.swift

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,9 @@ public struct JobExecutionContext: Sendable {
3131
public let queuedAt: Date
3232
/// Next time job is scheduled to run
3333
public let nextScheduledAt: Date?
34+
/// Attempt number for this job.
35+
/// Starts at 1 and increments for every retry until max
36+
public let attempt: Int
3437
}
3538

3639
/// context for job being adding/removed from queue

Sources/Jobs/JobInstance.swift

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,8 @@ public protocol JobInstanceProtocol: Sendable {
2828
var retryStrategy: any JobRetryStrategy { get }
2929
/// Time job was queued
3030
var queuedAt: Date { get }
31-
/// Number of attempts so far
32-
var attempts: Int? { get }
31+
/// Current attempt
32+
var attempt: Int { get }
3333
/// Job parameters
3434
var parameters: Parameters { get }
3535
/// Trace context
@@ -50,7 +50,7 @@ extension JobInstanceProtocol {
5050

5151
/// Should we retry this job
5252
public func shouldRetry(error: Error) -> Bool {
53-
self.retryStrategy.shouldRetry(attempt: self.attempts ?? 0, error: error)
53+
self.retryStrategy.shouldRetry(attempt: self.attempt, error: error)
5454
}
5555

5656
/// Extract trace context from job instance data
@@ -77,8 +77,8 @@ struct JobInstance<Parameters: JobParameters>: JobInstanceProtocol {
7777
var retryStrategy: any JobRetryStrategy { job.retryStrategy }
7878
/// Time job was queued
7979
var queuedAt: Date { self.data.queuedAt }
80-
/// Number of attempts so far
81-
var attempts: Int? { self.data.attempts ?? 0 }
80+
/// Current attempt
81+
var attempt: Int { self.data.attempt }
8282
/// Trace context
8383
var traceContext: [String: String]? { self.data.traceContext }
8484
/// Job parameters
@@ -104,8 +104,8 @@ public struct JobInstanceData<Parameters: JobParameters>: Codable, Sendable {
104104
let parameters: Parameters
105105
/// Time job was queued
106106
let queuedAt: Date
107-
/// Number of attempts so far
108-
let attempts: Int?
107+
/// Current attempt
108+
let attempt: Int
109109
/// trace context
110110
let traceContext: [String: String]?
111111
/// Next time job is scheduled to run
@@ -114,12 +114,12 @@ public struct JobInstanceData<Parameters: JobParameters>: Codable, Sendable {
114114
init(
115115
parameters: Parameters,
116116
queuedAt: Date,
117-
attempts: Int?,
117+
attempt: Int,
118118
nextScheduledAt: Date? = nil
119119
) {
120120
self.parameters = parameters
121121
self.queuedAt = queuedAt
122-
self.attempts = attempts
122+
self.attempt = attempt
123123
self.nextScheduledAt = nextScheduledAt
124124

125125
var traceContext: [String: String]? = nil
@@ -137,7 +137,7 @@ public struct JobInstanceData<Parameters: JobParameters>: Codable, Sendable {
137137
private enum CodingKeys: String, CodingKey {
138138
case parameters = "p"
139139
case queuedAt = "q"
140-
case attempts = "a"
140+
case attempt = "a"
141141
case traceContext = "t"
142142
case nextScheduledAt = "n"
143143
}

Sources/Jobs/JobQueue.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ public struct JobQueue<Queue: JobQueueDriver>: JobQueueProtocol {
127127
_ parameters: Parameters,
128128
options: Queue.JobOptions = .init()
129129
) async throws -> Queue.JobID {
130-
let request = JobRequest(parameters: parameters, queuedAt: .now, attempts: 0)
130+
let request = JobRequest(parameters: parameters, queuedAt: .now, attempt: 1)
131131
let instanceID = try await self.queue.push(request, options: options)
132132
await self.handler.middleware.onPushJob(
133133
parameters: parameters,
@@ -156,7 +156,7 @@ public struct JobQueue<Queue: JobQueueDriver>: JobQueueProtocol {
156156
let request = JobRequest(
157157
parameters: parameters,
158158
queuedAt: currentSchedule,
159-
attempts: 0,
159+
attempt: 1,
160160
nextScheduledAt: nextScheduledAt
161161
)
162162
let instanceID = try await self.queue.push(request, options: options)

Sources/Jobs/JobQueueDriver.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,8 +64,8 @@ extension JobQueueDriver {
6464
}
6565

6666
extension JobQueueDriver {
67-
func retry(_ jobID: JobID, job: some JobInstanceProtocol, attempts: Int, options: JobRetryOptions) async throws {
68-
let jobRequest = JobRequest(parameters: job.parameters, queuedAt: job.queuedAt, attempts: attempts)
67+
func retry(_ jobID: JobID, job: some JobInstanceProtocol, attempt: Int, options: JobRetryOptions) async throws {
68+
let jobRequest = JobRequest(parameters: job.parameters, queuedAt: job.queuedAt, attempt: attempt)
6969
return try await self.retry(jobID, jobRequest: jobRequest, options: options)
7070
}
7171
}

Sources/Jobs/JobQueueHandler.swift

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,8 @@ final class JobQueueHandler<Queue: JobQueueDriver>: Sendable {
105105
jobID: jobID.description,
106106
logger: logger,
107107
queuedAt: job.queuedAt,
108-
nextScheduledAt: job.nextScheduledAt
108+
nextScheduledAt: job.nextScheduledAt,
109+
attempt: job.attempt
109110
)
110111
try await handleJob(job: job, context: context)
111112
} catch let error as CancellationError {
@@ -125,15 +126,15 @@ final class JobQueueHandler<Queue: JobQueueDriver>: Sendable {
125126
return
126127
}
127128

128-
let attempts = (job.attempts ?? 0) + 1
129-
let delay = job.retryStrategy.calculateBackoff(attempt: attempts)
129+
let delay = job.retryStrategy.calculateBackoff(attempt: job.attempt)
130130
let delayUntil = Date.now._advanced(by: delay)
131+
let attempt = job.attempt + 1
131132

132133
/// retry the current job
133134
try await self.queue.retry(
134135
jobID,
135136
job: job,
136-
attempts: attempts,
137+
attempt: attempt,
137138
options: .init(delayUntil: delayUntil)
138139
)
139140

@@ -142,7 +143,7 @@ final class JobQueueHandler<Queue: JobQueueDriver>: Sendable {
142143
metadata: [
143144
"JobID": .stringConvertible(jobID),
144145
"JobName": .string(job.name),
145-
"attempts": .stringConvertible(attempts),
146+
"attempt": .stringConvertible(attempt),
146147
"delayedUntil": .stringConvertible(delayUntil),
147148
]
148149
)

Sources/Jobs/JobRequest.swift

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,22 +26,22 @@ public struct JobRequest<Parameters: JobParameters>: Encodable {
2626
init(
2727
parameters: Parameters,
2828
queuedAt: Date,
29-
attempts: Int,
29+
attempt: Int,
3030
nextScheduledAt: Date? = nil
3131
) {
3232
self.data = .init(
3333
parameters: parameters,
3434
queuedAt: queuedAt,
35-
attempts: attempts,
35+
attempt: attempt,
3636
nextScheduledAt: nextScheduledAt
3737
)
3838
}
3939

40-
init(jobInstance: JobInstance<Parameters>, attempts: Int) {
40+
init(jobInstance: JobInstance<Parameters>, attempt: Int) {
4141
self.data = .init(
4242
parameters: jobInstance.parameters,
4343
queuedAt: jobInstance.queuedAt,
44-
attempts: attempts
44+
attempt: attempt
4545
)
4646
}
4747

Sources/Jobs/Middleware/TracingMiddleware.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ public struct TracingJobMiddleware: JobMiddleware {
4747
}
4848
span.updateAttributes { attributes in
4949
attributes["job.id"] = context.jobID
50-
attributes["job.attempt"] = (job.attempts ?? 0) + 1
50+
attributes["job.attempt"] = job.attempt
5151
attributes["job.queue"] = self.queueName
5252
}
5353
do {

Tests/JobsTests/JobMiddlewareTests.swift

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ final class JobMiddlewareTests: XCTestCase {
6060
let parameters = TestParameters(value: "test")
6161
let retryStrategy: JobRetryStrategy = .exponentialJitter(maxAttempts: 1)
6262
let queuedAt = Date.now
63-
let attempts: Int? = 0
63+
let attempt: Int = 0
6464
let nextScheduledAt: Date? = nil
6565
let traceContext: [String: String]? = nil
6666
let timeout: Duration? = nil
@@ -87,7 +87,8 @@ final class JobMiddlewareTests: XCTestCase {
8787
jobID: "0",
8888
logger: .init(label: "Test"),
8989
queuedAt: job.queuedAt,
90-
nextScheduledAt: job.nextScheduledAt
90+
nextScheduledAt: job.nextScheduledAt,
91+
attempt: job.attempt
9192
)
9293
) { _, _ in }
9394
XCTAssertEqual(observer1.handled, true)
@@ -106,7 +107,7 @@ final class JobMiddlewareTests: XCTestCase {
106107
let parameters = TestParameters(value: "test")
107108
let retryStrategy: JobRetryStrategy = .exponentialJitter(maxAttempts: 1)
108109
let queuedAt = Date.now
109-
let attempts: Int? = 0
110+
let attempt: Int = 0
110111
let traceContext: [String: String]? = nil
111112
var nextScheduledAt: Date? = nil
112113
let timeout: Duration? = nil
@@ -130,7 +131,8 @@ final class JobMiddlewareTests: XCTestCase {
130131
jobID: "0",
131132
logger: .init(label: "Test"),
132133
queuedAt: job.queuedAt,
133-
nextScheduledAt: job.nextScheduledAt
134+
nextScheduledAt: job.nextScheduledAt,
135+
attempt: job.attempt
134136
)
135137
) { _, _ in }
136138
XCTAssertEqual(middleware1.handled, first == true)
@@ -151,7 +153,7 @@ final class JobMiddlewareTests: XCTestCase {
151153
let parameters = TestParameters(value: "test")
152154
let retryStrategy: JobRetryStrategy = .exponentialJitter(maxAttempts: 1)
153155
let queuedAt = Date.now
154-
let attempts: Int? = 0
156+
let attempt: Int = 0
155157
let traceContext: [String: String]? = nil
156158
let nextScheduledAt: Date? = Date.now
157159
let timeout: Duration? = nil
@@ -180,7 +182,8 @@ final class JobMiddlewareTests: XCTestCase {
180182
jobID: "0",
181183
logger: .init(label: "Test"),
182184
queuedAt: job.queuedAt,
183-
nextScheduledAt: job.nextScheduledAt
185+
nextScheduledAt: job.nextScheduledAt,
186+
attempt: job.attempt
184187
)
185188
) { _, _ in }
186189
XCTAssertEqual(middleware1.handled, first == true)

Tests/JobsTests/JobsTests.swift

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -135,8 +135,9 @@ final class JobsTests: XCTestCase {
135135
struct TestParameters: JobParameters {
136136
static let jobName = "testErrorRetryCount"
137137
}
138-
let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 4)
138+
let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 3)
139139
let failedJobCount = ManagedAtomic(0)
140+
let attemptCounter: NIOLockedValueBox<[Int]> = .init([])
140141
struct FailedError: Error {}
141142
var logger = Logger(label: "JobsTests")
142143
logger.logLevel = .trace
@@ -147,8 +148,11 @@ final class JobsTests: XCTestCase {
147148
jobQueue.registerJob(
148149
parameters: TestParameters.self,
149150
retryStrategy: .exponentialJitter(maxAttempts: 3, maxBackoff: .seconds(0.5), minJitter: 0.0, maxJitter: 0.01)
150-
) { _, _ in
151+
) { _, context in
151152
expectation.fulfill()
153+
attemptCounter.withLockedValue {
154+
$0.append(context.attempt)
155+
}
152156
throw FailedError()
153157
}
154158
try await testJobQueue(jobQueue) {
@@ -157,6 +161,7 @@ final class JobsTests: XCTestCase {
157161
await fulfillment(of: [expectation], timeout: 5)
158162
}
159163
XCTAssertEqual(failedJobCount.load(ordering: .relaxed), 1)
164+
XCTAssertEqual(attemptCounter.withLockedValue { $0 }, [1, 2, 3])
160165
}
161166

162167
/// Test retry policy that does different things based on the error passed to it

Tests/JobsTests/MetricsTests.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -391,7 +391,7 @@ final class MetricsTests: XCTestCase {
391391
struct TestParameter: JobParameters {
392392
static let jobName = "testFailedJobs"
393393
}
394-
let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 4)
394+
let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 3)
395395
let failedJobCount = ManagedAtomic(0)
396396
struct FailedError: Error {}
397397
var logger = Logger(label: "JobsTests")

0 commit comments

Comments
 (0)