Skip to content

Commit 184bf80

Browse files
authored
Add PAUSED strategy for autopartitioning of the topic (#7002)
1 parent fa4c87c commit 184bf80

File tree

11 files changed

+85
-2
lines changed

11 files changed

+85
-2
lines changed

ydb/core/persqueue/ut/ut_with_sdk/autoscaling_ut.cpp

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -741,6 +741,57 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) {
741741
auto v = f.GetValueSync();
742742
UNIT_ASSERT_C(v.IsSuccess(), "Error: " << v);
743743
}
744+
745+
{
746+
auto describeAfterAlter = client.DescribeTopic(topicName).GetValueSync();
747+
748+
UNIT_ASSERT_VALUES_EQUAL(describeAfterAlter.GetTopicDescription().GetPartitioningSettings().GetAutoPartitioningSettings().GetStrategy(), EAutoPartitioningStrategy::Disabled);
749+
}
750+
}
751+
752+
Y_UNIT_TEST(ControlPlane_PauseAutoPartitioning) {
753+
auto topicName = "autoscalit-topic";
754+
755+
TTopicSdkTestSetup setup = CreateSetup();
756+
TTopicClient client = setup.MakeClient();
757+
758+
{
759+
TCreateTopicSettings createSettings;
760+
createSettings
761+
.BeginConfigurePartitioningSettings()
762+
.MinActivePartitions(1)
763+
.MaxActivePartitions(100)
764+
.BeginConfigureAutoPartitioningSettings()
765+
.Strategy(EAutoPartitioningStrategy::ScaleUp)
766+
.EndConfigureAutoPartitioningSettings()
767+
.EndConfigurePartitioningSettings();
768+
client.CreateTopic(topicName, createSettings).Wait();
769+
}
770+
771+
{
772+
TAlterTopicSettings alterSettings;
773+
alterSettings
774+
.BeginAlterPartitioningSettings()
775+
.MinActivePartitions(3)
776+
.MaxActivePartitions(107)
777+
.BeginAlterAutoPartitioningSettings()
778+
.Strategy(EAutoPartitioningStrategy::Paused)
779+
.EndAlterAutoPartitioningSettings()
780+
.EndAlterTopicPartitioningSettings();
781+
auto f = client.AlterTopic(topicName, alterSettings);
782+
f.Wait();
783+
784+
auto v = f.GetValueSync();
785+
UNIT_ASSERT_C(v.IsSuccess(), "Error: " << v);
786+
}
787+
788+
{
789+
auto describeAfterAlter = client.DescribeTopic(topicName).GetValueSync();
790+
791+
UNIT_ASSERT_VALUES_EQUAL(describeAfterAlter.GetTopicDescription().GetPartitioningSettings().GetMinActivePartitions(), 3);
792+
UNIT_ASSERT_VALUES_EQUAL(describeAfterAlter.GetTopicDescription().GetPartitioningSettings().GetMaxActivePartitions(), 107);
793+
UNIT_ASSERT_VALUES_EQUAL(describeAfterAlter.GetTopicDescription().GetPartitioningSettings().GetAutoPartitioningSettings().GetStrategy(), EAutoPartitioningStrategy::Paused);
794+
}
744795
}
745796

746797
Y_UNIT_TEST(ControlPlane_AutoscalingWithStorageSizeRetention) {

ydb/core/protos/pqconfig.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -404,6 +404,7 @@ message TPQTabletConfig {
404404
CAN_SPLIT = 1;
405405
// The autoscaling algorithm will both increase and decrease partitions count depending on the load characteristics.
406406
CAN_SPLIT_AND_MERGE = 2;
407+
PAUSED = 3;
407408
}
408409

409410
// Strategy for automatically changing the number of topic partitions depending on the load

ydb/public/api/protos/draft/datastreams.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -289,6 +289,8 @@ enum AutoPartitioningStrategy {
289289
AUTO_PARTITIONING_STRATEGY_SCALE_UP = 2;
290290
// The auto partitioning algorithm will both increase and decrease partitions count depending on the load characteristics.
291291
AUTO_PARTITIONING_STRATEGY_SCALE_UP_AND_DOWN = 3;
292+
// The auto partitioning is paused.
293+
AUTO_PARTITIONING_STRATEGY_PAUSED = 4;
292294
}
293295

294296
// Partitioning settings for stream.

ydb/public/api/protos/ydb_persqueue_v1.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1092,6 +1092,8 @@ enum AutoPartitioningStrategy {
10921092
AUTO_PARTITIONING_STRATEGY_SCALE_UP = 2;
10931093
// The auto partitioning algorithm will both increase and decrease partitions count depending on the load characteristics.
10941094
AUTO_PARTITIONING_STRATEGY_SCALE_UP_AND_DOWN = 3;
1095+
// The auto partitioning is paused.
1096+
AUTO_PARTITIONING_STRATEGY_PAUSED = 4;
10951097
}
10961098

10971099
/**

ydb/public/api/protos/ydb_topic.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -842,6 +842,8 @@ enum AutoPartitioningStrategy {
842842
AUTO_PARTITIONING_STRATEGY_SCALE_UP = 2;
843843
// The auto partitioning algorithm will both increase and decrease partitions count depending on the load characteristics.
844844
AUTO_PARTITIONING_STRATEGY_SCALE_UP_AND_DOWN = 3;
845+
// The auto partitioning is paused.
846+
AUTO_PARTITIONING_STRATEGY_PAUSED = 4;
845847
}
846848

847849
// Partitioning settings for topic.

ydb/public/sdk/cpp/client/ydb_datastreams/datastreams.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,9 @@ namespace NYdb::NDataStreams::V1 {
3535
case EAutoPartitioningStrategy::ScaleUpAndDown:
3636
strategy = ::Ydb::DataStreams::V1::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_SCALE_UP_AND_DOWN;
3737
break;
38+
case EAutoPartitioningStrategy::Paused:
39+
strategy = ::Ydb::DataStreams::V1::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_PAUSED;
40+
break;
3841
}
3942

4043
pt->mutable_auto_partitioning_settings()->set_strategy(strategy);

ydb/public/sdk/cpp/client/ydb_datastreams/datastreams.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@ namespace NYdb::NDataStreams::V1 {
106106
Disabled = 1,
107107
ScaleUp = 2,
108108
ScaleUpAndDown = 3,
109+
Paused = 4
109110
};
110111

111112
struct TCreateStreamSettings;

ydb/public/sdk/cpp/client/ydb_topic/include/control_plane.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ enum class EAutoPartitioningStrategy: ui32 {
3535
Disabled = 1,
3636
ScaleUp = 2,
3737
ScaleUpAndDown = 3,
38+
Paused = 4
3839
};
3940

4041
class TConsumer {

ydb/services/datastreams/datastreams_proxy.cpp

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,8 @@ namespace NKikimr::NDataStreams::V1 {
6666

6767
TString ValidatePartitioningSettings(const ::Ydb::DataStreams::V1::PartitioningSettings& s) {
6868
if (s.auto_partitioning_settings().strategy() == ::Ydb::DataStreams::V1::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_SCALE_UP
69-
|| s.auto_partitioning_settings().strategy() == ::Ydb::DataStreams::V1::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_SCALE_UP_AND_DOWN) {
69+
|| s.auto_partitioning_settings().strategy() == ::Ydb::DataStreams::V1::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_SCALE_UP_AND_DOWN
70+
|| s.auto_partitioning_settings().strategy() == ::Ydb::DataStreams::V1::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_PAUSED) {
7071

7172
if (s.min_active_partitions() < 0) {
7273
return "min_active_partitions must be great than 0";
@@ -502,6 +503,10 @@ namespace NKikimr::NDataStreams::V1 {
502503
case Ydb::DataStreams::V1::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_SCALE_UP_AND_DOWN:
503504
t->SetPartitionStrategyType(NKikimrPQ::TPQTabletConfig_TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_CAN_SPLIT_AND_MERGE);
504505
break;
506+
507+
case Ydb::DataStreams::V1::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_PAUSED:
508+
t->SetPartitionStrategyType(NKikimrPQ::TPQTabletConfig_TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_PAUSED);
509+
break;
505510
}
506511

507512
auto& ws = as.partition_write_speed();
@@ -785,10 +790,13 @@ namespace NKikimr::NDataStreams::V1 {
785790
if (ps.GetPartitionStrategyType() != NKikimrPQ::TPQTabletConfig::TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_DISABLED) {
786791
pt->set_min_active_partitions(ps.GetMinPartitionCount());
787792
pt->set_max_active_partitions(ps.GetMaxPartitionCount());
793+
788794
if (ps.GetPartitionStrategyType() == NKikimrPQ::TPQTabletConfig::TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_CAN_SPLIT) {
789795
pt->mutable_auto_partitioning_settings()->set_strategy(::Ydb::DataStreams::V1::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_SCALE_UP);
790-
} else {
796+
} else if (ps.GetPartitionStrategyType() == NKikimrPQ::TPQTabletConfig::TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_CAN_SPLIT_AND_MERGE) {
791797
pt->mutable_auto_partitioning_settings()->set_strategy(::Ydb::DataStreams::V1::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_SCALE_UP_AND_DOWN);
798+
} else {
799+
pt->mutable_auto_partitioning_settings()->set_strategy(::Ydb::DataStreams::V1::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_PAUSED);
792800
}
793801

794802
pt->mutable_auto_partitioning_settings()->mutable_partition_write_speed()->mutable_stabilization_window()->set_seconds(ps.GetScaleThresholdSeconds());

ydb/services/lib/actors/pq_schema_actor.cpp

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -785,6 +785,9 @@ namespace NKikimr::NGRpcProxy::V1 {
785785
case ::Ydb::PersQueue::V1::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_SCALE_UP_AND_DOWN:
786786
pqTabletConfigPartStrategy->SetPartitionStrategyType(::NKikimrPQ::TPQTabletConfig_TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_CAN_SPLIT_AND_MERGE);
787787
break;
788+
case ::Ydb::PersQueue::V1::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_PAUSED:
789+
pqTabletConfigPartStrategy->SetPartitionStrategyType(::NKikimrPQ::TPQTabletConfig_TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_PAUSED);
790+
break;
788791
default:
789792
pqTabletConfigPartStrategy->SetPartitionStrategyType(::NKikimrPQ::TPQTabletConfig_TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_DISABLED);
790793
break;
@@ -1110,6 +1113,9 @@ namespace NKikimr::NGRpcProxy::V1 {
11101113
case ::Ydb::Topic::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_SCALE_UP_AND_DOWN:
11111114
pqTabletConfigPartStrategy->SetPartitionStrategyType(::NKikimrPQ::TPQTabletConfig_TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_CAN_SPLIT_AND_MERGE);
11121115
break;
1116+
case ::Ydb::Topic::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_PAUSED:
1117+
pqTabletConfigPartStrategy->SetPartitionStrategyType(::NKikimrPQ::TPQTabletConfig_TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_PAUSED);
1118+
break;
11131119
default:
11141120
pqTabletConfigPartStrategy->SetPartitionStrategyType(::NKikimrPQ::TPQTabletConfig_TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_DISABLED);
11151121
break;
@@ -1290,6 +1296,9 @@ namespace NKikimr::NGRpcProxy::V1 {
12901296
case ::Ydb::Topic::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_SCALE_UP_AND_DOWN:
12911297
pqTabletConfig->MutablePartitionStrategy()->SetPartitionStrategyType(::NKikimrPQ::TPQTabletConfig_TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_CAN_SPLIT_AND_MERGE);
12921298
break;
1299+
case ::Ydb::Topic::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_PAUSED:
1300+
pqTabletConfig->MutablePartitionStrategy()->SetPartitionStrategyType(::NKikimrPQ::TPQTabletConfig_TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_PAUSED);
1301+
break;
12931302
default:
12941303
pqTabletConfig->MutablePartitionStrategy()->SetPartitionStrategyType(::NKikimrPQ::TPQTabletConfig_TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_DISABLED);
12951304
break;

0 commit comments

Comments
 (0)