Skip to content

Commit 7b9578c

Browse files
authored
More test for autopartitionin of topics (#10613)
1 parent 2a74bac commit 7b9578c

File tree

1 file changed

+58
-31
lines changed

1 file changed

+58
-31
lines changed

ydb/core/persqueue/ut/ut_with_sdk/autoscaling_ut.cpp

Lines changed: 58 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -929,9 +929,63 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) {
929929
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
930930
}
931931

932+
ui64 GetBalancerTabletId(TTopicSdkTestSetup& setup, const TString& topicPath) {
933+
auto pathDescr = setup.GetServer().AnnoyingClient->Ls(topicPath)->Record.GetPathDescription().GetSelf();
934+
auto balancerTabletId = pathDescr.GetBalancerTabletID();
935+
Cerr << ">>>>> BalancerTabletID=" << balancerTabletId << Endl << Flush;
936+
UNIT_ASSERT(balancerTabletId);
937+
return balancerTabletId;
938+
}
939+
940+
void SplitPartition(TTopicSdkTestSetup& setup, const TString& topicPath, ui32 partitionId) {
941+
auto balancerTabletId = GetBalancerTabletId(setup, topicPath);
942+
auto edge = setup.GetRuntime().AllocateEdgeActor();
943+
setup.GetRuntime().SendToPipe(balancerTabletId, edge, new TEvPQ::TEvPartitionScaleStatusChanged(partitionId, NKikimrPQ::EScaleStatus::NEED_SPLIT));
944+
}
945+
946+
void AssertPartitionCount(TTopicSdkTestSetup& setup, const TString& topicPath, size_t expectedCount) {
947+
auto client = setup.MakeClient();
948+
auto describe = client.DescribeTopic(topicPath).GetValueSync();
949+
UNIT_ASSERT_VALUES_EQUAL(describe.GetTopicDescription().GetPartitions().size(), expectedCount);
950+
}
951+
952+
void WaitAndAssertPartitionCount(TTopicSdkTestSetup& setup, const TString& topicPath, size_t expectedCount) {
953+
auto client = setup.MakeClient();
954+
size_t partitionCount = 0;
955+
for (size_t i = 0; i < 10; ++i) {
956+
Sleep(TDuration::Seconds(1));
957+
auto describe = client.DescribeTopic(topicPath).GetValueSync();
958+
partitionCount = describe.GetTopicDescription().GetPartitions().size();
959+
if (partitionCount == expectedCount) {
960+
break;
961+
}
962+
}
963+
UNIT_ASSERT_VALUES_EQUAL(partitionCount, expectedCount);
964+
}
965+
966+
Y_UNIT_TEST(WithDir_PartitionSplit_AutosplitByLoad) {
967+
TTopicSdkTestSetup setup = CreateSetup();
968+
auto tableClient = setup.MakeTableClient();
969+
auto session = tableClient.CreateSession().GetValueSync().GetSession();
970+
971+
setup.GetServer().AnnoyingClient->MkDir("/Root", "dir");
972+
973+
ExecuteQuery(session, R"(
974+
--!syntax_v1
975+
CREATE TOPIC `/Root/dir/origin`
976+
WITH (
977+
AUTO_PARTITIONING_STRATEGY = 'SCALE_UP',
978+
MAX_ACTIVE_PARTITIONS = 50
979+
);
980+
)");
981+
982+
AssertPartitionCount(setup, "/Root/dir/origin", 1);
983+
SplitPartition(setup, "/Root/dir/origin", 0);
984+
WaitAndAssertPartitionCount(setup, "/Root/dir/origin", 3);
985+
}
986+
932987
Y_UNIT_TEST(CDC_PartitionSplit_AutosplitByLoad) {
933988
TTopicSdkTestSetup setup = CreateSetup();
934-
auto client = setup.MakeClient();
935989
auto tableClient = setup.MakeTableClient();
936990
auto session = tableClient.CreateSession().GetValueSync().GetSession();
937991

@@ -954,36 +1008,9 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) {
9541008
);
9551009
)");
9561010

957-
{
958-
auto describe = client.DescribeTopic("/Root/origin/feed").GetValueSync();
959-
UNIT_ASSERT_VALUES_EQUAL(describe.GetTopicDescription().GetPartitions().size(), 1);
960-
}
961-
962-
ui64 balancerTabletId;
963-
{
964-
auto pathDescr = setup.GetServer().AnnoyingClient->Ls("/Root/origin/feed/streamImpl")->Record.GetPathDescription().GetSelf();
965-
balancerTabletId = pathDescr.GetBalancerTabletID();
966-
Cerr << ">>>>> BalancerTabletID=" << balancerTabletId << Endl << Flush;
967-
UNIT_ASSERT(balancerTabletId);
968-
}
969-
970-
{
971-
const auto edge = setup.GetRuntime().AllocateEdgeActor();
972-
setup.GetRuntime().SendToPipe(balancerTabletId, edge, new TEvPQ::TEvPartitionScaleStatusChanged(0, NKikimrPQ::EScaleStatus::NEED_SPLIT));
973-
}
974-
975-
{
976-
size_t partitionCount = 0;
977-
for (size_t i = 0; i < 10; ++i) {
978-
Sleep(TDuration::Seconds(1));
979-
auto describe = client.DescribeTopic("/Root/origin/feed").GetValueSync();
980-
partitionCount = describe.GetTopicDescription().GetPartitions().size();
981-
if (partitionCount == 3) {
982-
break;
983-
}
984-
}
985-
UNIT_ASSERT_VALUES_EQUAL(partitionCount, 3);
986-
}
1011+
AssertPartitionCount(setup, "/Root/origin/feed", 1);
1012+
SplitPartition(setup, "/Root/origin/feed/streamImpl", 0);
1013+
WaitAndAssertPartitionCount(setup, "/Root/origin/feed", 3);
9871014
}
9881015

9891016
Y_UNIT_TEST(MidOfRange) {

0 commit comments

Comments
 (0)