Skip to content

Commit 9901c01

Browse files
authored
Fixed altering of max_active_partition property of the topic (#16768)
1 parent c0e0a07 commit 9901c01

File tree

3 files changed

+111
-1
lines changed

3 files changed

+111
-1
lines changed

ydb/core/kqp/provider/yql_kikimr_exec.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -553,7 +553,7 @@ namespace {
553553
FromString<ui32>(setting.Value().Cast<TCoDataCtor>().Literal().Cast<TCoAtom>().Value())
554554
);
555555
} else if (name == "setMaxPartitions") {
556-
request->mutable_alter_partitioning_settings()->set_set_partition_count_limit(
556+
request->mutable_alter_partitioning_settings()->set_set_max_active_partitions(
557557
FromString<ui32>(setting.Value().Cast<TCoDataCtor>().Literal().Cast<TCoAtom>().Value())
558558
);
559559
} else if (name == "setRetentionPeriod") {

ydb/core/persqueue/ut/ut_with_sdk/autoscaling_ut.cpp

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -924,6 +924,68 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) {
924924
}
925925
}
926926

927+
Y_UNIT_TEST(PartitionSplit_AutosplitByLoad_AfterAlter) {
928+
TTopicSdkTestSetup setup = CreateSetup();
929+
TTopicClient client = setup.MakeClient();
930+
931+
TCreateTopicSettings createSettings;
932+
createSettings
933+
.BeginConfigurePartitioningSettings()
934+
.MinActivePartitions(1)
935+
.EndConfigurePartitioningSettings();
936+
client.CreateTopic(TEST_TOPIC, createSettings).Wait();
937+
938+
TAlterTopicSettings alterSettings;
939+
alterSettings
940+
.BeginAlterPartitioningSettings()
941+
.MinActivePartitions(1)
942+
.MaxActivePartitions(100)
943+
.BeginAlterAutoPartitioningSettings()
944+
.UpUtilizationPercent(2)
945+
.DownUtilizationPercent(1)
946+
.StabilizationWindow(TDuration::Seconds(2))
947+
.Strategy(EAutoPartitioningStrategy::ScaleUp)
948+
.EndAlterAutoPartitioningSettings()
949+
.EndAlterTopicPartitioningSettings();
950+
client.AlterTopic(TEST_TOPIC, alterSettings).Wait();
951+
952+
auto msg = TString(1_MB, 'a');
953+
954+
auto writeSession_1 = CreateWriteSession(client, "producer-1", 0, std::string{TEST_TOPIC}, false);
955+
auto writeSession_2 = CreateWriteSession(client, "producer-2", 0, std::string{TEST_TOPIC}, false);
956+
957+
{
958+
UNIT_ASSERT(writeSession_1->Write(Msg(msg, 1)));
959+
UNIT_ASSERT(writeSession_1->Write(Msg(msg, 2)));
960+
Sleep(TDuration::Seconds(5));
961+
auto describe = client.DescribeTopic(TEST_TOPIC).GetValueSync();
962+
UNIT_ASSERT_EQUAL(describe.GetTopicDescription().GetPartitions().size(), 1);
963+
}
964+
965+
{
966+
UNIT_ASSERT(writeSession_1->Write(Msg(msg, 3)));
967+
UNIT_ASSERT(writeSession_2->Write(Msg(msg, 4)));
968+
UNIT_ASSERT(writeSession_1->Write(Msg(msg, 5)));
969+
UNIT_ASSERT(writeSession_2->Write(Msg(msg, 6)));
970+
Sleep(TDuration::Seconds(5));
971+
auto describe = client.DescribeTopic(TEST_TOPIC).GetValueSync();
972+
UNIT_ASSERT_EQUAL(describe.GetTopicDescription().GetPartitions().size(), 3);
973+
}
974+
975+
auto writeSession2_1 = CreateWriteSession(client, "producer-1", 1, std::string{TEST_TOPIC}, false);
976+
auto writeSession2_2 = CreateWriteSession(client, "producer-2", 1, std::string{TEST_TOPIC}, false);
977+
978+
{
979+
UNIT_ASSERT(writeSession2_1->Write(Msg(msg, 7)));
980+
UNIT_ASSERT(writeSession2_2->Write(Msg(msg, 8)));
981+
UNIT_ASSERT(writeSession2_1->Write(Msg(msg, 9)));
982+
UNIT_ASSERT(writeSession2_2->Write(Msg(msg, 10)));
983+
Sleep(TDuration::Seconds(5));
984+
auto describe2 = client.DescribeTopic(TEST_TOPIC).GetValueSync();
985+
UNIT_ASSERT_EQUAL(describe2.GetTopicDescription().GetPartitions().size(), 5);
986+
}
987+
}
988+
927989
void ExecuteQuery(NYdb::NTable::TSession& session, const TString& query ) {
928990
const auto result = session.ExecuteSchemeQuery(query).GetValueSync();
929991
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString());

ydb/services/persqueue_v1/topic_yql_ut.cpp

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,54 @@ Y_UNIT_TEST_SUITE(TTopicYqlTest) {
186186
//1609462861
187187
}
188188

189+
Y_UNIT_TEST(AlterAutopartitioning) {
190+
NKikimrConfig::TFeatureFlags ff;
191+
ff.SetEnableTopicSplitMerge(true);
192+
auto settings = NKikimr::NPersQueueTests::PQSettings();
193+
settings.SetFeatureFlags(ff);
194+
195+
NPersQueue::TTestServer server(settings);
196+
197+
{
198+
const char *query = R"(
199+
CREATE TOPIC `/Root/PQ/rt3.dc1--legacy--topic1`
200+
)";
201+
202+
server.AnnoyingClient->RunYqlSchemeQuery(query);
203+
}
204+
205+
{
206+
const char *query = R"__(
207+
ALTER TOPIC `/Root/PQ/rt3.dc1--legacy--topic1`
208+
SET (
209+
min_active_partitions = 7,
210+
max_active_partitions = 100,
211+
auto_partitioning_stabilization_window = Interval('PT1S'),
212+
auto_partitioning_up_utilization_percent = 2,
213+
auto_partitioning_down_utilization_percent = 1,
214+
partition_write_speed_bytes_per_second = 3,
215+
auto_partitioning_strategy = 'up'
216+
);
217+
)__";
218+
219+
server.AnnoyingClient->RunYqlSchemeQuery(query);
220+
}
221+
222+
{
223+
auto pqGroup = server.AnnoyingClient->Ls("/Root/PQ/rt3.dc1--legacy--topic1")->Record.GetPathDescription().GetPersQueueGroup();
224+
const auto& describe = pqGroup.GetPQTabletConfig();
225+
226+
Cerr <<"=== PATH DESCRIPTION: \n" << pqGroup.DebugString();
227+
UNIT_ASSERT_VALUES_EQUAL(describe.GetPartitionConfig().GetWriteSpeedInBytesPerSecond(), 3);
228+
UNIT_ASSERT_VALUES_EQUAL(describe.GetPartitionStrategy().GetMinPartitionCount(), 7);
229+
UNIT_ASSERT_VALUES_EQUAL(describe.GetPartitionStrategy().GetMaxPartitionCount(), 100);
230+
UNIT_ASSERT_VALUES_EQUAL(describe.GetPartitionStrategy().GetScaleThresholdSeconds(), 1);
231+
UNIT_ASSERT_VALUES_EQUAL(describe.GetPartitionStrategy().GetScaleUpPartitionWriteSpeedThresholdPercent(), 2);
232+
UNIT_ASSERT_VALUES_EQUAL(describe.GetPartitionStrategy().GetScaleDownPartitionWriteSpeedThresholdPercent(), 1);
233+
UNIT_ASSERT_VALUES_EQUAL(static_cast<int>(describe.GetPartitionStrategy().GetPartitionStrategyType()), static_cast<int>(::NKikimrPQ::TPQTabletConfig_TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_CAN_SPLIT));
234+
}
235+
}
236+
189237
Y_UNIT_TEST(BadRequests) {
190238
NPersQueue::TTestServer server;
191239
{

0 commit comments

Comments
 (0)