Skip to content

Commit 63af5a5

Browse files
committed
fail sync requests on concurrent reconfiguration
1 parent a4d04c9 commit 63af5a5

File tree

4 files changed

+108
-0
lines changed

4 files changed

+108
-0
lines changed

ydb/core/tx/scheme_board/subscriber.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1040,6 +1040,10 @@ class TSubscriber: public TMonitorableActor<TDerived> {
10401040
this->Send(proxy, new TEvents::TEvPoisonPill());
10411041
}
10421042
}
1043+
if (CurrentSyncRequest) {
1044+
this->Send(Owner, new NInternalEvents::TEvSyncResponse(Path, true), 0, CurrentSyncRequest);
1045+
CurrentSyncRequest = 0;
1046+
}
10431047
ProxyToGroupMap.clear();
10441048
ProxyGroups.clear();
10451049
States.clear();
@@ -1076,6 +1080,7 @@ class TSubscriber: public TMonitorableActor<TDerived> {
10761080
ClusterState.Guid = ev->Get()->ClusterStateGuid;
10771081

10781082
this->Become(&TDerived::StateWork);
1083+
MaybeRunVersionSync();
10791084
}
10801085

10811086
void Handle(TSchemeBoardMonEvents::TEvInfoRequest::TPtr& ev) {

ydb/core/tx/scheme_board/subscriber_ut.cpp

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -850,6 +850,97 @@ Y_UNIT_TEST_SUITE(TSubscriberSyncQuorumTest) {
850850
UNIT_ASSERT_VALUES_EQUAL_C(syncResponse->Get()->Partial, true, syncResponse->ToString());
851851
}
852852
}
853+
854+
Y_UNIT_TEST(ReconfigurationWithDelayedSyncRequest) {
855+
TTestBasicRuntime runtime;
856+
SetupMinimalRuntime(runtime);
857+
858+
runtime.SetLogPriority(NKikimrServices::SCHEME_BOARD_SUBSCRIBER, NLog::PRI_DEBUG);
859+
860+
const auto stateStorageInfo = GetStateStorageInfo(runtime);
861+
const auto participatingReplicas = CountParticipatingReplicas(*stateStorageInfo);
862+
863+
constexpr int DomainId = 1;
864+
constexpr const char* Path = "TestPath";
865+
const TActorId edge = runtime.AllocateEdgeActor();
866+
867+
const TActorId subscriber = runtime.Register(CreateSchemeBoardSubscriber(edge, Path, DomainId));
868+
TBlockEvents<NInternalEvents::TEvNotify> notificationBlocker(runtime, [&](const NInternalEvents::TEvNotify::TPtr& ev) {
869+
return ev->Recipient == subscriber;
870+
});
871+
runtime.WaitFor("initial path lookups", [&]() {
872+
return notificationBlocker.size() == participatingReplicas;
873+
}, TDuration::Seconds(10));
874+
875+
// Send sync request: subscriber will queue it in DelayedSyncRequest since it cannot process syncs before finishing its initialization.
876+
constexpr ui64 cookie = 12345;
877+
runtime.Send(new IEventHandle(subscriber, edge, new NInternalEvents::TEvSyncRequest(), 0, cookie));
878+
879+
auto replicas = ResolveReplicas(runtime, Path);
880+
runtime.Send(subscriber, edge, replicas->Release().Release());
881+
882+
// Now allow all notifications through so that initialization completes.
883+
notificationBlocker.Stop().Unblock();
884+
885+
auto syncResponse = runtime.GrabEdgeEvent<NInternalEvents::TEvSyncResponse>(edge, TDuration::Seconds(10));
886+
UNIT_ASSERT_VALUES_EQUAL_C(syncResponse->Get()->Path, Path, syncResponse->ToString());
887+
UNIT_ASSERT_VALUES_EQUAL_C(syncResponse->Cookie, cookie, syncResponse->ToString());
888+
UNIT_ASSERT_VALUES_EQUAL_C(syncResponse->Get()->Partial, false, syncResponse->ToString());
889+
890+
// No additional sync responses.
891+
UNIT_CHECK_GENERATED_EXCEPTION(
892+
runtime.GrabEdgeEvent<NInternalEvents::TEvSyncResponse>(edge, TDuration::Seconds(10)),
893+
TEmptyEventQueueException
894+
);
895+
}
896+
897+
Y_UNIT_TEST(ReconfigurationWithCurrentSyncRequest) {
898+
TTestBasicRuntime runtime;
899+
SetupMinimalRuntime(runtime);
900+
901+
runtime.SetLogPriority(NKikimrServices::SCHEME_BOARD_SUBSCRIBER, NLog::PRI_DEBUG);
902+
903+
const auto stateStorageInfo = GetStateStorageInfo(runtime);
904+
const auto participatingReplicas = CountParticipatingReplicas(*stateStorageInfo);
905+
906+
constexpr int DomainId = 1;
907+
constexpr const char* Path = "TestPath";
908+
const TActorId edge = runtime.AllocateEdgeActor();
909+
910+
const TActorId subscriber = runtime.Register(CreateSchemeBoardSubscriber(edge, Path, DomainId));
911+
TBlockEvents<NInternalEvents::TEvNotify> notificationBlocker(runtime, [&](const NInternalEvents::TEvNotify::TPtr& ev) {
912+
return ev->Recipient == subscriber;
913+
});
914+
runtime.WaitFor("initial path lookups", [&]() {
915+
return notificationBlocker.size() == participatingReplicas;
916+
}, TDuration::Seconds(10));
917+
notificationBlocker.Stop().Unblock();
918+
919+
constexpr ui64 cookie = 12345;
920+
TBlockEvents<NInternalEvents::TEvSyncVersionResponse> syncResponseBlocker(runtime, [&](const NInternalEvents::TEvSyncVersionResponse::TPtr& ev) {
921+
return ev->Recipient == subscriber && ev->Cookie == cookie;
922+
});
923+
runtime.Send(new IEventHandle(subscriber, edge, new NInternalEvents::TEvSyncRequest(), 0, cookie));
924+
runtime.WaitFor("some sync responses", [&]() {
925+
return !syncResponseBlocker.empty();
926+
}, TDuration::Seconds(10));
927+
syncResponseBlocker.Unblock(1);
928+
929+
auto replicas = ResolveReplicas(runtime, Path);
930+
runtime.Send(subscriber, edge, replicas->Release().Release());
931+
syncResponseBlocker.Stop().Unblock();
932+
933+
auto syncResponse = runtime.GrabEdgeEvent<NInternalEvents::TEvSyncResponse>(edge, TDuration::Seconds(10));
934+
UNIT_ASSERT_VALUES_EQUAL_C(syncResponse->Get()->Path, Path, syncResponse->ToString());
935+
UNIT_ASSERT_VALUES_EQUAL_C(syncResponse->Cookie, cookie, syncResponse->ToString());
936+
UNIT_ASSERT_VALUES_EQUAL_C(syncResponse->Get()->Partial, true, syncResponse->ToString());
937+
938+
// No additional sync responses.
939+
UNIT_CHECK_GENERATED_EXCEPTION(
940+
runtime.GrabEdgeEvent<NInternalEvents::TEvSyncResponse>(edge, TDuration::Seconds(10)),
941+
TEmptyEventQueueException
942+
);
943+
}
853944
}
854945

855946
} // NSchemeBoard

ydb/core/tx/scheme_board/ut_helpers.cpp

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,16 @@ TIntrusiveConstPtr<TStateStorageInfo> GetStateStorageInfo(TTestActorRuntime& run
2424
return result->Get()->Info;
2525
}
2626

27+
TEvStateStorage::TEvResolveReplicasList::TPtr ResolveReplicas(TTestActorRuntime& runtime, const TString& path) {
28+
const TActorId recipient = MakeStateStorageProxyID();
29+
const TActorId edge = runtime.AllocateEdgeActor();
30+
runtime.Send(recipient, edge, new TEvStateStorage::TEvResolveSchemeBoard(path));
31+
32+
auto result = runtime.GrabEdgeEvent<TEvStateStorage::TEvResolveReplicasList>(edge);
33+
UNIT_ASSERT(result);
34+
return result;
35+
}
36+
2737
NKikimrScheme::TEvDescribeSchemeResult GenerateDescribe(
2838
const TString& path,
2939
TPathId pathId,

ydb/core/tx/scheme_board/ut_helpers.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
#include "events_internal.h"
66
#include "subscriber.h"
77

8+
#include <ydb/core/base/statestorage_impl.h>
89
#include <ydb/core/base/tablet_types.h>
910
#include <ydb/core/protos/flat_tx_scheme.pb.h>
1011
#include <ydb/core/testlib/basics/appdata.h>
@@ -27,6 +28,7 @@ namespace NSchemeBoard {
2728
void SetupMinimalRuntime(TTestActorRuntime& runtime, const TStateStorageSetupper& setupStateStorage = CreateDefaultStateStorageSetupper());
2829

2930
TIntrusiveConstPtr<TStateStorageInfo> GetStateStorageInfo(TTestActorRuntime& runtime);
31+
TEvStateStorage::TEvResolveReplicasList::TPtr ResolveReplicas(TTestActorRuntime& runtime, const TString& path);
3032

3133
class TTestContext: public TTestBasicRuntime {
3234
public:

0 commit comments

Comments
 (0)