@@ -586,6 +586,36 @@ void TNodeBroker::PrepareEpochCache()
586
586
TabletCounters->Simple ()[COUNTER_EPOCH_DELTAS_SIZE_BYTES].Set (EpochDeltasCache.size ());
587
587
}
588
588
589
+ void TNodeBroker::PrepareUpdateNodesLog ()
590
+ {
591
+ LOG_DEBUG_S (TActorContext::AsActorContext (), NKikimrServices::NODE_BROKER,
592
+ " Preparing update nodes log for epoch #" << Committed.Epoch .ToString ()
593
+ << " nodes=" << Committed.Nodes .size ()
594
+ << " expired=" << Committed.ExpiredNodes .size ()
595
+ << " removed=" << Committed.RemovedNodes .size ());
596
+
597
+ UpdateNodesLog.clear ();
598
+ UpdateNodesLogVersions.clear ();
599
+
600
+ TVector<TVersionedNodeID> nodeIdsSortedByVersion;
601
+ for (auto &entry : Committed.Nodes ) {
602
+ nodeIdsSortedByVersion.emplace_back (entry.second .NodeId , entry.second .Version );
603
+ }
604
+ for (auto &entry : Committed.ExpiredNodes ) {
605
+ nodeIdsSortedByVersion.emplace_back (entry.second .NodeId , entry.second .Version );
606
+ }
607
+ for (auto &entry : Committed.RemovedNodes ) {
608
+ nodeIdsSortedByVersion.emplace_back (entry.second .NodeId , entry.second .Version );
609
+ }
610
+ std::sort (nodeIdsSortedByVersion.begin (), nodeIdsSortedByVersion.end (), TVersionedNodeID::TCmpByVersion ());
611
+
612
+ for (const auto &id : nodeIdsSortedByVersion) {
613
+ const auto & node = *Committed.FindNode (id.NodeId );
614
+ AddNodeToUpdateNodesLog (node);
615
+ }
616
+ TabletCounters->Simple ()[COUNTER_UPDATE_NODES_LOG_SIZE_BYTES].Set (UpdateNodesLog.size ());
617
+ }
618
+
589
619
void TNodeBroker::AddNodeToEpochCache (const TNodeInfo &node)
590
620
{
591
621
LOG_DEBUG_S (TActorContext::AsActorContext (), NKikimrServices::NODE_BROKER,
@@ -615,13 +645,132 @@ void TNodeBroker::AddDeltaToEpochDeltasCache(const TString &delta, ui64 version)
615
645
TabletCounters->Simple ()[COUNTER_EPOCH_DELTAS_SIZE_BYTES].Set (EpochDeltasCache.size ());
616
646
}
617
647
648
+ void TNodeBroker::AddNodeToUpdateNodesLog (const TNodeInfo &node)
649
+ {
650
+ LOG_DEBUG_S (TActorContext::AsActorContext (), NKikimrServices::NODE_BROKER,
651
+ " Add node " << node.IdShortString () << " to update nodes log" );
652
+
653
+ NKikimrNodeBroker::TUpdateNodes updateNodes;
654
+
655
+ switch (node.State ) {
656
+ case ENodeState::Active:
657
+ FillNodeInfo (node, *updateNodes.AddUpdates ()->MutableNode ());
658
+ break ;
659
+ case ENodeState::Expired:
660
+ updateNodes.AddUpdates ()->SetExpiredNode (node.NodeId );
661
+ break ;
662
+ case ENodeState::Removed:
663
+ updateNodes.AddUpdates ()->SetRemovedNode (node.NodeId );
664
+ break ;
665
+ }
666
+
667
+ TString delta;
668
+ Y_PROTOBUF_SUPPRESS_NODISCARD updateNodes.SerializeToString (&delta);
669
+
670
+ Y_ENSURE (UpdateNodesLogVersions.empty () || UpdateNodesLogVersions.back ().Version <= node.Version );
671
+ if (!UpdateNodesLogVersions.empty () && UpdateNodesLogVersions.back ().Version == node.Version ) {
672
+ UpdateNodesLog += delta;
673
+ UpdateNodesLogVersions.back ().CacheEndOffset = UpdateNodesLog.size ();
674
+ } else {
675
+ UpdateNodesLog += delta;
676
+ UpdateNodesLogVersions.emplace_back (node.Version , UpdateNodesLog.size ());
677
+ }
678
+ TabletCounters->Simple ()[COUNTER_UPDATE_NODES_LOG_SIZE_BYTES].Set (UpdateNodesLog.size ());
679
+ }
680
+
618
681
void TNodeBroker::SubscribeForConfigUpdates (const TActorContext &ctx)
619
682
{
620
683
ui32 nodeBrokerItem = (ui32)NKikimrConsole::TConfigItem::NodeBrokerConfigItem;
621
684
ui32 featureFlagsItem = (ui32)NKikimrConsole::TConfigItem::FeatureFlagsItem;
622
685
NConsole::SubscribeViaConfigDispatcher (ctx, {nodeBrokerItem, featureFlagsItem}, ctx.SelfID );
623
686
}
624
687
688
+ void TNodeBroker::SendToSubscriber (const TSubscriberInfo &subscriber, IEventBase* event, const TActorContext &ctx) const
689
+ {
690
+ SendToSubscriber (subscriber, event, 0 , ctx);
691
+ }
692
+
693
+ void TNodeBroker::SendToSubscriber (const TSubscriberInfo &subscriber, IEventBase* event, ui64 cookie, const TActorContext &ctx) const
694
+ {
695
+ THolder<IEventHandle> ev = MakeHolder<IEventHandle>(subscriber.Id , ctx.SelfID , event, 0 , cookie);
696
+ if (subscriber.PipeServerInfo ->IcSession ) {
697
+ ev->Rewrite (TEvInterconnect::EvForward, subscriber.PipeServerInfo ->IcSession );
698
+ }
699
+ ctx.Send (ev.Release ());
700
+ }
701
+
702
+ void TNodeBroker::SendUpdateNodes (const TActorContext &ctx)
703
+ {
704
+ if (SentVersion >= Committed.Epoch .Version ) {
705
+ return ;
706
+ }
707
+
708
+ for (const auto & [_, subscriber] : Subscribers) {
709
+ SendUpdateNodes (subscriber, SentVersion, ctx);
710
+ }
711
+ SentVersion = Committed.Epoch .Version ;
712
+ }
713
+
714
+ void TNodeBroker::SendUpdateNodes (const TSubscriberInfo &subscriber, ui64 version, const TActorContext &ctx)
715
+ {
716
+ NKikimrNodeBroker::TUpdateNodes record;
717
+ record.SetSeqNo (subscriber.SeqNo );
718
+ Committed.Epoch .Serialize (*record.MutableEpoch ());
719
+ auto response = MakeHolder<TEvNodeBroker::TEvUpdateNodes>(record);
720
+
721
+ auto it = std::lower_bound (UpdateNodesLogVersions.begin (), UpdateNodesLogVersions.end (), version + 1 );
722
+ if (it != UpdateNodesLogVersions.begin ()) {
723
+ response->PreSerializedData = UpdateNodesLog.substr (std::prev (it)->CacheEndOffset );
724
+ } else {
725
+ response->PreSerializedData = UpdateNodesLog;
726
+ }
727
+
728
+ TabletCounters->Percentile ()[COUNTER_UPDATE_NODES_BYTES].IncrementFor (response->GetCachedByteSize ());
729
+ LOG_TRACE_S (ctx, NKikimrServices::NODE_BROKER,
730
+ " Send TEvUpdateNodes v" << version << " -> v" << Committed.Epoch .Version
731
+ << " to " << subscriber.Id );
732
+ SendToSubscriber (subscriber, response.Release (), ctx);
733
+ }
734
+
735
+ TNodeBroker::TSubscriberInfo& TNodeBroker::AddSubscriber (TActorId subscriberId,
736
+ TActorId pipeServerId,
737
+ ui64 seqNo,
738
+ const TActorContext &ctx)
739
+ {
740
+ LOG_DEBUG_S (ctx, NKikimrServices::NODE_BROKER,
741
+ " New subscriber " << subscriberId
742
+ << " , seqNo: " << seqNo
743
+ << " , server pipe id: " << pipeServerId);
744
+
745
+ auto & pipeServer = PipeServers.at (pipeServerId);
746
+ auto res = Subscribers.emplace (subscriberId, TSubscriberInfo (subscriberId, seqNo, &pipeServer));
747
+ Y_ENSURE (res.second , " Subscription already exists for " << subscriberId);
748
+ pipeServer.Subscribers .insert (subscriberId);
749
+ return res.first ->second ;
750
+ }
751
+
752
+ void TNodeBroker::RemoveSubscriber (TActorId subscriber, const TActorContext &ctx)
753
+ {
754
+ auto it = Subscribers.find (subscriber);
755
+ Y_ENSURE (it != Subscribers.end (), " No subscription for " << subscriber);
756
+
757
+ LOG_DEBUG_S (ctx, NKikimrServices::NODE_BROKER,
758
+ " Unsubscribed " << subscriber
759
+ << " , seqNo: " << it->second .SeqNo
760
+ << " , server pipe id: " << it->second .PipeServerInfo ->Id );
761
+
762
+ it->second .PipeServerInfo ->Subscribers .erase (subscriber);
763
+ Subscribers.erase (it);
764
+ }
765
+
766
+ bool TNodeBroker::HasOutdatedSubscription (TActorId subscriber, ui64 newSeqNo) const
767
+ {
768
+ if (auto it = Subscribers.find (subscriber); it != Subscribers.end ()) {
769
+ return it->second .SeqNo < newSeqNo;
770
+ }
771
+ return false ;
772
+ }
773
+
625
774
void TNodeBroker::TState::LoadConfigFromProto (const NKikimrNodeBroker::TConfig &config)
626
775
{
627
776
Config = config;
@@ -1452,6 +1601,53 @@ void TNodeBroker::Handle(TEvNodeBroker::TEvSetConfigRequest::TPtr &ev,
1452
1601
Execute (CreateTxUpdateConfig (ev), ctx);
1453
1602
}
1454
1603
1604
+ void TNodeBroker::Handle (TEvNodeBroker::TEvSubscribeNodesRequest::TPtr &ev,
1605
+ const TActorContext &ctx)
1606
+ {
1607
+ TabletCounters->Cumulative ()[COUNTER_SUBSCRIBE_NODES_REQUESTS].Increment (1 );
1608
+
1609
+ auto seqNo = ev->Get ()->Record .GetSeqNo ();
1610
+ if (HasOutdatedSubscription (ev->Sender , seqNo)) {
1611
+ RemoveSubscriber (ev->Sender , ctx);
1612
+ }
1613
+
1614
+ if (!Subscribers.contains (ev->Sender )) {
1615
+ const auto & subscriber = AddSubscriber (ev->Sender , ev->Recipient , seqNo, ctx);
1616
+ SendUpdateNodes (subscriber, ev->Get ()->Record .GetCachedVersion (), ctx);
1617
+ }
1618
+ }
1619
+
1620
+ void TNodeBroker::Handle (TEvNodeBroker::TEvSyncNodesRequest::TPtr &ev,
1621
+ const TActorContext &ctx)
1622
+ {
1623
+ TabletCounters->Cumulative ()[COUNTER_SYNC_NODES_REQUESTS].Increment (1 );
1624
+
1625
+ if (auto it = Subscribers.find (ev->Sender ); it != Subscribers.end ()) {
1626
+ if (it->second .SeqNo == ev->Get ()->Record .GetSeqNo ()) {
1627
+ auto response = MakeHolder<TEvNodeBroker::TEvSyncNodesResponse>();
1628
+ response->Record .SetSeqNo (it->second .SeqNo );
1629
+ SendToSubscriber (it->second , response.Release (), ev->Cookie , ctx);
1630
+ }
1631
+ }
1632
+ }
1633
+
1634
+ void TNodeBroker::Handle (TEvTabletPipe::TEvServerConnected::TPtr &ev)
1635
+ {
1636
+ auto res = PipeServers.emplace (ev->Get ()->ServerId , TPipeServerInfo (ev->Get ()->ServerId , ev->Get ()->InterconnectSession ));
1637
+ Y_ENSURE (res.second , " Unexpected TEvServerConnected for " << ev->Get ()->ServerId );
1638
+ }
1639
+
1640
+ void TNodeBroker::Handle (TEvTabletPipe::TEvServerDisconnected::TPtr &ev,
1641
+ const TActorContext &ctx)
1642
+ {
1643
+ auto it = PipeServers.find (ev->Get ()->ServerId );
1644
+ Y_ENSURE (it != PipeServers.end (), " Unexpected TEvServerDisconnected for " << ev->Get ()->ServerId );
1645
+ while (!it->second .Subscribers .empty ()) {
1646
+ RemoveSubscriber (*it->second .Subscribers .begin (), ctx);
1647
+ }
1648
+ PipeServers.erase (it);
1649
+ }
1650
+
1455
1651
void TNodeBroker::Handle (TEvPrivate::TEvUpdateEpoch::TPtr &ev,
1456
1652
const TActorContext &ctx)
1457
1653
{
0 commit comments