@@ -206,7 +206,7 @@ TInstant THive::GetAllowedBootingTime() {
206
206
return result;
207
207
}
208
208
209
- void THive::ExecuteProcessBootQueue (NIceDb::TNiceDb& db , TSideEffects& sideEffects) {
209
+ void THive::ExecuteProcessBootQueue (NIceDb::TNiceDb&, TSideEffects& sideEffects) {
210
210
TInstant now = TActivationContext::Now ();
211
211
if (WarmUp) {
212
212
TInstant allowed = GetAllowedBootingTime ();
@@ -257,10 +257,6 @@ void THive::ExecuteProcessBootQueue(NIceDb::TNiceDb& db, TSideEffects& sideEffec
257
257
sideEffects.Send (actorToNotify, new TEvPrivate::TEvRestartComplete (tablet->GetFullTabletId (), " boot delay" ));
258
258
}
259
259
tablet->ActorsToNotifyOnRestart .clear ();
260
- if (tablet->IsFollower ()) {
261
- TLeaderTabletInfo& leader = tablet->GetLeader ();
262
- UpdateTabletFollowersNumber (leader, db, sideEffects);
263
- }
264
260
BootQueue.AddToWaitQueue (record); // waiting for new node
265
261
continue ;
266
262
}
@@ -786,16 +782,11 @@ void THive::Handle(TEvInterconnect::TEvNodeInfo::TPtr &ev) {
786
782
}
787
783
788
784
void THive::Handle (TEvInterconnect::TEvNodesInfo::TPtr &ev) {
789
- THashSet<TDataCenterId> dataCenters;
790
785
for (const TEvInterconnect::TNodeInfo& node : ev->Get ()->Nodes ) {
791
786
NodesInfo[node.NodeId ] = node;
792
- dataCenters.insert (node.Location .GetDataCenterId ());
793
- }
794
- dataCenters.erase (0 ); // remove default data center id if exists
795
- if (!dataCenters.empty ()) {
796
- if (DataCenters != dataCenters.size ()) {
797
- DataCenters = dataCenters.size ();
798
- BLOG_D (" TEvInterconnect::TEvNodesInfo DataCenters=" << DataCenters << " RegisteredDataCenters=" << RegisteredDataCenters);
787
+ auto dataCenterId = node.Location .GetDataCenterId ();
788
+ if (dataCenterId != 0 ) {
789
+ DataCenters[dataCenterId]; // just create entry in hash map
799
790
}
800
791
}
801
792
Execute (CreateLoadEverything ());
@@ -2737,81 +2728,97 @@ void THive::SendReconnect(const TActorId& local) {
2737
2728
}
2738
2729
2739
2730
ui32 THive::GetDataCenters () {
2740
- return DataCenters ? DataCenters : 1 ;
2741
- }
2742
-
2743
- ui32 THive::GetRegisteredDataCenters () {
2744
- return RegisteredDataCenters ? RegisteredDataCenters : 1 ;
2745
- }
2746
-
2747
- void THive::UpdateRegisteredDataCenters () {
2748
- if (RegisteredDataCenters != RegisteredDataCenterNodes.size ()) {
2749
- BLOG_D (" THive (UpdateRegisteredDC) DataCenters=" << DataCenters << " RegisteredDataCenters=" << RegisteredDataCenters << " ->" << RegisteredDataCenterNodes.size ());
2750
- RegisteredDataCenters = RegisteredDataCenterNodes.size ();
2751
- }
2731
+ return DataCenters.size () ? DataCenters.size () : 1 ;
2752
2732
}
2753
2733
2754
2734
void THive::AddRegisteredDataCentersNode (TDataCenterId dataCenterId, TNodeId nodeId) {
2735
+ BLOG_D (" AddRegisteredDataCentersNode(" << dataCenterId << " , " << nodeId << " )" );
2755
2736
if (dataCenterId != 0 ) { // ignore default data center id if exists
2756
- if (RegisteredDataCenterNodes[dataCenterId].insert (nodeId).second ) {
2757
- if (RegisteredDataCenters != RegisteredDataCenterNodes.size ()) {
2758
- UpdateRegisteredDataCenters ();
2759
- }
2737
+ auto & dataCenter = DataCenters[dataCenterId];
2738
+ bool wasRegistered = dataCenter.IsRegistered ();
2739
+ dataCenter.RegisteredNodes .insert (nodeId);
2740
+ if (!wasRegistered && !dataCenter.UpdateScheduled ) {
2741
+ dataCenter.UpdateScheduled = true ;
2742
+ Schedule (TDuration::Seconds (1 ), new TEvPrivate::TEvUpdateDataCenterFollowers (dataCenterId));
2760
2743
}
2761
2744
}
2762
2745
}
2763
2746
2764
2747
void THive::RemoveRegisteredDataCentersNode (TDataCenterId dataCenterId, TNodeId nodeId) {
2748
+ BLOG_D (" RemoveRegisteredDataCentersNode(" << dataCenterId << " , " << nodeId << " )" );
2765
2749
if (dataCenterId != 0 ) { // ignore default data center id if exists
2766
- RegisteredDataCenterNodes [dataCenterId]. erase (nodeId) ;
2767
- if (RegisteredDataCenterNodes[dataCenterId]. size () == 0 ) {
2768
- RegisteredDataCenterNodes. erase (dataCenterId );
2769
- }
2770
- if (RegisteredDataCenters != RegisteredDataCenterNodes. size ()) {
2771
- UpdateRegisteredDataCenters ( );
2750
+ auto & dataCenter = DataCenters [dataCenterId];
2751
+ bool wasRegistered = dataCenter. IsRegistered ();
2752
+ dataCenter. RegisteredNodes . erase (nodeId );
2753
+ if (wasRegistered && !dataCenter. IsRegistered () && !dataCenter. UpdateScheduled ) {
2754
+ dataCenter. UpdateScheduled = true ;
2755
+ Schedule ( TDuration::Seconds ( 1 ), new TEvPrivate::TEvUpdateDataCenterFollowers (dataCenterId) );
2772
2756
}
2773
2757
}
2774
2758
}
2775
2759
2776
- void THive::UpdateTabletFollowersNumber (TLeaderTabletInfo& tablet, NIceDb::TNiceDb& db, TSideEffects& sideEffects) {
2777
- BLOG_D (" UpdateTabletFollowersNumber Tablet " << tablet.ToString () << " RegisteredDataCenters=" << GetRegisteredDataCenters ());
2778
- for (TFollowerGroup& group : tablet.FollowerGroups ) {
2779
- ui32 followerCount = tablet.GetActualFollowerCount (group.Id );
2780
- ui32 requiredFollowerCount = group.GetComputedFollowerCount (GetRegisteredDataCenters ());
2781
-
2782
- while (followerCount < requiredFollowerCount) {
2783
- BLOG_D (" UpdateTabletFollowersNumber Tablet " << tablet.ToString () << " is increasing number of followers (" << followerCount << " <" << requiredFollowerCount << " )" );
2784
-
2785
- TFollowerTabletInfo& follower = tablet.AddFollower (group);
2786
- follower.Statistics .SetLastAliveTimestamp (TlsActivationContext->Now ().MilliSeconds ());
2787
- db.Table <Schema::TabletFollowerTablet>().Key (tablet.Id , follower.Id ).Update (
2788
- NIceDb::TUpdate<Schema::TabletFollowerTablet::GroupID>(follower.FollowerGroup .Id ),
2789
- NIceDb::TUpdate<Schema::TabletFollowerTablet::FollowerNode>(0 ),
2790
- NIceDb::TUpdate<Schema::TabletFollowerTablet::Statistics>(follower.Statistics ));
2791
- follower.InitTabletMetrics ();
2792
- follower.BecomeStopped ();
2793
- ++followerCount;
2794
- }
2760
+ void THive::CreateTabletFollowers (TLeaderTabletInfo& tablet, NIceDb::TNiceDb& db, TSideEffects& sideEffects) {
2761
+ BLOG_D (" CreateTabletFollowers Tablet " << tablet.ToString ());
2795
2762
2796
- while (followerCount > requiredFollowerCount) {
2797
- BLOG_D (" UpdateTabletFollowersNumber Tablet " << tablet.ToString () << " is decreasing number of followers (" << followerCount << " >" << requiredFollowerCount << " )" );
2763
+ // In case tablet already has followers (happens if tablet is modified through CreateTablet), delete them
2764
+ // But create new ones before deleting old ones, to avoid issues with reusing ids
2765
+ decltype (tablet.Followers )::iterator oldFollowersIt;
2766
+ if (tablet.Followers .empty ()) {
2767
+ oldFollowersIt = tablet.Followers .end ();
2768
+ } else {
2769
+ oldFollowersIt = std::prev (tablet.Followers .end ());
2770
+ }
2798
2771
2799
- auto itFollower = tablet.Followers .rbegin ();
2800
- while (itFollower != tablet.Followers .rend () && itFollower->FollowerGroup .Id != group.Id ) {
2801
- ++itFollower;
2772
+ for (TFollowerGroup& group : tablet.FollowerGroups ) {
2773
+ if (group.FollowerCountPerDataCenter ) {
2774
+ for (auto & [dataCenterId, dataCenter] : DataCenters) {
2775
+ if (!dataCenter.IsRegistered ()) {
2776
+ continue ;
2777
+ }
2778
+ for (ui32 i = 0 ; i < group.GetFollowerCountForDataCenter (dataCenterId); ++i) {
2779
+ TFollowerTabletInfo& follower = tablet.AddFollower (group);
2780
+ follower.NodeFilter .AllowedDataCenters = {dataCenterId};
2781
+ follower.Statistics .SetLastAliveTimestamp (TlsActivationContext->Now ().MilliSeconds ());
2782
+ db.Table <Schema::TabletFollowerTablet>().Key (tablet.Id , follower.Id ).Update (
2783
+ NIceDb::TUpdate<Schema::TabletFollowerTablet::GroupID>(follower.FollowerGroup .Id ),
2784
+ NIceDb::TUpdate<Schema::TabletFollowerTablet::FollowerNode>(0 ),
2785
+ NIceDb::TUpdate<Schema::TabletFollowerTablet::Statistics>(follower.Statistics ),
2786
+ NIceDb::TUpdate<Schema::TabletFollowerTablet::DataCenter>(dataCenterId));
2787
+ follower.InitTabletMetrics ();
2788
+ follower.BecomeStopped ();
2789
+ dataCenter.Followers [{tablet.Id , group.Id }].push_back (std::prev (tablet.Followers .end ()));
2790
+ BLOG_D (" Created follower " << follower.GetFullTabletId () << " for dc " << dataCenterId);
2791
+ }
2802
2792
}
2803
- if (itFollower == tablet.Followers .rend ()) {
2804
- break ;
2793
+ } else {
2794
+ for (ui32 i = 0 ; i < group.GetRawFollowerCount (); ++i) {
2795
+ TFollowerTabletInfo& follower = tablet.AddFollower (group);
2796
+ follower.Statistics .SetLastAliveTimestamp (TlsActivationContext->Now ().MilliSeconds ());
2797
+ db.Table <Schema::TabletFollowerTablet>().Key (tablet.Id , follower.Id ).Update (
2798
+ NIceDb::TUpdate<Schema::TabletFollowerTablet::GroupID>(follower.FollowerGroup .Id ),
2799
+ NIceDb::TUpdate<Schema::TabletFollowerTablet::FollowerNode>(0 ),
2800
+ NIceDb::TUpdate<Schema::TabletFollowerTablet::Statistics>(follower.Statistics ));
2801
+ follower.InitTabletMetrics ();
2802
+ follower.BecomeStopped ();
2803
+ BLOG_D (" Created follower " << follower.GetFullTabletId ());
2805
2804
}
2806
- TFollowerTabletInfo& follower = *itFollower;
2807
- db.Table <Schema::TabletFollowerTablet>().Key (tablet.Id , follower.Id ).Delete ();
2808
- db.Table <Schema::Metrics>().Key (tablet.Id , follower.Id ).Delete ();
2809
- follower.InitiateStop (sideEffects);
2810
- tablet.Followers .erase (std::prev (itFollower.base ()));
2811
- UpdateCounterTabletsTotal (-1 );
2812
- --followerCount;
2805
+
2813
2806
}
2814
2807
}
2808
+
2809
+ if (oldFollowersIt == tablet.Followers .end ()) {
2810
+ return ;
2811
+ }
2812
+ auto endIt = std::next (oldFollowersIt);
2813
+ for (auto followerIt = tablet.Followers .begin (); followerIt != endIt; ++followerIt) {
2814
+ TFollowerTabletInfo& follower = *followerIt;
2815
+ BLOG_D (" Deleting follower " << follower.GetFullTabletId ());
2816
+ db.Table <Schema::TabletFollowerTablet>().Key (tablet.Id , follower.Id ).Delete ();
2817
+ db.Table <Schema::Metrics>().Key (tablet.Id , follower.Id ).Delete ();
2818
+ follower.InitiateStop (sideEffects);
2819
+ UpdateCounterTabletsTotal (-1 );
2820
+ }
2821
+ tablet.Followers .erase (tablet.Followers .begin (), endIt);
2815
2822
}
2816
2823
2817
2824
TDuration THive::GetBalancerCooldown (EBalancerType balancerType) const {
@@ -3036,6 +3043,7 @@ void THive::ProcessEvent(std::unique_ptr<IEventHandle> event) {
3036
3043
hFunc (TEvHive::TEvUpdateDomain, Handle);
3037
3044
hFunc (TEvPrivate::TEvDeleteNode, Handle);
3038
3045
hFunc (TEvHive::TEvRequestTabletDistribution, Handle);
3046
+ hFunc (TEvPrivate::TEvUpdateDataCenterFollowers, Handle);
3039
3047
}
3040
3048
}
3041
3049
@@ -3138,6 +3146,7 @@ STFUNC(THive::StateWork) {
3138
3146
fFunc (TEvPrivate::TEvProcessStorageBalancer::EventType, EnqueueIncomingEvent);
3139
3147
fFunc (TEvPrivate::TEvDeleteNode::EventType, EnqueueIncomingEvent);
3140
3148
fFunc (TEvHive::TEvRequestTabletDistribution::EventType, EnqueueIncomingEvent);
3149
+ fFunc (TEvPrivate::TEvUpdateDataCenterFollowers::EventType, EnqueueIncomingEvent);
3141
3150
hFunc (TEvPrivate::TEvProcessIncomingEvent, Handle);
3142
3151
default :
3143
3152
if (!HandleDefaultEvents (ev, SelfId ())) {
@@ -3435,6 +3444,10 @@ void THive::Handle(TEvHive::TEvRequestTabletDistribution::TPtr& ev) {
3435
3444
Send (ev->Sender , response.release ());
3436
3445
}
3437
3446
3447
+ void THive::Handle (TEvPrivate::TEvUpdateDataCenterFollowers::TPtr& ev) {
3448
+ Execute (CreateUpdateDcFollowers (ev->Get ()->DataCenter ));
3449
+ }
3450
+
3438
3451
TVector<TNodeId> THive::GetNodesForWhiteboardBroadcast (size_t maxNodesToReturn) {
3439
3452
TVector<TNodeId> nodes;
3440
3453
TNodeId selfNodeId = SelfId ().NodeId ();
0 commit comments