From 63af5a51fbb37bc379009207d58e927cfd8ee0fd Mon Sep 17 00:00:00 2001 From: Daniil Demin Date: Mon, 7 Jul 2025 11:21:00 +0000 Subject: [PATCH 1/2] fail sync requests on concurrent reconfiguration --- ydb/core/tx/scheme_board/subscriber.cpp | 5 ++ ydb/core/tx/scheme_board/subscriber_ut.cpp | 91 ++++++++++++++++++++++ ydb/core/tx/scheme_board/ut_helpers.cpp | 10 +++ ydb/core/tx/scheme_board/ut_helpers.h | 2 + 4 files changed, 108 insertions(+) diff --git a/ydb/core/tx/scheme_board/subscriber.cpp b/ydb/core/tx/scheme_board/subscriber.cpp index dec4cbdb3e8f..e0362bcd2a52 100644 --- a/ydb/core/tx/scheme_board/subscriber.cpp +++ b/ydb/core/tx/scheme_board/subscriber.cpp @@ -1040,6 +1040,10 @@ class TSubscriber: public TMonitorableActor { this->Send(proxy, new TEvents::TEvPoisonPill()); } } + if (CurrentSyncRequest) { + this->Send(Owner, new NInternalEvents::TEvSyncResponse(Path, true), 0, CurrentSyncRequest); + CurrentSyncRequest = 0; + } ProxyToGroupMap.clear(); ProxyGroups.clear(); States.clear(); @@ -1076,6 +1080,7 @@ class TSubscriber: public TMonitorableActor { ClusterState.Guid = ev->Get()->ClusterStateGuid; this->Become(&TDerived::StateWork); + MaybeRunVersionSync(); } void Handle(TSchemeBoardMonEvents::TEvInfoRequest::TPtr& ev) { diff --git a/ydb/core/tx/scheme_board/subscriber_ut.cpp b/ydb/core/tx/scheme_board/subscriber_ut.cpp index 8d58baa1f8cf..6c5800c81c0d 100644 --- a/ydb/core/tx/scheme_board/subscriber_ut.cpp +++ b/ydb/core/tx/scheme_board/subscriber_ut.cpp @@ -850,6 +850,97 @@ Y_UNIT_TEST_SUITE(TSubscriberSyncQuorumTest) { UNIT_ASSERT_VALUES_EQUAL_C(syncResponse->Get()->Partial, true, syncResponse->ToString()); } } + + Y_UNIT_TEST(ReconfigurationWithDelayedSyncRequest) { + TTestBasicRuntime runtime; + SetupMinimalRuntime(runtime); + + runtime.SetLogPriority(NKikimrServices::SCHEME_BOARD_SUBSCRIBER, NLog::PRI_DEBUG); + + const auto stateStorageInfo = GetStateStorageInfo(runtime); + const auto participatingReplicas = CountParticipatingReplicas(*stateStorageInfo); + + constexpr int DomainId = 1; + constexpr const char* Path = "TestPath"; + const TActorId edge = runtime.AllocateEdgeActor(); + + const TActorId subscriber = runtime.Register(CreateSchemeBoardSubscriber(edge, Path, DomainId)); + TBlockEvents notificationBlocker(runtime, [&](const NInternalEvents::TEvNotify::TPtr& ev) { + return ev->Recipient == subscriber; + }); + runtime.WaitFor("initial path lookups", [&]() { + return notificationBlocker.size() == participatingReplicas; + }, TDuration::Seconds(10)); + + // Send sync request: subscriber will queue it in DelayedSyncRequest since it cannot process syncs before finishing its initialization. + constexpr ui64 cookie = 12345; + runtime.Send(new IEventHandle(subscriber, edge, new NInternalEvents::TEvSyncRequest(), 0, cookie)); + + auto replicas = ResolveReplicas(runtime, Path); + runtime.Send(subscriber, edge, replicas->Release().Release()); + + // Now allow all notifications through so that initialization completes. + notificationBlocker.Stop().Unblock(); + + auto syncResponse = runtime.GrabEdgeEvent(edge, TDuration::Seconds(10)); + UNIT_ASSERT_VALUES_EQUAL_C(syncResponse->Get()->Path, Path, syncResponse->ToString()); + UNIT_ASSERT_VALUES_EQUAL_C(syncResponse->Cookie, cookie, syncResponse->ToString()); + UNIT_ASSERT_VALUES_EQUAL_C(syncResponse->Get()->Partial, false, syncResponse->ToString()); + + // No additional sync responses. + UNIT_CHECK_GENERATED_EXCEPTION( + runtime.GrabEdgeEvent(edge, TDuration::Seconds(10)), + TEmptyEventQueueException + ); + } + + Y_UNIT_TEST(ReconfigurationWithCurrentSyncRequest) { + TTestBasicRuntime runtime; + SetupMinimalRuntime(runtime); + + runtime.SetLogPriority(NKikimrServices::SCHEME_BOARD_SUBSCRIBER, NLog::PRI_DEBUG); + + const auto stateStorageInfo = GetStateStorageInfo(runtime); + const auto participatingReplicas = CountParticipatingReplicas(*stateStorageInfo); + + constexpr int DomainId = 1; + constexpr const char* Path = "TestPath"; + const TActorId edge = runtime.AllocateEdgeActor(); + + const TActorId subscriber = runtime.Register(CreateSchemeBoardSubscriber(edge, Path, DomainId)); + TBlockEvents notificationBlocker(runtime, [&](const NInternalEvents::TEvNotify::TPtr& ev) { + return ev->Recipient == subscriber; + }); + runtime.WaitFor("initial path lookups", [&]() { + return notificationBlocker.size() == participatingReplicas; + }, TDuration::Seconds(10)); + notificationBlocker.Stop().Unblock(); + + constexpr ui64 cookie = 12345; + TBlockEvents syncResponseBlocker(runtime, [&](const NInternalEvents::TEvSyncVersionResponse::TPtr& ev) { + return ev->Recipient == subscriber && ev->Cookie == cookie; + }); + runtime.Send(new IEventHandle(subscriber, edge, new NInternalEvents::TEvSyncRequest(), 0, cookie)); + runtime.WaitFor("some sync responses", [&]() { + return !syncResponseBlocker.empty(); + }, TDuration::Seconds(10)); + syncResponseBlocker.Unblock(1); + + auto replicas = ResolveReplicas(runtime, Path); + runtime.Send(subscriber, edge, replicas->Release().Release()); + syncResponseBlocker.Stop().Unblock(); + + auto syncResponse = runtime.GrabEdgeEvent(edge, TDuration::Seconds(10)); + UNIT_ASSERT_VALUES_EQUAL_C(syncResponse->Get()->Path, Path, syncResponse->ToString()); + UNIT_ASSERT_VALUES_EQUAL_C(syncResponse->Cookie, cookie, syncResponse->ToString()); + UNIT_ASSERT_VALUES_EQUAL_C(syncResponse->Get()->Partial, true, syncResponse->ToString()); + + // No additional sync responses. + UNIT_CHECK_GENERATED_EXCEPTION( + runtime.GrabEdgeEvent(edge, TDuration::Seconds(10)), + TEmptyEventQueueException + ); + } } } // NSchemeBoard diff --git a/ydb/core/tx/scheme_board/ut_helpers.cpp b/ydb/core/tx/scheme_board/ut_helpers.cpp index f90314500319..ed891e759d8d 100644 --- a/ydb/core/tx/scheme_board/ut_helpers.cpp +++ b/ydb/core/tx/scheme_board/ut_helpers.cpp @@ -24,6 +24,16 @@ TIntrusiveConstPtr GetStateStorageInfo(TTestActorRuntime& run return result->Get()->Info; } +TEvStateStorage::TEvResolveReplicasList::TPtr ResolveReplicas(TTestActorRuntime& runtime, const TString& path) { + const TActorId recipient = MakeStateStorageProxyID(); + const TActorId edge = runtime.AllocateEdgeActor(); + runtime.Send(recipient, edge, new TEvStateStorage::TEvResolveSchemeBoard(path)); + + auto result = runtime.GrabEdgeEvent(edge); + UNIT_ASSERT(result); + return result; +} + NKikimrScheme::TEvDescribeSchemeResult GenerateDescribe( const TString& path, TPathId pathId, diff --git a/ydb/core/tx/scheme_board/ut_helpers.h b/ydb/core/tx/scheme_board/ut_helpers.h index 8efce28a0145..ecdbeb98bd06 100644 --- a/ydb/core/tx/scheme_board/ut_helpers.h +++ b/ydb/core/tx/scheme_board/ut_helpers.h @@ -5,6 +5,7 @@ #include "events_internal.h" #include "subscriber.h" +#include #include #include #include @@ -27,6 +28,7 @@ namespace NSchemeBoard { void SetupMinimalRuntime(TTestActorRuntime& runtime, const TStateStorageSetupper& setupStateStorage = CreateDefaultStateStorageSetupper()); TIntrusiveConstPtr GetStateStorageInfo(TTestActorRuntime& runtime); +TEvStateStorage::TEvResolveReplicasList::TPtr ResolveReplicas(TTestActorRuntime& runtime, const TString& path); class TTestContext: public TTestBasicRuntime { public: From 2cfe0d39a053829ab1f4d150afe6c7893bd2c64c Mon Sep 17 00:00:00 2001 From: Daniil Demin Date: Tue, 8 Jul 2025 15:10:36 +0000 Subject: [PATCH 2/2] fix broken tests --- ydb/core/tx/scheme_board/subscriber.cpp | 1 - 1 file changed, 1 deletion(-) diff --git a/ydb/core/tx/scheme_board/subscriber.cpp b/ydb/core/tx/scheme_board/subscriber.cpp index e0362bcd2a52..0d4de55ec9bb 100644 --- a/ydb/core/tx/scheme_board/subscriber.cpp +++ b/ydb/core/tx/scheme_board/subscriber.cpp @@ -1080,7 +1080,6 @@ class TSubscriber: public TMonitorableActor { ClusterState.Guid = ev->Get()->ClusterStateGuid; this->Become(&TDerived::StateWork); - MaybeRunVersionSync(); } void Handle(TSchemeBoardMonEvents::TEvInfoRequest::TPtr& ev) {