1
1
#include " dq_solomon_read_actor.h"
2
2
#include " dq_solomon_actors_util.h"
3
3
4
+ #include < library/cpp/json/json_reader.h>
4
5
#include < library/cpp/protobuf/util/pb_io.h>
6
+ #include < library/cpp/retry/retry.h>
5
7
6
8
#include < util/string/join.h>
7
9
#include < ydb/library/yql/dq/actors/common/retry_queue.h>
34
36
#include < ydb/library/actors/core/hfunc.h>
35
37
#include < ydb/library/actors/core/log.h>
36
38
#include < ydb/library/actors/http/http_proxy.h>
37
- #include < library/cpp/json/json_reader.h>
38
39
39
40
40
41
#include < util/generic/algorithm.h>
@@ -67,13 +68,6 @@ using namespace NKikimr::NMiniKQL;
67
68
namespace {
68
69
69
70
class TDqSolomonReadActor : public NActors ::TActorBootstrapped<TDqSolomonReadActor>, public IDqComputeActorAsyncInput {
70
- private:
71
- struct TMetricTimeRange {
72
- NSo::TMetric Metric;
73
- TInstant From;
74
- TInstant To;
75
- };
76
-
77
71
public:
78
72
static constexpr char ActorName[] = " DQ_SOLOMON_READ_ACTOR" ;
79
73
@@ -109,6 +103,19 @@ class TDqSolomonReadActor : public NActors::TActorBootstrapped<TDqSolomonReadAct
109
103
SOURCE_LOG_D (" Init" );
110
104
IngressStats.Level = statsLevel;
111
105
106
+ RetryPolicy = IRetryPolicy<NSo::TGetDataResponse>::GetExponentialBackoffPolicy (
107
+ [](const NSo::TGetDataResponse& response) {
108
+ if (response.Status == NSo::EStatus::STATUS_RETRIABLE_ERROR) {
109
+ return ERetryErrorClass::ShortRetry;
110
+ }
111
+ return ERetryErrorClass::NoRetry;
112
+ },
113
+ TDuration::MilliSeconds (25 ),
114
+ TDuration::MilliSeconds (200 ),
115
+ TDuration::MilliSeconds (500 ),
116
+ 5
117
+ );
118
+
112
119
UseMetricsQueue = !ReadParams.Source .HasProgram ();
113
120
114
121
auto stringType = ProgramBuilder.NewDataType (NYql::NUdf::TDataType<char *>::Id);
@@ -140,6 +147,15 @@ class TDqSolomonReadActor : public NActors::TActorBootstrapped<TDqSolomonReadAct
140
147
RequestMetrics ();
141
148
} else {
142
149
Become (&TDqSolomonReadActor::LimitedModeState);
150
+
151
+ TMetricTimeRange metric {
152
+ {},
153
+ ReadParams.Source .GetProgram (),
154
+ TInstant::Seconds (ReadParams.Source .GetFrom ()),
155
+ TInstant::Seconds (ReadParams.Source .GetTo ())
156
+ };
157
+
158
+ MetricsWithTimeRange.push_back (metric);
143
159
RequestData ();
144
160
}
145
161
}
@@ -149,6 +165,7 @@ class TDqSolomonReadActor : public NActors::TActorBootstrapped<TDqSolomonReadAct
149
165
hFunc (TEvSolomonProvider::TEvMetricsReadError, HandleMetricsReadError);
150
166
hFunc (TEvSolomonProvider::TEvPointsCountBatch, HandlePointsCountBatch);
151
167
hFunc (TEvSolomonProvider::TEvNewDataBatch, HandleNewDataBatch);
168
+ hFunc (TEvSolomonProvider::TEvRetryDataRequest, HandleRetryDataRequest);
152
169
hFunc (TEvSolomonProvider::TEvAck, Handle);
153
170
hFunc (NYql::NDq::TEvRetryQueuePrivate::TEvRetry, Handle);
154
171
hFunc (NActors::TEvInterconnect::TEvNodeDisconnected, Handle);
@@ -222,35 +239,45 @@ class TDqSolomonReadActor : public NActors::TActorBootstrapped<TDqSolomonReadAct
222
239
auto & metric = batch.Metric ;
223
240
auto & pointsCount = batch.Response .Result .PointsCount ;
224
241
ParsePointsCount (metric, pointsCount);
242
+ CompletedMetricsCount++;
225
243
226
244
TryRequestData ();
227
245
}
228
246
229
247
// Handles a data response event.
//
// The batch is handed to SaveDataBatch() first; when that returns false nothing
// else happens here. Otherwise more data is requested while time ranges are
// still queued, and the compute actor is notified once enough timeseries are
// buffered or LastMetricProcessed() reports that everything has been read.
void HandleNewDataBatch(TEvSolomonProvider::TEvNewDataBatch::TPtr& newDataBatch) {
    const bool batchStored = SaveDataBatch(newDataBatch);
    if (!batchStored) {
        return;
    }

    const bool hasQueuedRanges = !MetricsWithTimeRange.empty();
    if (hasQueuedRanges) {
        TryRequestData();
    }

    // Checked after TryRequestData() and independently of it: a full buffer is
    // flushed even while further ranges are still being requested.
    const bool bufferIsFull = MetricsData.size() >= ComputeActorBatchSize;
    if (bufferIsFull || LastMetricProcessed()) {
        NotifyComputeActorWithData();
    }
}
253
259
260
+ void HandleRetryDataRequest (TEvSolomonProvider::TEvRetryDataRequest::TPtr& retryDataRequest) {
261
+ auto & retryDataEvent = *retryDataRequest->Get ();
262
+ NThreading::TFuture<NSo::TGetDataResponse> dataRequestFuture;
263
+
264
+ auto request = std::move (retryDataEvent.Request );
265
+ if (UseMetricsQueue) {
266
+ dataRequestFuture = SolomonClient->GetData (request.Selectors , request.From , request.To );
267
+ } else {
268
+ dataRequestFuture = SolomonClient->GetData (request.Program , request.From , request.To );
269
+ }
270
+
271
+ dataRequestFuture.Subscribe ([request = std::move (request), actorSystem = TActivationContext::ActorSystem (), selfId = SelfId ()](
272
+ NThreading::TFuture<NSo::TGetDataResponse> response) mutable -> void
273
+ {
274
+ actorSystem->Send (selfId, new TEvSolomonProvider::TEvNewDataBatch (
275
+ response.ExtractValue (),
276
+ std::move (request)
277
+ ));
278
+ });
279
+ }
280
+
254
281
void Handle (TEvSolomonProvider::TEvAck::TPtr& ev) {
255
282
MetricsQueueEvents.OnEventReceived (ev);
256
283
}
@@ -279,18 +306,10 @@ class TDqSolomonReadActor : public NActors::TActorBootstrapped<TDqSolomonReadAct
279
306
}
280
307
281
308
// Handles a data response for the "Limited" variant of the handler: the batch
// is passed to SaveDataBatch() and, only if it was stored, the compute actor is
// notified right away.
void HandleNewDataBatchLimited(TEvSolomonProvider::TEvNewDataBatch::TPtr& newDataBatch) {
    if (SaveDataBatch(newDataBatch)) {
        NotifyComputeActorWithData();
    }
}
296
315
@@ -384,13 +403,13 @@ class TDqSolomonReadActor : public NActors::TActorBootstrapped<TDqSolomonReadAct
384
403
385
404
bool LastMetricProcessed () const {
386
405
if (UseMetricsQueue) {
387
- return IsMetricsQueueEmpty && CompletedMetricsCount == ListedMetricsCount;
406
+ return IsMetricsQueueEmpty && CompletedMetricsCount == ListedMetricsCount && CompletedTimeRanges == ListedTimeRanges ;
388
407
}
389
- return CompletedMetricsCount == 1 ;
408
+ return CompletedTimeRanges == 1 ;
390
409
}
391
410
392
411
void TryRequestMetrics () {
393
- if (ListedMetrics.size () < 1000 && !IsMetricsQueueEmpty && !IsWaitingMetricsQueueResponse) {
412
+ if (ListedMetrics.empty () && !IsMetricsQueueEmpty && !IsWaitingMetricsQueueResponse) {
394
413
RequestMetrics ();
395
414
}
396
415
}
@@ -436,38 +455,27 @@ class TDqSolomonReadActor : public NActors::TActorBootstrapped<TDqSolomonReadAct
436
455
}
437
456
438
457
void RequestData () {
439
- NThreading::TFuture<NSo::TGetDataResponse> getDataFuture;
440
- NSo::TMetric metric;
441
- TInstant from;
442
- TInstant to;
458
+ YQL_ENSURE (RetryPolicy);
459
+ NThreading::TFuture<NSo::TGetDataResponse> dataRequestFuture;
443
460
444
- if (UseMetricsQueue) {
445
- auto request = MetricsWithTimeRange.back ();
446
- MetricsWithTimeRange.pop_back ();
447
-
448
- metric = request.Metric ;
449
- from = request.From ;
450
- to = request.To ;
461
+ auto request = MetricsWithTimeRange.back ();
462
+ MetricsWithTimeRange.pop_back ();
451
463
452
- getDataFuture = SolomonClient->GetData (metric.Labels , from, to);
464
+ if (UseMetricsQueue) {
465
+ dataRequestFuture = SolomonClient->GetData (request.Selectors , request.From , request.To );
453
466
} else {
454
- getDataFuture = SolomonClient->GetData (
455
- ReadParams.Source .GetProgram (),
456
- TInstant::Seconds (ReadParams.Source .GetFrom ()),
457
- TInstant::Seconds (ReadParams.Source .GetTo ())
458
- );
467
+ dataRequestFuture = SolomonClient->GetData (request.Program , request.From , request.To );
459
468
}
460
469
461
- NActors::TActorSystem* actorSystem = NActors::TActivationContext::ActorSystem ();
462
- getDataFuture.Subscribe ([actorSystem, metric, from, to, selfId = SelfId ()](
463
- const NThreading::TFuture<NSo::TGetDataResponse>& response) -> void
470
+ PendingDataRequests_[request] = RetryPolicy->CreateRetryState ();
471
+
472
+ dataRequestFuture.Subscribe ([request = std::move (request), actorSystem = TActivationContext::ActorSystem (), selfId = SelfId ()](
473
+ NThreading::TFuture<NSo::TGetDataResponse> response) mutable -> void
464
474
{
465
475
actorSystem->Send (selfId, new TEvSolomonProvider::TEvNewDataBatch (
466
- metric,
467
- from,
468
- to,
469
- response.GetValue ())
470
- );
476
+ response.ExtractValue (),
477
+ std::move (request)
478
+ ));
471
479
});
472
480
}
473
481
@@ -477,14 +485,10 @@ class TDqSolomonReadActor : public NActors::TActorBootstrapped<TDqSolomonReadAct
477
485
478
486
auto ranges = SplitTimeIntervalIntoRanges (from, to, pointsCount);
479
487
480
- if (ranges.empty ()) {
481
- CompletedMetricsCount++;
482
- return ;
483
- }
484
-
485
488
for (const auto & [fromRange, toRange] : ranges) {
486
- MetricsWithTimeRange.emplace_back (metric, fromRange, toRange);
489
+ MetricsWithTimeRange.emplace_back (metric. Labels , " " , fromRange, toRange);
487
490
}
491
+ ListedTimeRanges += ranges.size ();
488
492
}
489
493
490
494
std::vector<std::pair<TInstant, TInstant>> SplitTimeIntervalIntoRanges (TInstant from, TInstant to, ui64 pointsCount) const {
@@ -507,6 +511,37 @@ class TDqSolomonReadActor : public NActors::TActorBootstrapped<TDqSolomonReadAct
507
511
return result;
508
512
}
509
513
514
+ bool SaveDataBatch (TEvSolomonProvider::TEvNewDataBatch::TPtr& newDataBatch) {
515
+ auto & batch = *newDataBatch->Get ();
516
+ auto request = batch.Request ;
517
+
518
+ if (batch.Response .Status == NSo::EStatus::STATUS_RETRIABLE_ERROR) {
519
+ if (auto delay = PendingDataRequests_[request]->GetNextRetryDelay (batch.Response )) {
520
+ SOURCE_LOG_D (" HandleNewDataBatch: retrying data request, delay: " << delay->MilliSeconds ());
521
+ Schedule (*delay, new TEvSolomonProvider::TEvRetryDataRequest (std::move (request)));
522
+ return false ;
523
+ }
524
+ }
525
+
526
+ PendingDataRequests_.erase (request);
527
+
528
+ if (batch.Response .Status != NSo::EStatus::STATUS_OK) {
529
+ TIssues issues { TIssue (batch.Response .Error ) };
530
+ SOURCE_LOG_W (" Got " << " error data response[" << newDataBatch->Cookie << " ] from solomon: " << issues.ToOneLineString ());
531
+ Send (ComputeActorId, new TEvAsyncInputError (InputIndex, issues, NYql::NDqProto::StatusIds::EXTERNAL_ERROR));
532
+ return false ;
533
+ }
534
+
535
+ MetricsData.insert (
536
+ MetricsData.end (),
537
+ std::make_move_iterator (batch.Response .Result .Timeseries .begin ()),
538
+ std::make_move_iterator (batch.Response .Result .Timeseries .end ())
539
+ );
540
+ CompletedTimeRanges++;
541
+
542
+ return true ;
543
+ }
544
+
510
545
private:
511
546
const ui64 InputIndex;
512
547
TDqAsyncStats IngressStats;
@@ -518,6 +553,7 @@ class TDqSolomonReadActor : public NActors::TActorBootstrapped<TDqSolomonReadAct
518
553
const TDqSolomonReadParams ReadParams;
519
554
const ui64 ComputeActorBatchSize;
520
555
const ui64 MetricsQueueConsumersCountDelta;
556
+ IRetryPolicy<NSo::TGetDataResponse>::TPtr RetryPolicy;
521
557
522
558
bool UseMetricsQueue;
523
559
TRetryEventsQueue MetricsQueueEvents;
@@ -526,11 +562,14 @@ class TDqSolomonReadActor : public NActors::TActorBootstrapped<TDqSolomonReadAct
526
562
bool IsMetricsQueueEmpty = false ;
527
563
bool IsConfirmedMetricsQueueFinish = false ;
528
564
565
+ std::map<TMetricTimeRange, IRetryPolicy<NSo::TGetDataResponse>::IRetryState::TPtr> PendingDataRequests_;
529
566
std::deque<NSo::TMetric> ListedMetrics;
530
567
std::deque<TMetricTimeRange> MetricsWithTimeRange;
531
568
std::deque<NSo::TTimeseries> MetricsData;
532
- size_t ListedMetricsCount = 0 ;
533
- size_t CompletedMetricsCount = 0 ;
569
+ ui64 ListedMetricsCount = 0 ;
570
+ ui64 CompletedMetricsCount = 0 ;
571
+ ui64 ListedTimeRanges = 0 ;
572
+ ui64 CompletedTimeRanges = 0 ;
534
573
const ui64 MaxPointsPerOneRequest = 10000 ;
535
574
536
575
TString SourceId;
0 commit comments