Skip to content

Commit d636c46

Browse files
authored
Merge pull request #16797 from nshestakov/AP-fix-alter-25-1
Fixed altering of max_active_partition property of the topic (#16768)
2 parents 3e1ac5a + 1614064 commit d636c46

File tree

3 files changed

+111
-1
lines changed

3 files changed

+111
-1
lines changed

ydb/core/kqp/provider/yql_kikimr_exec.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -531,7 +531,7 @@ namespace {
531531
FromString<ui32>(setting.Value().Cast<TCoDataCtor>().Literal().Cast<TCoAtom>().Value())
532532
);
533533
} else if (name == "setMaxPartitions") {
534-
request->mutable_alter_partitioning_settings()->set_set_partition_count_limit(
534+
request->mutable_alter_partitioning_settings()->set_set_max_active_partitions(
535535
FromString<ui32>(setting.Value().Cast<TCoDataCtor>().Literal().Cast<TCoAtom>().Value())
536536
);
537537
} else if (name == "setRetentionPeriod") {

ydb/core/persqueue/ut/ut_with_sdk/autoscaling_ut.cpp

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -895,6 +895,68 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) {
895895
}
896896
}
897897

898+
Y_UNIT_TEST(PartitionSplit_AutosplitByLoad_AfterAlter) {
899+
TTopicSdkTestSetup setup = CreateSetup();
900+
TTopicClient client = setup.MakeClient();
901+
902+
TCreateTopicSettings createSettings;
903+
createSettings
904+
.BeginConfigurePartitioningSettings()
905+
.MinActivePartitions(1)
906+
.EndConfigurePartitioningSettings();
907+
client.CreateTopic(TEST_TOPIC, createSettings).Wait();
908+
909+
TAlterTopicSettings alterSettings;
910+
alterSettings
911+
.BeginAlterPartitioningSettings()
912+
.MinActivePartitions(1)
913+
.MaxActivePartitions(100)
914+
.BeginAlterAutoPartitioningSettings()
915+
.UpUtilizationPercent(2)
916+
.DownUtilizationPercent(1)
917+
.StabilizationWindow(TDuration::Seconds(2))
918+
.Strategy(EAutoPartitioningStrategy::ScaleUp)
919+
.EndAlterAutoPartitioningSettings()
920+
.EndAlterTopicPartitioningSettings();
921+
client.AlterTopic(TEST_TOPIC, alterSettings).Wait();
922+
923+
auto msg = TString(1_MB, 'a');
924+
925+
auto writeSession_1 = CreateWriteSession(client, "producer-1", 0, std::string{TEST_TOPIC}, false);
926+
auto writeSession_2 = CreateWriteSession(client, "producer-2", 0, std::string{TEST_TOPIC}, false);
927+
928+
{
929+
UNIT_ASSERT(writeSession_1->Write(Msg(msg, 1)));
930+
UNIT_ASSERT(writeSession_1->Write(Msg(msg, 2)));
931+
Sleep(TDuration::Seconds(5));
932+
auto describe = client.DescribeTopic(TEST_TOPIC).GetValueSync();
933+
UNIT_ASSERT_EQUAL(describe.GetTopicDescription().GetPartitions().size(), 1);
934+
}
935+
936+
{
937+
UNIT_ASSERT(writeSession_1->Write(Msg(msg, 3)));
938+
UNIT_ASSERT(writeSession_2->Write(Msg(msg, 4)));
939+
UNIT_ASSERT(writeSession_1->Write(Msg(msg, 5)));
940+
UNIT_ASSERT(writeSession_2->Write(Msg(msg, 6)));
941+
Sleep(TDuration::Seconds(5));
942+
auto describe = client.DescribeTopic(TEST_TOPIC).GetValueSync();
943+
UNIT_ASSERT_EQUAL(describe.GetTopicDescription().GetPartitions().size(), 3);
944+
}
945+
946+
auto writeSession2_1 = CreateWriteSession(client, "producer-1", 1, std::string{TEST_TOPIC}, false);
947+
auto writeSession2_2 = CreateWriteSession(client, "producer-2", 1, std::string{TEST_TOPIC}, false);
948+
949+
{
950+
UNIT_ASSERT(writeSession2_1->Write(Msg(msg, 7)));
951+
UNIT_ASSERT(writeSession2_2->Write(Msg(msg, 8)));
952+
UNIT_ASSERT(writeSession2_1->Write(Msg(msg, 9)));
953+
UNIT_ASSERT(writeSession2_2->Write(Msg(msg, 10)));
954+
Sleep(TDuration::Seconds(5));
955+
auto describe2 = client.DescribeTopic(TEST_TOPIC).GetValueSync();
956+
UNIT_ASSERT_EQUAL(describe2.GetTopicDescription().GetPartitions().size(), 5);
957+
}
958+
}
959+
898960
void ExecuteQuery(NYdb::NTable::TSession& session, const TString& query ) {
899961
const auto result = session.ExecuteSchemeQuery(query).GetValueSync();
900962
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString());

ydb/services/persqueue_v1/topic_yql_ut.cpp

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,54 @@ Y_UNIT_TEST_SUITE(TTopicYqlTest) {
186186
//1609462861
187187
}
188188

189+
Y_UNIT_TEST(AlterAutopartitioning) {
190+
NKikimrConfig::TFeatureFlags ff;
191+
ff.SetEnableTopicSplitMerge(true);
192+
auto settings = NKikimr::NPersQueueTests::PQSettings();
193+
settings.SetFeatureFlags(ff);
194+
195+
NPersQueue::TTestServer server(settings);
196+
197+
{
198+
const char *query = R"(
199+
CREATE TOPIC `/Root/PQ/rt3.dc1--legacy--topic1`
200+
)";
201+
202+
server.AnnoyingClient->RunYqlSchemeQuery(query);
203+
}
204+
205+
{
206+
const char *query = R"__(
207+
ALTER TOPIC `/Root/PQ/rt3.dc1--legacy--topic1`
208+
SET (
209+
min_active_partitions = 7,
210+
max_active_partitions = 100,
211+
auto_partitioning_stabilization_window = Interval('PT1S'),
212+
auto_partitioning_up_utilization_percent = 2,
213+
auto_partitioning_down_utilization_percent = 1,
214+
partition_write_speed_bytes_per_second = 3,
215+
auto_partitioning_strategy = 'up'
216+
);
217+
)__";
218+
219+
server.AnnoyingClient->RunYqlSchemeQuery(query);
220+
}
221+
222+
{
223+
auto pqGroup = server.AnnoyingClient->Ls("/Root/PQ/rt3.dc1--legacy--topic1")->Record.GetPathDescription().GetPersQueueGroup();
224+
const auto& describe = pqGroup.GetPQTabletConfig();
225+
226+
Cerr <<"=== PATH DESCRIPTION: \n" << pqGroup.DebugString();
227+
UNIT_ASSERT_VALUES_EQUAL(describe.GetPartitionConfig().GetWriteSpeedInBytesPerSecond(), 3);
228+
UNIT_ASSERT_VALUES_EQUAL(describe.GetPartitionStrategy().GetMinPartitionCount(), 7);
229+
UNIT_ASSERT_VALUES_EQUAL(describe.GetPartitionStrategy().GetMaxPartitionCount(), 100);
230+
UNIT_ASSERT_VALUES_EQUAL(describe.GetPartitionStrategy().GetScaleThresholdSeconds(), 1);
231+
UNIT_ASSERT_VALUES_EQUAL(describe.GetPartitionStrategy().GetScaleUpPartitionWriteSpeedThresholdPercent(), 2);
232+
UNIT_ASSERT_VALUES_EQUAL(describe.GetPartitionStrategy().GetScaleDownPartitionWriteSpeedThresholdPercent(), 1);
233+
UNIT_ASSERT_VALUES_EQUAL(static_cast<int>(describe.GetPartitionStrategy().GetPartitionStrategyType()), static_cast<int>(::NKikimrPQ::TPQTabletConfig_TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_CAN_SPLIT));
234+
}
235+
}
236+
189237
Y_UNIT_TEST(BadRequests) {
190238
NPersQueue::TTestServer server;
191239
{

0 commit comments

Comments
 (0)