@@ -56,8 +56,7 @@ class SparkConnectClientRetriesSuite
56
56
private def createTestExceptionWithDetails (
57
57
msg : String ,
58
58
code : Status .Code = Status .Code .INTERNAL ,
59
- retryDelay : FiniteDuration = FiniteDuration (0 , " s" )
60
- ): StatusRuntimeException = {
59
+ retryDelay : FiniteDuration = FiniteDuration (0 , " s" )): StatusRuntimeException = {
61
60
// In grpc-java, RetryDelay should be specified as seconds: Long + nanos: Int
62
61
val seconds = retryDelay.toSeconds
63
62
val nanos = (retryDelay - FiniteDuration (seconds, " s" )).toNanos.toInt
@@ -83,8 +82,7 @@ class SparkConnectClientRetriesSuite
83
82
private def assertLongSequencesAlmostEqual (
84
83
first : Seq [Long ],
85
84
second : Seq [Long ],
86
- delta : Long
87
- ): Unit = {
85
+ delta : Long ): Unit = {
88
86
assert(first.length == second.length, " Lists have different lengths." )
89
87
for ((a, b) <- first.zip(second)) {
90
88
assert(math.abs(a - b) <= delta, s " Elements $a and $b differ by more than $delta. " )
@@ -199,10 +197,8 @@ class SparkConnectClientRetriesSuite
199
197
}
200
198
test(" DefaultPolicy retries exceptions with RetryInfo" ) {
201
199
// Error contains RetryInfo with retry_delay set to 0
202
- val dummyFn = new DummyFn (
203
- createTestExceptionWithDetails(msg = " Some error message" ),
204
- numFails = 100
205
- )
200
+ val dummyFn =
201
+ new DummyFn (createTestExceptionWithDetails(msg = " Some error message" ), numFails = 100 )
206
202
val retryPolicies = RetryPolicy .defaultPolicies()
207
203
val retryHandler = new GrpcRetryHandler (retryPolicies, sleep = _ => {})
208
204
assertThrows[RetriesExceeded ] {
@@ -218,12 +214,8 @@ class SparkConnectClientRetriesSuite
218
214
val st = new SleepTimeTracker ()
219
215
val retryDelay = FiniteDuration (5 , " min" )
220
216
val dummyFn = new DummyFn (
221
- createTestExceptionWithDetails(
222
- msg = " Some error message" ,
223
- retryDelay = retryDelay
224
- ),
225
- numFails = 100
226
- )
217
+ createTestExceptionWithDetails(msg = " Some error message" , retryDelay = retryDelay),
218
+ numFails = 100 )
227
219
val retryPolicies = RetryPolicy .defaultPolicies()
228
220
val retryHandler = new GrpcRetryHandler (retryPolicies, sleep = st.sleep)
229
221
@@ -243,12 +235,8 @@ class SparkConnectClientRetriesSuite
243
235
val st = new SleepTimeTracker ()
244
236
val retryDelay = FiniteDuration (5 , " d" )
245
237
val dummyFn = new DummyFn (
246
- createTestExceptionWithDetails(
247
- msg = " Some error message" ,
248
- retryDelay = retryDelay
249
- ),
250
- numFails = 100
251
- )
238
+ createTestExceptionWithDetails(msg = " Some error message" , retryDelay = retryDelay),
239
+ numFails = 100 )
252
240
val retryPolicies = RetryPolicy .defaultPolicies()
253
241
val retryHandler = new GrpcRetryHandler (retryPolicies, sleep = st.sleep)
254
242
@@ -271,14 +259,10 @@ class SparkConnectClientRetriesSuite
271
259
List .fill(2 )(
272
260
createTestExceptionWithDetails(
273
261
msg = " Some error message" ,
274
- retryDelay = retryDelay
275
- )
276
- ) ++ List .fill(3 )(
262
+ retryDelay = retryDelay)) ++ List .fill(3 )(
277
263
createTestExceptionWithDetails(
278
264
msg = " Some error message" ,
279
- code = Status .Code .UNAVAILABLE
280
- )
281
- )
265
+ code = Status .Code .UNAVAILABLE ))
282
266
).iterator
283
267
284
268
retryHandler.retry({
@@ -290,9 +274,8 @@ class SparkConnectClientRetriesSuite
290
274
291
275
// Should be retried by DefaultPolicy
292
276
val policy = retryPolicies.find(_.name == " DefaultPolicy" ).get
293
- val expectedSleeps = List .fill(2 )(retryDelay.toMillis) ++ List .tabulate(3 )(
294
- i => policy.initialBackoff.toMillis * math.pow(policy.backoffMultiplier, i + 2 ).toLong
295
- )
277
+ val expectedSleeps = List .fill(2 )(retryDelay.toMillis) ++ List .tabulate(3 )(i =>
278
+ policy.initialBackoff.toMillis * math.pow(policy.backoffMultiplier, i + 2 ).toLong)
296
279
assertLongSequencesAlmostEqual(st.times, expectedSleeps, delta = policy.jitter.toMillis)
297
280
}
298
281
}
0 commit comments