@@ -247,6 +247,7 @@ class TDqPqRdReadActor : public NActors::TActor<TDqPqRdReadActor>, public NYql::
247
247
void TrySendGetNextBatch (SessionInfo& sessionInfo);
248
248
template <class TEventPtr >
249
249
bool CheckSession (SessionInfo& session, const TEventPtr& ev, ui64 partitionId);
250
+ void SendStopSession (const NActors::TActorId& recipient, ui64 partitionId, ui64 cookie);
250
251
};
251
252
252
253
TDqPqRdReadActor::TDqPqRdReadActor (
@@ -308,12 +309,14 @@ void TDqPqRdReadActor::ProcessState() {
308
309
}
309
310
State = EState::WAIT_PARTITIONS_ADDRES;
310
311
auto partitionToRead = GetPartitionsToRead ();
311
- SRC_LOG_I (" Send TEvCoordinatorRequest to coordinator " << CoordinatorActorId->ToString () << " , partIds: " << JoinSeq (" , " , partitionToRead));
312
+ auto cookie = ++CoordinatorRequestCookie;
313
+ SRC_LOG_I (" Send TEvCoordinatorRequest to coordinator " << CoordinatorActorId->ToString () << " , partIds: "
314
+ << JoinSeq (" , " , partitionToRead) << " cookie " << cookie);
312
315
Send (
313
316
*CoordinatorActorId,
314
317
new NFq::TEvRowDispatcher::TEvCoordinatorRequest (SourceParams, partitionToRead),
315
318
IEventHandle::FlagTrackDelivery | IEventHandle::FlagSubscribeOnSession,
316
- ++CoordinatorRequestCookie );
319
+ cookie );
317
320
return ;
318
321
}
319
322
case EState::WAIT_PARTITIONS_ADDRES:
@@ -327,7 +330,7 @@ void TDqPqRdReadActor::ProcessState() {
327
330
TPartitionKey partitionKey{TString{}, partitionId};
328
331
const auto offsetIt = PartitionToOffset.find (partitionKey);
329
332
if (offsetIt != PartitionToOffset.end ()) {
330
- SRC_LOG_D (" readOffset found" );
333
+ SRC_LOG_D (" ReadOffset found" );
331
334
readOffset = offsetIt->second ;
332
335
}
333
336
@@ -439,8 +442,9 @@ void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvStartSessionAck::TPtr& e
439
442
auto sessionIt = Sessions.find (partitionId);
440
443
if (sessionIt == Sessions.end ()) {
441
444
SRC_LOG_W (" Ignore TEvStartSessionAck from " << ev->Sender << " , seqNo " << meta.GetSeqNo ()
442
- << " , ConfirmedSeqNo " << meta.GetConfirmedSeqNo () << " , PartitionId " << partitionId);
445
+ << " , ConfirmedSeqNo " << meta.GetConfirmedSeqNo () << " , PartitionId " << partitionId << " , cookie " << ev-> Cookie );
443
446
YQL_ENSURE (State != EState::STARTED);
447
+ SendStopSession (ev->Sender , partitionId, ev->Cookie );
444
448
return ;
445
449
}
446
450
auto & sessionInfo = sessionIt->second ;
@@ -458,8 +462,9 @@ void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvSessionError::TPtr& ev)
458
462
auto sessionIt = Sessions.find (partitionId);
459
463
if (sessionIt == Sessions.end ()) {
460
464
SRC_LOG_W (" Ignore TEvSessionError from " << ev->Sender << " , seqNo " << meta.GetSeqNo ()
461
- << " , ConfirmedSeqNo " << meta.GetConfirmedSeqNo () << " , PartitionId " << partitionId);
465
+ << " , ConfirmedSeqNo " << meta.GetConfirmedSeqNo () << " , PartitionId " << partitionId << " , cookie " << ev-> Cookie );
462
466
YQL_ENSURE (State != EState::STARTED);
467
+ SendStopSession (ev->Sender , partitionId, ev->Cookie );
463
468
return ;
464
469
}
465
470
@@ -472,14 +477,15 @@ void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvSessionError::TPtr& ev)
472
477
473
478
void TDqPqRdReadActor::Handle (NFq::TEvRowDispatcher::TEvStatistics::TPtr& ev) {
474
479
const NYql::NDqProto::TMessageTransportMeta& meta = ev->Get ()->Record .GetTransportMeta ();
475
- SRC_LOG_D (" TEvStatistics from " << ev->Sender << " , offset " << ev->Get ()->Record .GetNextMessageOffset () << " , seqNo " << meta.GetSeqNo () << " , ConfirmedSeqNo " << meta.GetConfirmedSeqNo ());
480
+ SRC_LOG_T (" TEvStatistics from " << ev->Sender << " , offset " << ev->Get ()->Record .GetNextMessageOffset () << " , seqNo " << meta.GetSeqNo () << " , ConfirmedSeqNo " << meta.GetConfirmedSeqNo ());
476
481
477
482
ui64 partitionId = ev->Get ()->Record .GetPartitionId ();
478
483
auto sessionIt = Sessions.find (partitionId);
479
484
if (sessionIt == Sessions.end ()) {
480
485
SRC_LOG_W (" Ignore TEvStatistics from " << ev->Sender << " , seqNo " << meta.GetSeqNo ()
481
486
<< " , ConfirmedSeqNo " << meta.GetConfirmedSeqNo () << " , PartitionId " << partitionId);
482
487
YQL_ENSURE (State != EState::STARTED);
488
+ SendStopSession (ev->Sender , partitionId, ev->Cookie );
483
489
return ;
484
490
}
485
491
auto & sessionInfo = sessionIt->second ;
@@ -505,6 +511,7 @@ void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvNewDataArrived::TPtr& ev
505
511
SRC_LOG_W (" Ignore TEvNewDataArrived from " << ev->Sender << " , seqNo " << meta.GetSeqNo ()
506
512
<< " , ConfirmedSeqNo " << meta.GetConfirmedSeqNo () << " , PartitionId " << partitionId);
507
513
YQL_ENSURE (State != EState::STARTED);
514
+ SendStopSession (ev->Sender , partitionId, ev->Cookie );
508
515
return ;
509
516
}
510
517
@@ -552,7 +559,8 @@ void TDqPqRdReadActor::Handle(const NFq::TEvRowDispatcher::TEvHeartbeat::TPtr& e
552
559
auto sessionIt = Sessions.find (partitionId);
553
560
if (sessionIt == Sessions.end ()) {
554
561
SRC_LOG_W (" Ignore TEvHeartbeat from " << ev->Sender << " , seqNo " << meta.GetSeqNo ()
555
- << " , ConfirmedSeqNo " << meta.GetConfirmedSeqNo () << " , PartitionId " << partitionId);
562
+ << " , ConfirmedSeqNo " << meta.GetConfirmedSeqNo () << " , PartitionId " << partitionId << " , cookie " << ev->Cookie );
563
+ SendStopSession (ev->Sender , partitionId, ev->Cookie );
556
564
return ;
557
565
}
558
566
CheckSession (sessionIt->second , ev, partitionId);
@@ -596,21 +604,23 @@ void TDqPqRdReadActor::Stop(const TString& message) {
596
604
}
597
605
598
606
void TDqPqRdReadActor::Handle (NFq::TEvRowDispatcher::TEvCoordinatorResult::TPtr& ev) {
599
- SRC_LOG_D (" TEvCoordinatorResult from " << ev->Sender .ToString () << " , cookie " << ev->Cookie );
607
+ SRC_LOG_I (" TEvCoordinatorResult from " << ev->Sender .ToString () << " , cookie " << ev->Cookie );
600
608
if (ev->Cookie != CoordinatorRequestCookie) {
601
609
SRC_LOG_W (" Ignore TEvCoordinatorResult. wrong cookie" );
602
610
return ;
603
611
}
612
+ if (State != EState::WAIT_PARTITIONS_ADDRES) {
613
+ SRC_LOG_W (" Ignore TEvCoordinatorResult. wrong state " << static_cast <ui64>(EState::WAIT_PARTITIONS_ADDRES));
614
+ return ;
615
+ }
604
616
for (auto & p : ev->Get ()->Record .GetPartitions ()) {
605
617
TActorId rowDispatcherActorId = ActorIdFromProto (p.GetActorId ());
606
- SRC_LOG_D (" rowDispatcherActorId:" << rowDispatcherActorId);
607
-
608
618
for (auto partitionId : p.GetPartitionId ()) {
609
- SRC_LOG_D (" partitionId:" << partitionId);
610
619
if (Sessions.contains (partitionId)) {
611
620
Stop (" Internal error: session already exists" );
612
621
return ;
613
622
}
623
+ SRC_LOG_I (" Create session to RD (" << rowDispatcherActorId << " ), partitionId " << partitionId);
614
624
Sessions.emplace (
615
625
std::piecewise_construct,
616
626
std::forward_as_tuple (partitionId),
@@ -655,8 +665,9 @@ void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvMessageBatch::TPtr& ev)
655
665
auto sessionIt = Sessions.find (partitionId);
656
666
if (sessionIt == Sessions.end ()) {
657
667
SRC_LOG_W (" Ignore TEvMessageBatch from " << ev->Sender << " , seqNo " << meta.GetSeqNo ()
658
- << " , ConfirmedSeqNo " << meta.GetConfirmedSeqNo () << " , PartitionId " << partitionId);
668
+ << " , ConfirmedSeqNo " << meta.GetConfirmedSeqNo () << " , PartitionId " << partitionId << " , cookie " << ev-> Cookie );
659
669
YQL_ENSURE (State != EState::STARTED);
670
+ SendStopSession (ev->Sender , partitionId, ev->Cookie );
660
671
return ;
661
672
}
662
673
@@ -705,6 +716,12 @@ std::pair<NUdf::TUnboxedValuePod, i64> TDqPqRdReadActor::CreateItem(const TStrin
705
716
}
706
717
707
718
void TDqPqRdReadActor::Handle (const NYql::NDq::TEvRetryQueuePrivate::TEvSessionClosed::TPtr& ev) {
719
+ if (State != EState::STARTED) {
720
+ if (!Sessions.empty ()) {
721
+ Stop (TStringBuilder () << " Internal error: wrong state on TEvSessionClosed, session size " << Sessions.size () << " state " << static_cast <ui64>(State));
722
+ }
723
+ return ;
724
+ }
708
725
ReInit (TStringBuilder () << " Session closed, event queue id " << ev->Get ()->EventQueueId );
709
726
}
710
727
@@ -754,10 +771,7 @@ template <class TEventPtr>
754
771
bool TDqPqRdReadActor::CheckSession (SessionInfo& session, const TEventPtr& ev, ui64 partitionId) {
755
772
if (ev->Cookie != session.Generation ) {
756
773
SRC_LOG_W (" Wrong message generation (" << typeid (TEventPtr).name () << " ), sender " << ev->Sender << " cookie " << ev->Cookie << " , session generation " << session.Generation << " , send TEvStopSession" );
757
- auto event = std::make_unique<NFq::TEvRowDispatcher::TEvStopSession>();
758
- *event->Record .MutableSource () = SourceParams;
759
- event->Record .SetPartitionId (partitionId);
760
- Send (ev->Sender , event.release (), 0 , ev->Cookie );
774
+ SendStopSession (ev->Sender , partitionId, ev->Cookie );
761
775
return false ;
762
776
}
763
777
if (!session.EventsQueue .OnEventReceived (ev)) {
@@ -768,6 +782,13 @@ bool TDqPqRdReadActor::CheckSession(SessionInfo& session, const TEventPtr& ev, u
768
782
return true ;
769
783
}
770
784
785
+ void TDqPqRdReadActor::SendStopSession (const NActors::TActorId& recipient, ui64 partitionId, ui64 cookie) {
786
+ auto event = std::make_unique<NFq::TEvRowDispatcher::TEvStopSession>();
787
+ *event->Record .MutableSource () = SourceParams;
788
+ event->Record .SetPartitionId (partitionId);
789
+ Send (recipient, event.release (), 0 , cookie);
790
+ }
791
+
771
792
std::pair<IDqComputeActorAsyncInput*, NActors::IActor*> CreateDqPqRdReadActor (
772
793
NPq::NProto::TDqPqTopicSource&& settings,
773
794
ui64 inputIndex,
0 commit comments