24
24
#include < util/generic/string.h>
25
25
#include < util/generic/utility.h>
26
26
#include < util/string/cast.h>
27
+ #include < util/generic/xrange.h>
27
28
28
29
namespace NKikimr {
29
30
@@ -137,7 +138,7 @@ namespace {
137
138
NKikimrSchemeBoard::TEvNotify Notify;
138
139
TPathId SubdomainPathId;
139
140
TSet<ui64> PathAbandonedTenantsSchemeShards;
140
- TMaybe<NKikimrScheme::TEvDescribeSchemeResult> DescribeSchemeResult;
141
+ TMaybe<NKikimrScheme::TEvDescribeSchemeResult> DescribeSchemeResult = Nothing() ;
141
142
142
143
static TNotifyResponse FromNotify (NKikimrSchemeBoard::TEvNotify&& record) {
143
144
// PathSubdomainPathId's absence is a marker that input message was sent
@@ -342,7 +343,6 @@ namespace {
342
343
struct TEvPrivate {
343
344
enum EEv {
344
345
EvReplicaMissing = EventSpaceBegin (TKikimrEvents::ES_PRIVATE),
345
- EvSwitchReplica,
346
346
347
347
EvEnd,
348
348
};
@@ -616,11 +616,6 @@ class TSubscriberProxy: public TMonitorableActor<TDerived> {
616
616
};
617
617
}
618
618
619
- void HandleSwitchReplica (STATEFN_SIG) {
620
- Replica = ev->Sender ;
621
- TActivationContext::Send (new IEventHandle (TEvents::TSystem::Poison, 0 , ReplicaSubscriber, this ->SelfId (), nullptr , 0 ));
622
- }
623
-
624
619
public:
625
620
static constexpr NKikimrServices::TActivity::EType ActorActivityType () {
626
621
return NKikimrServices::TActivity::SCHEME_BOARD_SUBSCRIBER_PROXY_ACTOR;
@@ -666,8 +661,6 @@ class TSubscriberProxy: public TMonitorableActor<TDerived> {
666
661
hFunc (TEvents::TEvGone, Handle);
667
662
hFunc (TEvPrivate::TEvReplicaMissing, Handle);
668
663
cFunc (TEvents::TEvPoisonPill::EventType, PassAway);
669
-
670
- fFunc (TEvPrivate::EvSwitchReplica, HandleSwitchReplica);
671
664
}
672
665
}
673
666
@@ -679,8 +672,6 @@ class TSubscriberProxy: public TMonitorableActor<TDerived> {
679
672
680
673
CFunc (TEvents::TEvWakeup::EventType, Bootstrap);
681
674
cFunc (TEvents::TEvPoisonPill::EventType, PassAway);
682
-
683
- fFunc (TEvPrivate::EvSwitchReplica, HandleSwitchReplica);
684
675
}
685
676
}
686
677
@@ -778,7 +769,24 @@ class TSubscriber: public TMonitorableActor<TDerived> {
778
769
}
779
770
780
771
bool IsMajorityReached () const {
781
- return InitialResponses.size () > (Proxies.size () / 2 );
772
+ TVector<ui32> responsesByGroup (ProxyGroups.size ());
773
+ for (const auto & [proxy, _] : InitialResponses) {
774
+ if (const auto * groupIdx = ProxyToGroupMap.FindPtr (proxy)) {
775
+ responsesByGroup[*groupIdx]++;
776
+ } else {
777
+ SBS_LOG_N (" Previously received response sender is currently unknown"
778
+ << " : sender# " << proxy);
779
+ }
780
+ }
781
+ for (size_t groupIdx : xrange (ProxyGroups.size ())) {
782
+ if (ProxyGroups[groupIdx].WriteOnly ) {
783
+ continue ;
784
+ }
785
+ if (responsesByGroup[groupIdx] <= ProxyGroups[groupIdx].Proxies .size () / 2 ) {
786
+ return false ;
787
+ }
788
+ }
789
+ return true ;
782
790
}
783
791
784
792
void EnqueueSyncRequest (NInternalEvents::TEvSyncRequest::TPtr& ev) {
@@ -794,9 +802,11 @@ class TSubscriber: public TMonitorableActor<TDerived> {
794
802
DelayedSyncRequest = 0 ;
795
803
796
804
Y_ABORT_UNLESS (PendingSync.empty ());
797
- for (const auto & [proxy, replica] : Proxies) {
798
- this ->Send (proxy, new NInternalEvents::TEvSyncVersionRequest (Path), 0 , CurrentSyncRequest);
799
- PendingSync.emplace (proxy);
805
+ for (const auto & proxyGroup : ProxyGroups) {
806
+ for (const auto & [proxy, _] : proxyGroup.Proxies ) {
807
+ this ->Send (proxy, new NInternalEvents::TEvSyncVersionRequest (Path), 0 , CurrentSyncRequest);
808
+ PendingSync.emplace (proxy);
809
+ }
800
810
}
801
811
802
812
return true ;
@@ -812,6 +822,12 @@ class TSubscriber: public TMonitorableActor<TDerived> {
812
822
return ;
813
823
}
814
824
825
+ if (!ProxyToGroupMap.contains (ev->Sender )) {
826
+ SBS_LOG_E (" Unknown " << ev->Get ()->ToString ()
827
+ << " : sender# " << ev->Sender );
828
+ return ;
829
+ }
830
+
815
831
// TEvNotify message is consumed here, can't be used after this point
816
832
TNotifyResponse notifyResponse = TNotifyResponse::FromNotify (std::move (*ev->Get ()->MutableRecord ()));
817
833
TNotifyResponse* selectedNotify = ¬ifyResponse;
@@ -886,6 +902,11 @@ class TSubscriber: public TMonitorableActor<TDerived> {
886
902
Y_ABORT_UNLESS (MaybeRunVersionSync ());
887
903
}
888
904
905
+ static bool IsSyncFinished (ui32 successes, ui32 failures, ui32 expectedTotal) {
906
+ const auto half = expectedTotal;
907
+ return successes > half || failures > half || successes + failures >= expectedTotal;
908
+ }
909
+
889
910
void Handle (NInternalEvents::TEvSyncVersionResponse::TPtr& ev) {
890
911
SBS_LOG_D (" Handle " << ev->Get ()->ToString ()
891
912
<< " : sender# " << ev->Sender
@@ -907,48 +928,70 @@ class TSubscriber: public TMonitorableActor<TDerived> {
907
928
return ;
908
929
}
909
930
931
+ if (!ProxyToGroupMap.contains (ev->Sender )) {
932
+ SBS_LOG_E (" Sync sender is unknown"
933
+ << " : sender# " << ev->Sender
934
+ << " , cookie# " << ev->Cookie );
935
+ return ;
936
+ }
937
+
910
938
PendingSync.erase (it);
911
939
Y_ABORT_UNLESS (!ReceivedSync.contains (ev->Sender ));
912
940
ReceivedSync[ev->Sender ] = ev->Get ()->Record .GetPartial ();
913
941
914
- ui32 successes = 0 ;
915
- ui32 failures = 0 ;
916
- for (const auto & [_, partial] : ReceivedSync) {
942
+ TVector<ui32> successesByGroup (ProxyGroups.size ());
943
+ TVector<ui32> failuresByGroup (ProxyGroups.size ());
944
+ for (const auto & [proxy, partial] : ReceivedSync) {
945
+ const auto * groupIdx = ProxyToGroupMap.FindPtr (proxy);
946
+ if (!groupIdx) {
947
+ SBS_LOG_N (" Previously received sync sender is currently unknown"
948
+ << " : sender# " << proxy);
949
+ continue ;
950
+ }
917
951
if (!partial) {
918
- ++successes ;
952
+ ++successesByGroup[*groupIdx] ;
919
953
} else {
920
- ++failures ;
954
+ ++failuresByGroup[*groupIdx] ;
921
955
}
922
956
}
923
-
924
- const ui32 size = Proxies.size ();
925
- const ui32 half = size / 2 ;
926
- if (successes <= half && failures <= half && (successes + failures) < size) {
927
- SBS_LOG_D (" Sync is in progress"
957
+ bool syncIsComplete = true ;
958
+ for (size_t groupIdx : xrange (ProxyGroups.size ())) {
959
+ if (ProxyGroups[groupIdx].WriteOnly ) {
960
+ continue ;
961
+ }
962
+ const ui32 size = ProxyGroups[groupIdx].Proxies .size ();
963
+ const ui32 half = size / 2 ;
964
+ if (!IsSyncFinished (successesByGroup[groupIdx], failuresByGroup[groupIdx], size)) {
965
+ SBS_LOG_D (" Sync is in progress"
966
+ << " : cookie# " << ev->Cookie
967
+ << " , ring group# " << groupIdx
968
+ << " , size# " << size
969
+ << " , half# " << half
970
+ << " , successes# " << successesByGroup[groupIdx]
971
+ << " , failures# " << failuresByGroup[groupIdx]);
972
+ return ;
973
+ }
974
+ syncIsComplete &= successesByGroup[groupIdx] > half;
975
+ const auto finalMessage = TStringBuilder () << " Sync is done in the ring group"
928
976
<< " : cookie# " << ev->Cookie
977
+ << " , ring group# " << groupIdx
929
978
<< " , size# " << size
930
979
<< " , half# " << half
931
- << " , successes# " << successes
932
- << " , faulires# " << failures);
933
- return ;
980
+ << " , successes# " << successesByGroup[groupIdx]
981
+ << " , failures# " << failuresByGroup[groupIdx]
982
+ << " , partial# " << !syncIsComplete;
983
+ if (syncIsComplete) {
984
+ SBS_LOG_D (finalMessage);
985
+ } else {
986
+ SBS_LOG_N (finalMessage);
987
+ }
934
988
}
935
-
936
- const bool partial = !(successes > half);
937
- const TString done = TStringBuilder () << " Sync is done"
938
- << " : cookie# " << ev->Cookie
939
- << " , size# " << size
940
- << " , half# " << half
941
- << " , successes# " << successes
942
- << " , faulires# " << failures
943
- << " , partial# " << partial;
944
-
945
- if (!partial) {
946
- SBS_LOG_D (done);
947
- } else {
948
- SBS_LOG_W (done);
989
+ if (!syncIsComplete) {
990
+ SBS_LOG_W (" Sync is incomplete in one of the ring groups"
991
+ << " : cookie# " << ev->Cookie );
949
992
}
950
993
951
- this ->Send (Owner, new NInternalEvents::TEvSyncResponse (Path, partial ), 0 , ev->Cookie );
994
+ this ->Send (Owner, new NInternalEvents::TEvSyncResponse (Path, !syncIsComplete ), 0 , ev->Cookie );
952
995
953
996
PendingSync.clear ();
954
997
ReceivedSync.clear ();
@@ -959,28 +1002,50 @@ class TSubscriber: public TMonitorableActor<TDerived> {
959
1002
void Handle (TEvStateStorage::TEvResolveReplicasList::TPtr& ev) {
960
1003
SBS_LOG_D (" Handle " << ev->Get ()->ToString ());
961
1004
962
- const auto & replicas = ev->Get ()->GetPlainReplicas () ;
1005
+ const auto & replicaGroups = ev->Get ()->ReplicaGroups ;
963
1006
964
- if (replicas .empty ()) {
965
- Y_ABORT_UNLESS (Proxies .empty ());
1007
+ if (replicaGroups .empty ()) {
1008
+ Y_ABORT_UNLESS (ProxyGroups .empty ());
966
1009
SBS_LOG_E (" Subscribe on unconfigured SchemeBoard" );
967
1010
this ->Become (&TDerived::StateCalm);
968
1011
return ;
969
1012
}
970
1013
971
- Y_ABORT_UNLESS (Proxies.empty () || Proxies.size () == replicas.size ());
1014
+ for (const auto & group : ProxyGroups) {
1015
+ for (const auto & [proxy, _] : group.Proxies ) {
1016
+ this ->Send (proxy, new TEvents::TEvPoisonPill ());
1017
+ }
1018
+ }
1019
+ ProxyToGroupMap.clear ();
1020
+ ProxyGroups.clear ();
1021
+ States.clear ();
1022
+ InitialResponses.clear ();
1023
+ State.Clear ();
1024
+ PendingSync.clear ();
1025
+ ReceivedSync.clear ();
972
1026
973
- if (Proxies. empty () ) {
974
- for ( size_t i = 0 ; i < replicas. size (); ++i) {
975
- Proxies. emplace_back ( this -> RegisterWithSameMailbox ( new TProxyDerived ( this -> SelfId (), i, replicas. size (),
976
- replicas[i], Path, DomainOwnerId)), replicas[i]) ;
1027
+ for ( size_t groupIdx = 0 ; groupIdx < replicaGroups. size (); ++groupIdx ) {
1028
+ const auto & replicaGroup = replicaGroups[groupIdx];
1029
+ if (replicaGroup. WriteOnly ) {
1030
+ continue ;
977
1031
}
978
- } else {
979
- for (size_t i = 0 ; i < replicas.size (); ++i) {
980
- if (auto & [proxy, replica] = Proxies[i]; replica != replicas[i]) {
981
- TActivationContext::Send (new IEventHandle (TEvPrivate::EvSwitchReplica, 0 , proxy, replicas[i], nullptr , 0 ));
982
- replica = replicas[i];
983
- }
1032
+ auto & proxyGroup = ProxyGroups.emplace_back ();
1033
+
1034
+ proxyGroup.Proxies .reserve (replicaGroup.Replicas .size ());
1035
+ for (size_t i = 0 ; i < replicaGroup.Replicas .size (); ++i) {
1036
+ auto & proxy = proxyGroup.Proxies .emplace_back ();
1037
+ proxy.Replica = replicaGroup.Replicas [i];
1038
+ proxy.Proxy = this ->RegisterWithSameMailbox (
1039
+ new TProxyDerived (
1040
+ this ->SelfId (),
1041
+ i,
1042
+ replicaGroup.Replicas .size (),
1043
+ replicaGroup.Replicas [i],
1044
+ Path,
1045
+ DomainOwnerId
1046
+ )
1047
+ );
1048
+ ProxyToGroupMap[proxy.Proxy ] = ProxyGroups.size () - 1 ;
984
1049
}
985
1050
}
986
1051
@@ -1040,10 +1105,11 @@ class TSubscriber: public TMonitorableActor<TDerived> {
1040
1105
}
1041
1106
1042
1107
void PassAway () override {
1043
- for (const auto & [proxy, replica] : Proxies) {
1044
- this ->Send (proxy, new TEvents::TEvPoisonPill ());
1108
+ for (const auto & group : ProxyGroups) {
1109
+ for (const auto & [proxy, _] : group.Proxies ) {
1110
+ this ->Send (proxy, new TEvents::TEvPoisonPill ());
1111
+ }
1045
1112
}
1046
-
1047
1113
TActivationContext::Send (new IEventHandle (TEvents::TSystem::Unsubscribe, 0 , MakeStateStorageProxyID (),
1048
1114
this ->SelfId (), nullptr , 0 ));
1049
1115
@@ -1128,7 +1194,18 @@ class TSubscriber: public TMonitorableActor<TDerived> {
1128
1194
const TPath Path;
1129
1195
const ui64 DomainOwnerId;
1130
1196
1131
- std::vector<std::tuple<TActorId, TActorId>> Proxies;
1197
+ struct TProxyInfo {
1198
+ TActorId Proxy;
1199
+ TActorId Replica;
1200
+ };
1201
+
1202
+ struct TProxyGroup {
1203
+ bool WriteOnly;
1204
+ TVector<TProxyInfo> Proxies;
1205
+ };
1206
+
1207
+ THashMap<TActorId, ui32> ProxyToGroupMap;
1208
+ TVector<TProxyGroup> ProxyGroups;
1132
1209
TMap<TActorId, TState> States;
1133
1210
TMap<TActorId, TNotifyResponse> InitialResponses;
1134
1211
TMaybe<TState> State;
0 commit comments