Skip to content

Commit 82d780f

Browse files
authored
Bridge Mode in SB: gracefully continue sync request execution in case of concurrent reconfiguration (#20712)
1 parent 597e16a commit 82d780f

File tree

4 files changed

+110
-0
lines changed

4 files changed

+110
-0
lines changed

ydb/core/tx/scheme_board/subscriber.cpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1040,6 +1040,13 @@ class TSubscriber: public TMonitorableActor<TDerived> {
10401040
this->Send(proxy, new TEvents::TEvPoisonPill());
10411041
}
10421042
}
1043+
1044+
if (CurrentSyncRequest) {
1045+
SBS_LOG_I("Delay current sync request: " << CurrentSyncRequest);
1046+
DelayedSyncRequest = Max(DelayedSyncRequest, CurrentSyncRequest);
1047+
CurrentSyncRequest = 0;
1048+
}
1049+
10431050
ProxyToGroupMap.clear();
10441051
ProxyGroups.clear();
10451052
States.clear();

ydb/core/tx/scheme_board/subscriber_ut.cpp

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -850,6 +850,97 @@ Y_UNIT_TEST_SUITE(TSubscriberSyncQuorumTest) {
850850
UNIT_ASSERT_VALUES_EQUAL_C(syncResponse->Get()->Partial, true, syncResponse->ToString());
851851
}
852852
}
853+
854+
// Scenario: a sync request arrives while the subscriber's initial notifications are
// still blocked, then a freshly resolved replica list is delivered to the subscriber
// before those notifications are released. The test expects exactly one non-partial
// TEvSyncResponse carrying the original cookie.
Y_UNIT_TEST(ReconfigurationWithDelayedSyncRequest) {
855+
TTestBasicRuntime runtime;
856+
SetupMinimalRuntime(runtime);
857+
858+
runtime.SetLogPriority(NKikimrServices::SCHEME_BOARD_SUBSCRIBER, NLog::PRI_DEBUG);
859+
860+
const auto stateStorageInfo = GetStateStorageInfo(runtime);
861+
const auto participatingReplicas = CountParticipatingReplicas(*stateStorageInfo);
862+
863+
constexpr int DomainId = 1;
864+
constexpr const char* Path = "TestPath";
865+
const TActorId edge = runtime.AllocateEdgeActor();
866+
867+
const TActorId subscriber = runtime.Register(CreateSchemeBoardSubscriber(edge, Path, DomainId));
868+
// Hold back every TEvNotify addressed to the subscriber so that it stays uninitialized.
TBlockEvents<NInternalEvents::TEvNotify> notificationBlocker(runtime, [&](const NInternalEvents::TEvNotify::TPtr& ev) {
869+
return ev->Recipient == subscriber;
870+
});
871+
// Proceed only once the number of captured notifications equals the number of participating replicas.
runtime.WaitFor("initial path lookups", [&]() {
872+
return notificationBlocker.size() == participatingReplicas;
873+
}, TDuration::Seconds(10));
874+
875+
// Send sync request: subscriber will queue it in DelayedSyncRequest since it cannot process syncs before finishing its initialization.
876+
constexpr ui64 cookie = 12345;
877+
runtime.Send(new IEventHandle(subscriber, edge, new NInternalEvents::TEvSyncRequest(), 0, cookie));
878+
879+
// Hand the subscriber a freshly resolved replica list while the sync request is pending.
// NOTE(review): presumably this is what triggers the reconfiguration named in the test title — confirm against the subscriber's handler.
auto replicas = ResolveReplicas(runtime, Path);
880+
runtime.Send(subscriber, edge, replicas->Release().Release());
881+
882+
// Now allow all notifications through so that initialization completes.
883+
notificationBlocker.Stop().Unblock();
884+
885+
// The response must echo the request's path and cookie and must not be marked partial.
auto syncResponse = runtime.GrabEdgeEvent<NInternalEvents::TEvSyncResponse>(edge, TDuration::Seconds(10));
886+
UNIT_ASSERT_VALUES_EQUAL_C(syncResponse->Get()->Path, Path, syncResponse->ToString());
887+
UNIT_ASSERT_VALUES_EQUAL_C(syncResponse->Cookie, cookie, syncResponse->ToString());
888+
UNIT_ASSERT_VALUES_EQUAL_C(syncResponse->Get()->Partial, false, syncResponse->ToString());
889+
890+
// No additional sync responses.
// A second grab is expected to end with TEmptyEventQueueException instead of returning an event.
891+
UNIT_CHECK_GENERATED_EXCEPTION(
892+
runtime.GrabEdgeEvent<NInternalEvents::TEvSyncResponse>(edge, TDuration::Seconds(10)),
893+
TEmptyEventQueueException
894+
);
895+
}
896+
897+
// Scenario: the subscriber finishes initialization, a sync request is sent and its
// TEvSyncVersionResponse events are intercepted, and a freshly resolved replica list
// is delivered to the subscriber before the remaining version responses are released.
// The test expects exactly one non-partial TEvSyncResponse carrying the original cookie.
Y_UNIT_TEST(ReconfigurationWithCurrentSyncRequest) {
898+
TTestBasicRuntime runtime;
899+
SetupMinimalRuntime(runtime);
900+
901+
runtime.SetLogPriority(NKikimrServices::SCHEME_BOARD_SUBSCRIBER, NLog::PRI_DEBUG);
902+
903+
const auto stateStorageInfo = GetStateStorageInfo(runtime);
904+
const auto participatingReplicas = CountParticipatingReplicas(*stateStorageInfo);
905+
906+
constexpr int DomainId = 1;
907+
constexpr const char* Path = "TestPath";
908+
const TActorId edge = runtime.AllocateEdgeActor();
909+
910+
const TActorId subscriber = runtime.Register(CreateSchemeBoardSubscriber(edge, Path, DomainId));
911+
// Capture the initial TEvNotify events addressed to the subscriber.
TBlockEvents<NInternalEvents::TEvNotify> notificationBlocker(runtime, [&](const NInternalEvents::TEvNotify::TPtr& ev) {
912+
return ev->Recipient == subscriber;
913+
});
914+
runtime.WaitFor("initial path lookups", [&]() {
915+
return notificationBlocker.size() == participatingReplicas;
916+
}, TDuration::Seconds(10));
917+
// Release the captured notifications right away: here the sync request is sent only after this point.
notificationBlocker.Stop().Unblock();
918+
919+
constexpr ui64 cookie = 12345;
920+
// Intercept version responses addressed to the subscriber that carry the sync request's cookie.
TBlockEvents<NInternalEvents::TEvSyncVersionResponse> syncResponseBlocker(runtime, [&](const NInternalEvents::TEvSyncVersionResponse::TPtr& ev) {
921+
return ev->Recipient == subscriber && ev->Cookie == cookie;
922+
});
923+
runtime.Send(new IEventHandle(subscriber, edge, new NInternalEvents::TEvSyncRequest(), 0, cookie));
924+
// Wait until at least one version response has been captured.
runtime.WaitFor("some sync responses", [&]() {
925+
return !syncResponseBlocker.empty();
926+
}, TDuration::Seconds(10));
927+
// NOTE(review): Unblock(1) presumably releases a single captured response, leaving the sync only partly answered — confirm against TBlockEvents.
syncResponseBlocker.Unblock(1);
928+
929+
// Hand the subscriber a freshly resolved replica list while the sync request is still in flight.
// NOTE(review): presumably this is what triggers the reconfiguration named in the test title — confirm against the subscriber's handler.
auto replicas = ResolveReplicas(runtime, Path);
930+
runtime.Send(subscriber, edge, replicas->Release().Release());
931+
// Stop intercepting and release whatever version responses are still held.
syncResponseBlocker.Stop().Unblock();
932+
933+
// The response must echo the request's path and cookie and must not be marked partial.
auto syncResponse = runtime.GrabEdgeEvent<NInternalEvents::TEvSyncResponse>(edge, TDuration::Seconds(10));
934+
UNIT_ASSERT_VALUES_EQUAL_C(syncResponse->Get()->Path, Path, syncResponse->ToString());
935+
UNIT_ASSERT_VALUES_EQUAL_C(syncResponse->Cookie, cookie, syncResponse->ToString());
936+
UNIT_ASSERT_VALUES_EQUAL_C(syncResponse->Get()->Partial, false, syncResponse->ToString());
937+
938+
// No additional sync responses.
// A second grab is expected to end with TEmptyEventQueueException instead of returning an event.
939+
UNIT_CHECK_GENERATED_EXCEPTION(
940+
runtime.GrabEdgeEvent<NInternalEvents::TEvSyncResponse>(edge, TDuration::Seconds(10)),
941+
TEmptyEventQueueException
942+
);
943+
}
853944
}
854945

855946
} // NSchemeBoard

ydb/core/tx/scheme_board/ut_helpers.cpp

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,16 @@ TIntrusiveConstPtr<TStateStorageInfo> GetStateStorageInfo(TTestActorRuntime& run
2424
return result->Get()->Info;
2525
}
2626

27+
// Test helper: sends TEvResolveSchemeBoard for `path` to the state storage proxy from a
// newly allocated edge actor and returns the TEvResolveReplicasList event received in reply.
// Fails the current test via UNIT_ASSERT if no event was grabbed.
TEvStateStorage::TEvResolveReplicasList::TPtr ResolveReplicas(TTestActorRuntime& runtime, const TString& path) {
28+
const TActorId recipient = MakeStateStorageProxyID();
29+
// A dedicated edge actor is allocated per call to receive the reply.
const TActorId edge = runtime.AllocateEdgeActor();
30+
runtime.Send(recipient, edge, new TEvStateStorage::TEvResolveSchemeBoard(path));
31+
32+
auto result = runtime.GrabEdgeEvent<TEvStateStorage::TEvResolveReplicasList>(edge);
33+
UNIT_ASSERT(result);
34+
return result;
35+
}
36+
2737
NKikimrScheme::TEvDescribeSchemeResult GenerateDescribe(
2838
const TString& path,
2939
TPathId pathId,

ydb/core/tx/scheme_board/ut_helpers.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
#include "events_internal.h"
66
#include "subscriber.h"
77

8+
#include <ydb/core/base/statestorage_impl.h>
89
#include <ydb/core/base/tablet_types.h>
910
#include <ydb/core/protos/flat_tx_scheme.pb.h>
1011
#include <ydb/core/testlib/basics/appdata.h>
@@ -27,6 +28,7 @@ namespace NSchemeBoard {
2728
void SetupMinimalRuntime(TTestActorRuntime& runtime, const TStateStorageSetupper& setupStateStorage = CreateDefaultStateStorageSetupper());
2829

2930
TIntrusiveConstPtr<TStateStorageInfo> GetStateStorageInfo(TTestActorRuntime& runtime);
31+
TEvStateStorage::TEvResolveReplicasList::TPtr ResolveReplicas(TTestActorRuntime& runtime, const TString& path);
3032

3133
class TTestContext: public TTestBasicRuntime {
3234
public:

0 commit comments

Comments
 (0)