@@ -352,7 +352,7 @@ class TDqPqReadActor : public NActors::TActor<TDqPqReadActor>, public NYql::NDq:
352
352
clusterState.ReadSession .reset ();
353
353
}
354
354
}
355
- ReadyBuffer = std::queue<TReadyBatch>{}; // clear read buffer
355
+ Reconnected = true ;
356
356
Metrics.ReconnectRate ->Inc ();
357
357
358
358
Schedule (ReconnectPeriod, new TEvPrivate::TEvReconnectSession ());
@@ -513,6 +513,7 @@ class TDqPqReadActor : public NActors::TActor<TDqPqReadActor>, public NYql::NDq:
513
513
}
514
514
515
515
i64 GetAsyncInputData (NKikimr::NMiniKQL::TUnboxedValueBatch& buffer, TMaybe<TInstant>& watermark, bool &, i64 freeSpace) override {
516
+ // called with bound allocator
516
517
Metrics.InFlyAsyncInputData ->Set (0 );
517
518
SRC_LOG_T (" SessionId: " << GetSessionId () << " GetAsyncInputData freeSpace = " << freeSpace);
518
519
@@ -525,6 +526,11 @@ class TDqPqReadActor : public NActors::TActor<TDqPqReadActor>, public NYql::NDq:
525
526
InflightReconnect = true ;
526
527
}
527
528
529
+ if (Reconnected) {
530
+ Reconnected = false ;
531
+ ReadyBuffer = std::queue<TReadyBatch>{}; // clear read buffer
532
+ }
533
+
528
534
i64 usedSpace = 0 ;
529
535
if (MaybeReturnReadyBatch (buffer, watermark, usedSpace)) {
530
536
return usedSpace;
@@ -673,6 +679,7 @@ class TDqPqReadActor : public NActors::TActor<TDqPqReadActor>, public NYql::NDq:
673
679
THashMap<NYdb::NTopic::TPartitionSession::TPtr, std::pair<std::string, TList<std::pair<ui64, ui64>>>> OffsetRanges; // [start, end)
674
680
};
675
681
682
+ // must be called with bound allocator
676
683
bool MaybeReturnReadyBatch (NKikimr::NMiniKQL::TUnboxedValueBatch& buffer, TMaybe<TInstant>& watermark, i64 & usedSpace) {
677
684
if (ReadyBuffer.empty ()) {
678
685
SubscribeOnNextEvent ();
@@ -709,6 +716,7 @@ class TDqPqReadActor : public NActors::TActor<TDqPqReadActor>, public NYql::NDq:
709
716
return true ;
710
717
}
711
718
719
+ // must be called with bound allocator
712
720
void PushWatermarkToReady (TInstant watermark) {
713
721
SRC_LOG_D (" SessionId: " << GetSessionId () << " New watermark " << watermark << " was generated" );
714
722
@@ -720,6 +728,7 @@ class TDqPqReadActor : public NActors::TActor<TDqPqReadActor>, public NYql::NDq:
720
728
ReadyBuffer.back ().Watermark = watermark;
721
729
}
722
730
731
+ // must be called (visited) with bound allocator
723
732
struct TTopicEventProcessor {
724
733
void operator ()(NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent& event) {
725
734
const auto partitionKey = MakePartitionKey (Cluster, event.GetPartitionSession ());
@@ -853,6 +862,7 @@ class TDqPqReadActor : public NActors::TActor<TDqPqReadActor>, public NYql::NDq:
853
862
private:
854
863
bool InflightReconnect = false ;
855
864
TDuration ReconnectPeriod;
865
+ bool Reconnected = false ;
856
866
TMetrics Metrics;
857
867
const i64 BufferSize;
858
868
const THolderFactory& HolderFactory;
0 commit comments