23
23
#include <yql/essentials/utils/yql_panic.h>
24
24
#include <ydb/core/formats/arrow/serializer/abstract.h>
25
25
26
+ #include <library/cpp/retry/retry_policy.h>
27
+
26
28
namespace NYql::NDq {
27
29
28
30
using namespace NActors;
@@ -61,6 +63,12 @@ namespace NYql::NDq {
61
63
public TGenericBaseActor<TGenericLookupActor> {
62
64
using TBase = TGenericBaseActor<TGenericLookupActor>;
63
65
66
+ using ILookupRetryPolicy = IRetryPolicy<const NYdbGrpc::TGrpcStatus&>;
67
+ using ILookupRetryState = ILookupRetryPolicy::IRetryState;
68
+
69
+ struct TEvLookupRetry : NActors::TEventLocal<TEvLookupRetry, EvRetry> {
70
+ };
71
+
64
72
public:
65
73
TGenericLookupActor(
66
74
NConnector::IClient::TPtr connectorClient,
@@ -87,6 +95,24 @@ namespace NYql::NDq {
87
95
, HolderFactory(holderFactory)
88
96
, ColumnDestinations(CreateColumnDestination())
89
97
, MaxKeysInRequest(maxKeysInRequest)
98
+ , RetryPolicy(
99
+ ILookupRetryPolicy::GetExponentialBackoffPolicy(
100
+ /* retryClassFunction */
101
+ [](const NYdbGrpc::TGrpcStatus& status) {
102
+ if (NConnector::GrpcStatusNeedsRetry(status)) {
103
+ return ERetryErrorClass::ShortRetry;
104
+ }
105
+ if (status.GRpcStatusCode == grpc::DEADLINE_EXCEEDED) {
106
+ return ERetryErrorClass::ShortRetry; // TODO LongRetry?
107
+ }
108
+ return ERetryErrorClass::NoRetry;
109
+ },
110
+ /* minDelay */ TDuration::MilliSeconds(1),
111
+ /* minLongRetryDelay */ TDuration::MilliSeconds(500),
112
+ /* maxDelay */ TDuration::Seconds(1),
113
+ /* maxRetries */ RequestRetriesLimit,
114
+ /* maxTime */ TDuration::Minutes(5),
115
+ /* scaleFactor */ 2))
90
116
{
91
117
InitMonCounters(taskCounters);
92
118
}
@@ -157,7 +183,7 @@ namespace NYql::NDq {
157
183
hFunc(TEvReadSplitsPart, Handle);
158
184
hFunc(TEvReadSplitsFinished, Handle);
159
185
hFunc(TEvError, Handle);
160
- hFunc(TEvRetry , Handle);
186
+ hFunc(TEvLookupRetry , Handle);
161
187
hFunc(NActors::TEvents::TEvPoison, Handle);)
162
188
163
189
void Handle(TEvListSplitsIterator::TPtr ev) {
@@ -166,7 +192,7 @@ namespace NYql::NDq {
166
192
[
167
193
actorSystem = TActivationContext::ActorSystem(),
168
194
selfId = SelfId(),
169
- retriesRemaining = RetriesRemaining
195
+ retryState = RetryState
170
196
](const NConnector::TAsyncResult<NConnector::NApi::TListSplitsResponse>& asyncResult) {
171
197
YQL_CLOG(DEBUG, ProviderGeneric) << "ActorId=" << selfId << " Got TListSplitsResponse from Connector";
172
198
auto result = ExtractFromConstFuture(asyncResult);
@@ -175,7 +201,7 @@ namespace NYql::NDq {
175
201
auto ev = new TEvListSplitsPart(std::move(*result.Response));
176
202
actorSystem->Send(new NActors::IEventHandle(selfId, selfId, ev));
177
203
} else {
178
- SendRetryOrError(actorSystem, selfId, result.Status, retriesRemaining );
204
+ SendRetryOrError(actorSystem, selfId, result.Status, retryState );
179
205
}
180
206
});
181
207
}
@@ -199,15 +225,15 @@ namespace NYql::NDq {
199
225
Connector->ReadSplits(readRequest, RequestTimeout).Subscribe([
200
226
actorSystem = TActivationContext::ActorSystem(),
201
227
selfId = SelfId(),
202
- retriesRemaining = RetriesRemaining
228
+ retryState = RetryState
203
229
](const NConnector::TReadSplitsStreamIteratorAsyncResult& asyncResult) {
204
230
YQL_CLOG(DEBUG, ProviderGeneric) << "ActorId=" << selfId << " Got ReadSplitsStreamIterator from Connector";
205
231
auto result = ExtractFromConstFuture(asyncResult);
206
232
if (result.Status.Ok()) {
207
233
auto ev = new TEvReadSplitsIterator(std::move(result.Iterator));
208
234
actorSystem->Send(new NActors::IEventHandle(selfId, selfId, ev));
209
235
} else {
210
- SendRetryOrError(actorSystem, selfId, result.Status, retriesRemaining );
236
+ SendRetryOrError(actorSystem, selfId, result.Status, retryState );
211
237
}
212
238
});
213
239
}
@@ -236,9 +262,8 @@ namespace NYql::NDq {
236
262
actorSystem->Send(new NActors::IEventHandle(ParentId, SelfId(), errEv.release()));
237
263
}
238
264
239
- void Handle(TEvRetry ::TPtr ev ) {
265
+ void Handle(TEvLookupRetry ::TPtr) {
240
266
auto guard = Guard(*Alloc);
241
- RetriesRemaining = ev->Get()->NextRetries;
242
267
SendRequest();
243
268
}
244
269
@@ -270,7 +295,7 @@ namespace NYql::NDq {
270
295
}
271
296
272
297
Request = std::move(request);
273
- RetriesRemaining = RequestRetriesLimit ;
298
+ RetryState = std::shared_ptr<ILookupRetryState>(RetryPolicy->CreateRetryState()) ;
274
299
SendRequest();
275
300
}
276
301
@@ -288,7 +313,7 @@ namespace NYql::NDq {
288
313
Connector->ListSplits(splitRequest, RequestTimeout).Subscribe([
289
314
actorSystem = TActivationContext::ActorSystem(),
290
315
selfId = SelfId(),
291
- retriesRemaining = RetriesRemaining
316
+ retryState = RetryState
292
317
](const NConnector::TListSplitsStreamIteratorAsyncResult& asyncResult) {
293
318
auto result = ExtractFromConstFuture(asyncResult);
294
319
if (result.Status.Ok()) {
@@ -297,7 +322,7 @@ namespace NYql::NDq {
297
322
auto ev = new TEvListSplitsIterator(std::move(result.Iterator));
298
323
actorSystem->Send(new NActors::IEventHandle(selfId, selfId, ev));
299
324
} else {
300
- SendRetryOrError(actorSystem, selfId, result.Status, retriesRemaining );
325
+ SendRetryOrError(actorSystem, selfId, result.Status, retryState );
301
326
}
302
327
});
303
328
if (CpuTime) {
@@ -310,7 +335,7 @@ namespace NYql::NDq {
310
335
[
311
336
actorSystem = TActivationContext::ActorSystem(),
312
337
selfId = SelfId(),
313
- retriesRemaining = RetriesRemaining
338
+ retryState = RetryState
314
339
](const NConnector::TAsyncResult<NConnector::NApi::TReadSplitsResponse>& asyncResult) {
315
340
auto result = ExtractFromConstFuture(asyncResult);
316
341
if (result.Status.Ok()) {
@@ -329,7 +354,7 @@ namespace NYql::NDq {
329
354
auto ev = new TEvReadSplitsFinished(std::move(result.Status));
330
355
actorSystem->Send(new NActors::IEventHandle(selfId, selfId, ev));
331
356
} else {
332
- SendRetryOrError(actorSystem, selfId, result.Status, retriesRemaining );
357
+ SendRetryOrError(actorSystem, selfId, result.Status, retryState );
333
358
}
334
359
});
335
360
}
@@ -395,22 +420,12 @@ namespace NYql::NDq {
395
420
new TEvError(std::move(error)));
396
421
}
397
422
398
- static void SendRetryOrError(NActors::TActorSystem* actorSystem, const NActors::TActorId& selfId, const NYdbGrpc::TGrpcStatus& status, ui32 retriesRemaining) {
399
- if (NConnector::GrpcStatusNeedsRetry(status) || status.GRpcStatusCode == grpc::DEADLINE_EXCEEDED) {
400
- if (retriesRemaining) {
401
- const auto retry = RequestRetriesLimit - retriesRemaining;
402
- const auto delay = TDuration::MilliSeconds(1u << retry); // Exponential delay from 1ms to ~0.5s
403
- // << TODO tune/tweak
404
- YQL_CLOG(WARN, ProviderGeneric) << "ActorId=" << selfId << " Got retrievable GRPC Error from Connector: " << status.ToDebugString() << ", retry " << (retry + 1) << " of " << RequestRetriesLimit << ", scheduled in " << delay;
405
- --retriesRemaining;
406
- if (status.GRpcStatusCode == grpc::DEADLINE_EXCEEDED) {
407
- // if error was deadline, retry only once
408
- retriesRemaining = 0; // TODO tune/tweak
409
- }
410
- actorSystem->Schedule(delay, new IEventHandle(selfId, selfId, new TEvRetry(retriesRemaining)));
411
- return;
412
- }
413
- YQL_CLOG(ERROR, ProviderGeneric) << "ActorId=" << selfId << " Got retrievable GRPC Error from Connector: " << status.ToDebugString() << ", retry count exceed limit " << RequestRetriesLimit;
423
+ static void SendRetryOrError(NActors::TActorSystem* actorSystem, const NActors::TActorId& selfId, const NYdbGrpc::TGrpcStatus& status, std::shared_ptr<ILookupRetryState> retryState) {
424
+ auto nextRetry = retryState->GetNextRetryDelay(status);
425
+ if (nextRetry) {
426
+ YQL_CLOG(WARN, ProviderGeneric) << "ActorId=" << selfId << " Got retrievable GRPC Error from Connector: " << status.ToDebugString() << ", retry scheduled in " << *nextRetry;
427
+ actorSystem->Schedule(*nextRetry, new IEventHandle(selfId, selfId, new TEvLookupRetry()));
428
+ return;
414
429
}
415
430
SendError(actorSystem, selfId, NConnector::ErrorFromGRPCStatus(status));
416
431
}
@@ -510,7 +525,8 @@ namespace NYql::NDq {
510
525
std::shared_ptr<IDqAsyncLookupSource::TUnboxedValueMap> Request;
511
526
NConnector::IReadSplitsStreamIterator::TPtr ReadSplitsIterator; // TODO move me to TEvReadSplitsPart
512
527
NKikimr::NMiniKQL::TKeyPayloadPairVector LookupResult;
513
- ui32 RetriesRemaining;
528
+ ILookupRetryPolicy::TPtr RetryPolicy;
529
+ std::shared_ptr<ILookupRetryState> RetryState;
514
530
::NMonitoring::TDynamicCounters::TCounterPtr Count;
515
531
::NMonitoring::TDynamicCounters::TCounterPtr Keys;
516
532
::NMonitoring::TDynamicCounters::TCounterPtr ResultRows;
0 commit comments