@@ -234,6 +234,7 @@ class TDqPqRdReadActor : public NActors::TActor<TDqPqRdReadActor>, public NYql::
234
234
std::vector<std::optional<ui64>> ColumnIndexes; // Output column index in schema passed into RowDispatcher
235
235
const TType* InputDataType = nullptr ; // Multi type (comes from Row Dispatcher)
236
236
std::unique_ptr<NKikimr::NMiniKQL::TValuePackerTransport<true >> DataUnpacker;
237
+ // Set on Parent
237
238
ui64 CpuMicrosec = 0 ;
238
239
// Set on both Parent (cumulative) and Children (separate)
239
240
@@ -377,10 +378,47 @@ class TDqPqRdReadActor : public NActors::TActor<TDqPqRdReadActor>, public NYql::
377
378
hFunc (TEvPrivate::TEvRefreshClusters, Handle);
378
379
hFunc (TEvPrivate::TEvReceivedClusters, Handle);
379
380
hFunc (TEvPrivate::TEvDescribeTopicResult, Handle);
381
+ })
380
382
381
- cFunc (TEvents::TEvPoisonPill::EventType, PassAway);
383
// State function installed by PassAway() via Become(&TDqPqRdReadActor::IgnoreState).
// NOTE(review): TEvPoison/TEvPoisonPill are not listed here; confirm they cannot arrive in this state.
+ STRICT_STFUNC(IgnoreState, {
384
+ // row dispatcher events: TEvCoordinatorChanged is only logged (IgnoreEvent); the others are logged and answered through ReplyNoSession
385
+ hFunc (NFq::TEvRowDispatcher::TEvCoordinatorChanged, IgnoreEvent);
386
+ hFunc (NFq::TEvRowDispatcher::TEvCoordinatorResult, ReplyNoSession);
387
+ hFunc (NFq::TEvRowDispatcher::TEvNewDataArrived, ReplyNoSession);
388
+ hFunc (NFq::TEvRowDispatcher::TEvMessageBatch, ReplyNoSession);
389
+ hFunc (NFq::TEvRowDispatcher::TEvStartSessionAck, ReplyNoSession);
390
+ hFunc (NFq::TEvRowDispatcher::TEvSessionError, ReplyNoSession);
391
+ hFunc (NFq::TEvRowDispatcher::TEvStatistics, ReplyNoSession);
392
+ hFunc (NFq::TEvRowDispatcher::TEvGetInternalStateRequest, ReplyNoSession);
393
+
394
+ hFunc (NActors::TEvents::TEvPong, Handle);
395
+ hFunc (TEvInterconnect::TEvNodeConnected, HandleConnected);
396
+ hFunc (TEvInterconnect::TEvNodeDisconnected, HandleDisconnected);
397
+ hFunc (NActors::TEvents::TEvUndelivered, Handle);
398
+ hFunc (NYql::NDq::TEvRetryQueuePrivate::TEvRetry, Handle);
399
+ hFunc (NYql::NDq::TEvRetryQueuePrivate::TEvEvHeartbeat, Handle);
400
+
401
+ // row dispatcher heartbeat is answered through ReplyNoSession; actor-private (TEvPrivate) events below are only logged
402
+ hFunc (NFq::TEvRowDispatcher::TEvHeartbeat, ReplyNoSession);
403
+ hFunc (TEvPrivate::TEvPrintState, IgnoreEvent);
404
+ hFunc (TEvPrivate::TEvProcessState, IgnoreEvent);
405
+ hFunc (TEvPrivate::TEvNotifyCA, IgnoreEvent);
406
+ hFunc (TEvPrivate::TEvRefreshClusters, IgnoreEvent);
407
+ hFunc (TEvPrivate::TEvReceivedClusters, IgnoreEvent);
408
+ hFunc (TEvPrivate::TEvDescribeTopicResult, IgnoreEvent);
382
409
})
383
410
411
// Handler used by IgnoreState: writes one SRC_LOG_D line naming the event pointer type and the sender, and does nothing else.
// NOTE(review): typeid(...).name() is implementation-defined (often a mangled name) and describes the pointer type TEventPtr, not the event itself.
+ template <class TEventPtr>
412
+ void IgnoreEvent(TEventPtr& ev) {
413
+ SRC_LOG_D (" Ignore " << typeid (TEventPtr).name () << " from " << ev->Sender );
414
+ }
415
+
416
// Handler used by IgnoreState: writes one SRC_LOG_D line, then calls SendNoSession with the event's Sender and Cookie.
// Presumably this tells the sender that no session exists for it on this actor - confirm against SendNoSession.
// NOTE(review): typeid(...).name() is implementation-defined (often a mangled name) and describes the pointer type TEventPtr, not the event itself.
+ template <class TEventPtr >
417
+ void ReplyNoSession (TEventPtr& ev) {
418
+ SRC_LOG_D (" Ignore (no session) " << typeid (TEventPtr).name () << " from " << ev->Sender );
419
+ SendNoSession (ev->Sender , ev->Cookie );
420
+ }
421
+
384
422
static constexpr char ActorName[] = " DQ_PQ_READ_ACTOR" ;
385
423
386
424
void CommitState (const NDqProto::TCheckpoint& checkpoint) override ;
@@ -485,9 +523,13 @@ TDqPqRdReadActor::TDqPqRdReadActor(
485
523
, CredentialsProviderFactory(std::move(credentialsProviderFactory))
486
524
, MaxBufferSize(bufferSize)
487
525
{
488
- if (Parent == this ) {
489
- State = EState::START_CLUSTER_DISCOVERY;
526
+
527
+ SRC_LOG_I (" Start read actor, local row dispatcher " << LocalRowDispatcherActorId.ToString () << " , metadatafields: " << JoinSeq (' ,' , SourceParams.GetMetadataFields ())
528
+ << " , partitions: " << JoinSeq (' ,' , GetPartitionsToRead ()));
529
+ if (Parent != this ) {
530
+ return ;
490
531
}
532
+ State = EState::START_CLUSTER_DISCOVERY;
491
533
const auto programBuilder = std::make_unique<TProgramBuilder>(typeEnv, *holderFactory.GetFunctionRegistry ());
492
534
493
535
// Parse output schema (expected struct output type)
@@ -510,8 +552,6 @@ TDqPqRdReadActor::TDqPqRdReadActor(
510
552
DataUnpacker = std::make_unique<NKikimr::NMiniKQL::TValuePackerTransport<true >>(InputDataType);
511
553
512
554
IngressStats.Level = statsLevel;
513
- SRC_LOG_I (" Start read actor, local row dispatcher " << LocalRowDispatcherActorId.ToString () << " , metadatafields: " << JoinSeq (' ,' , SourceParams.GetMetadataFields ())
514
- << " , partitions: " << JoinSeq (' ,' , GetPartitionsToRead ()));
515
555
}
516
556
517
557
void TDqPqRdReadActor::Init () {
@@ -663,15 +703,18 @@ void TDqPqRdReadActor::StopSession(TSession& sessionInfo) {
663
703
// IActor & IDqComputeActorAsyncInput
664
704
void TDqPqRdReadActor::PassAway () { // Is called from Compute Actor
665
705
SRC_LOG_I (" PassAway" );
706
+ Become (&TDqPqRdReadActor::IgnoreState);
666
707
PrintInternalState ();
667
708
for (auto & [rowDispatcherActorId, sessionInfo] : Sessions) {
668
709
StopSession (sessionInfo);
669
710
}
670
711
for (auto & clusterState : Clusters) {
671
- if (clusterState.Child == this ) {
712
+ auto child = clusterState.Child ;
713
+ if (child == this ) {
672
714
continue ;
673
715
}
674
- Send (clusterState.ChildId , new NActors::TEvents::TEvPoison);
716
+ // all actors are on same mailbox, safe to call
717
+ child->PassAway ();
675
718
}
676
719
Clusters.clear ();
677
720
FederatedTopicClient.Reset ();
@@ -681,6 +724,7 @@ void TDqPqRdReadActor::PassAway() { // Is called from Compute Actor
681
724
}
682
725
683
726
i64 TDqPqRdReadActor::GetAsyncInputData (NKikimr::NMiniKQL::TUnboxedValueBatch& buffer, TMaybe<TInstant>& /* watermark*/ , bool &, i64 freeSpace) {
727
+ Counters.GetAsyncInputData ++;
684
728
SRC_LOG_T (" GetAsyncInputData freeSpace = " << freeSpace);
685
729
Init ();
686
730
Metrics.InFlyAsyncInputData ->Set (0 );
@@ -717,6 +761,7 @@ i64 TDqPqRdReadActor::GetAsyncInputData(NKikimr::NMiniKQL::TUnboxedValueBatch& b
717
761
continue ;
718
762
}
719
763
for (auto & [rowDispatcherActorId, sessionInfo] : child->Sessions ) {
764
+ // all actors are on same mailbox, safe to call
720
765
child->TrySendGetNextBatch (sessionInfo);
721
766
}
722
767
}
@@ -769,6 +814,7 @@ void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvStatistics::TPtr& ev) {
769
814
SRC_LOG_T (" Received TEvStatistics from " << ev->Sender << " , seqNo " << meta.GetSeqNo () << " , ConfirmedSeqNo " << meta.GetConfirmedSeqNo () << " generation " << ev->Cookie );
770
815
Counters.Statistics ++;
771
816
CpuMicrosec += ev->Get ()->Record .GetCpuMicrosec ();
817
+ // all actors are on same mailbox, this method is not called after Parent stopped, safe to access directly
772
818
if (Parent != this ) {
773
819
Parent->CpuMicrosec += ev->Get ()->Record .GetCpuMicrosec ();
774
820
}
@@ -872,7 +918,7 @@ void TDqPqRdReadActor::Handle(const NFq::TEvRowDispatcher::TEvHeartbeat::TPtr& e
872
918
873
919
void TDqPqRdReadActor::Handle (NFq::TEvRowDispatcher::TEvCoordinatorChanged::TPtr& ev) {
874
920
SRC_LOG_D (" TEvCoordinatorChanged, new coordinator " << ev->Get ()->CoordinatorActorId );
875
- Counters.GetAsyncInputData ++;
921
+ Counters.CoordinatorChanged ++;
876
922
877
923
if (CoordinatorActorId
878
924
&& CoordinatorActorId == ev->Get ()->CoordinatorActorId ) {
@@ -900,6 +946,7 @@ void TDqPqRdReadActor::ScheduleProcessState() {
900
946
901
947
void TDqPqRdReadActor::ReInit (const TString& reason) {
902
948
SRC_LOG_I (" ReInit state, reason " << reason);
949
+ // all actors are on same mailbox, this method is not called after Parent stopped, safe to access directly
903
950
Parent->Metrics .ReInit ->Inc ();
904
951
905
952
State = EState::WAIT_COORDINATOR_ID;
@@ -915,7 +962,7 @@ void TDqPqRdReadActor::Stop(NDqProto::StatusIds::StatusCode status, TIssues issu
915
962
916
963
void TDqPqRdReadActor::Handle (NFq::TEvRowDispatcher::TEvCoordinatorResult::TPtr& ev) {
917
964
SRC_LOG_I (" Received TEvCoordinatorResult from " << ev->Sender .ToString () << " , cookie " << ev->Cookie );
918
- Counters.CoordinatorChanged ++;
965
+ Counters.CoordinatorResult ++;
919
966
if (ev->Cookie != CoordinatorRequestCookie) {
920
967
SRC_LOG_W (" Ignore TEvCoordinatorResult. wrong cookie" );
921
968
return ;
@@ -991,6 +1038,7 @@ void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvMessageBatch::TPtr& ev)
991
1038
Stop (NDqProto::StatusIds::INTERNAL_ERROR, {TIssue (TStringBuilder () << LogPrefix << " No partition with id " << partitionId)});
992
1039
return ;
993
1040
}
1041
+ // all actors are on same mailbox, this method is not called after Parent stopped, safe to access directly
994
1042
Parent->Metrics .InFlyGetNextBatch ->Set (0 );
995
1043
if (ev->Get ()->Record .GetMessages ().empty ()) {
996
1044
return ;
@@ -1068,14 +1116,21 @@ void TDqPqRdReadActor::PrintInternalState() {
1068
1116
1069
1117
TString TDqPqRdReadActor::GetInternalState () {
1070
1118
TStringStream str;
1071
- str << LogPrefix << " State: used buffer size " << Parent->ReadyBufferSizeBytes << " ready buffer event size " << Parent->ReadyBuffer .size () << " state " << static_cast <ui64>(State) << " InFlyAsyncInputData " << Parent->InFlyAsyncInputData << " \n " ;
1072
- str << " Counters: GetAsyncInputData " << Counters.GetAsyncInputData << " CoordinatorChanged " << Counters.CoordinatorChanged << " CoordinatorResult " << Counters.CoordinatorResult
1119
+ str << LogPrefix << " State:" ;
1120
+ str << " used buffer size " << Parent->ReadyBufferSizeBytes << " ready buffer event size " << Parent->ReadyBuffer .size ()
1121
+ << " state " << static_cast <ui64>(State)
1122
+ << " InFlyAsyncInputData " << Parent->InFlyAsyncInputData
1123
+ << " \n " ;
1124
+ str << " Counters:"
1125
+ << " CoordinatorChanged " << Counters.CoordinatorChanged << " CoordinatorResult " << Counters.CoordinatorResult
1073
1126
<< " MessageBatch " << Counters.MessageBatch << " StartSessionAck " << Counters.StartSessionAck << " NewDataArrived " << Counters.NewDataArrived
1074
1127
<< " SessionError " << Counters.SessionError << " Statistics " << Counters.Statistics << " NodeDisconnected " << Counters.NodeDisconnected
1075
1128
<< " NodeConnected " << Counters.NodeConnected << " Undelivered " << Counters.Undelivered << " Retry " << Counters.Retry
1076
1129
<< " PrivateHeartbeat " << Counters.PrivateHeartbeat << " SessionClosed " << Counters.SessionClosed << " Pong " << Counters.Pong
1077
1130
<< " Heartbeat " << Counters.Heartbeat << " PrintState " << Counters.PrintState << " ProcessState " << Counters.ProcessState
1078
- << " NotifyCA " << Parent->Counters .NotifyCA << " \n " ;
1131
+ << " GetAsyncInputData " << Parent->Counters .GetAsyncInputData
1132
+ << " NotifyCA " << Parent->Counters .NotifyCA
1133
+ << " \n " ;
1079
1134
1080
1135
for (auto & [rowDispatcherActorId, sessionInfo] : Sessions) {
1081
1136
str << " " << rowDispatcherActorId << " status " << static_cast <ui64>(sessionInfo.Status )
@@ -1365,6 +1420,7 @@ void TDqPqRdReadActor::StartCluster(ui32 clusterIndex) {
1365
1420
PqGateway,
1366
1421
this ,
1367
1422
TString (Clusters[clusterIndex].Info .Name ));
1423
+ Clusters[clusterIndex].Child = actor;
1368
1424
Clusters[clusterIndex].ChildId = RegisterWithSameMailbox (actor);
1369
1425
actor->Init ();
1370
1426
actor->InitChild ();
0 commit comments