Skip to content

Commit 59a1386

Browse files
niksavelievGazizonoki
authored andcommitted
Moved "Autoscaling control plane" commit from ydb repo
1 parent 6972b21 commit 59a1386

File tree

4 files changed

+227
-21
lines changed

4 files changed

+227
-21
lines changed

include/ydb-cpp-sdk/client/topic/control_plane.h

Lines changed: 168 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010

1111
#include <limits>
1212

13-
1413
namespace NYdb {
1514
class TProtoAccessor;
1615

@@ -29,6 +28,12 @@ enum class EMeteringMode : ui32 {
2928
Unknown = std::numeric_limits<int>::max(),
3029
};
3130

31+
enum class EAutoscalingStrategy: ui32 {
32+
Unspecified = 0,
33+
Disabled = 1,
34+
ScaleUp = 2,
35+
ScaleUpAndDown = 3,
36+
};
3237

3338
class TConsumer {
3439
public:
@@ -93,7 +98,6 @@ class TPartitionStats {
9398
ui64 BytesWrittenPerMinute_;
9499
ui64 BytesWrittenPerHour_;
95100
ui64 BytesWrittenPerDay_;
96-
97101
};
98102

99103
class TPartitionConsumerStats {
@@ -150,20 +154,98 @@ class TPartitionInfo {
150154
std::optional<TPartitionLocation> PartitionLocation_;
151155
};
152156

157+
struct TAlterPartitioningSettings;
158+
struct TAlterTopicSettings;
159+
160+
struct TAutoscalingSettings {
161+
friend struct TAutoscalingSettingsBuilder;
162+
public:
163+
TAutoscalingSettings()
164+
: Strategy_(EAutoscalingStrategy::Disabled)
165+
, ThresholdTime_(TDuration::Seconds(0))
166+
, ScaleDownThresholdPercent_(0)
167+
, ScaleUpThresholdPercent_(0) {
168+
}
169+
TAutoscalingSettings(const Ydb::Topic::AutoscalingSettings& settings);
170+
TAutoscalingSettings(EAutoscalingStrategy strategy, TDuration thresholdTime, ui64 scaleUpThresholdPercent, ui64 scaleDownThresholdPercent)
171+
: Strategy_(strategy)
172+
, ThresholdTime_(thresholdTime)
173+
, ScaleDownThresholdPercent_(scaleDownThresholdPercent)
174+
, ScaleUpThresholdPercent_(scaleUpThresholdPercent) {}
175+
176+
EAutoscalingStrategy GetStrategy() const;
177+
TDuration GetThresholdTime() const;
178+
ui32 GetScaleDownThresholdPercent() const;
179+
ui32 GetScaleUpThresholdPercent() const;
180+
private:
181+
EAutoscalingStrategy Strategy_;
182+
TDuration ThresholdTime_;
183+
ui32 ScaleDownThresholdPercent_;
184+
ui32 ScaleUpThresholdPercent_;
185+
};
186+
187+
struct TAlterAutoscalingSettings {
188+
using TSelf = TAlterAutoscalingSettings;
189+
public:
190+
TAlterAutoscalingSettings(TAlterPartitioningSettings& parent): Parent_(parent) {}
191+
192+
FLUENT_SETTING_OPTIONAL(EAutoscalingStrategy, Strategy);
193+
FLUENT_SETTING_OPTIONAL(TDuration, ThresholdTime);
194+
FLUENT_SETTING_OPTIONAL(ui64, ScaleUpThresholdPercent);
195+
FLUENT_SETTING_OPTIONAL(ui64, ScaleDownThresholdPercent);
196+
197+
TAlterPartitioningSettings& EndAlterAutoscalingSettings() { return Parent_; };
198+
199+
private:
200+
TAlterPartitioningSettings& Parent_;
201+
};
202+
153203
class TPartitioningSettings {
204+
using TSelf = TPartitioningSettings;
205+
friend struct TPartitioningSettingsBuilder;
154206
public:
155-
TPartitioningSettings() : MinActivePartitions_(0), PartitionCountLimit_(0){}
207+
TPartitioningSettings() : MinActivePartitions_(0), MaxActivePartitions_(0), PartitionCountLimit_(0), AutoscalingSettings_(){}
156208
TPartitioningSettings(const Ydb::Topic::PartitioningSettings& settings);
157-
TPartitioningSettings(ui64 minActivePartitions, ui64 partitionCountLimit)
209+
TPartitioningSettings(ui64 minActivePartitions, ui64 maxActivePartitions, TAutoscalingSettings autoscalingSettings = {})
158210
: MinActivePartitions_(minActivePartitions)
159-
, PartitionCountLimit_(partitionCountLimit) {
211+
, MaxActivePartitions_(maxActivePartitions)
212+
, PartitionCountLimit_(0)
213+
, AutoscalingSettings_(autoscalingSettings)
214+
{
160215
}
161216

162217
ui64 GetMinActivePartitions() const;
218+
ui64 GetMaxActivePartitions() const;
163219
ui64 GetPartitionCountLimit() const;
220+
TAutoscalingSettings GetAutoscalingSettings() const;
164221
private:
165222
ui64 MinActivePartitions_;
223+
ui64 MaxActivePartitions_;
166224
ui64 PartitionCountLimit_;
225+
TAutoscalingSettings AutoscalingSettings_;
226+
};
227+
228+
struct TAlterTopicSettings;
229+
230+
struct TAlterPartitioningSettings {
231+
using TSelf = TAlterPartitioningSettings;
232+
public:
233+
TAlterPartitioningSettings(TAlterTopicSettings& parent): Parent_(parent) {}
234+
235+
FLUENT_SETTING_OPTIONAL(ui64, MinActivePartitions);
236+
FLUENT_SETTING_OPTIONAL(ui64, MaxActivePartitions);
237+
238+
TAlterTopicSettings& EndAlterTopicPartitioningSettings() { return Parent_; };
239+
240+
TAlterAutoscalingSettings& BeginAlterAutoscalingSettings() {
241+
AutoscalingSettings_.emplace(*this);
242+
return *AutoscalingSettings_;
243+
}
244+
245+
std::optional<TAlterAutoscalingSettings> AutoscalingSettings_;
246+
247+
private:
248+
TAlterTopicSettings& Parent_;
167249
};
168250

169251
class TTopicDescription {
@@ -267,7 +349,6 @@ class TPartitionDescription {
267349
struct TDescribeTopicResult : public TStatus {
268350
friend class NYdb::TProtoAccessor;
269351

270-
271352
TDescribeTopicResult(TStatus&& status, Ydb::Topic::DescribeTopicResult&& result);
272353

273354
const TTopicDescription& GetTopicDescription() const;
@@ -280,7 +361,6 @@ struct TDescribeTopicResult : public TStatus {
280361
struct TDescribeConsumerResult : public TStatus {
281362
friend class NYdb::TProtoAccessor;
282363

283-
284364
TDescribeConsumerResult(TStatus&& status, Ydb::Topic::DescribeConsumerResult&& result);
285365

286366
const TConsumerDescription& GetConsumerDescription() const;
@@ -331,7 +411,6 @@ class TAlterAttributesBuilderImpl {
331411
TSettings& Parent_;
332412
};
333413

334-
335414
struct TAlterConsumerSettings;
336415
struct TAlterTopicSettings;
337416

@@ -386,7 +465,6 @@ struct TConsumerSettings {
386465
TSettings& Parent_;
387466
};
388467

389-
390468
struct TAlterConsumerSettings {
391469
using TSelf = TAlterConsumerSettings;
392470

@@ -423,7 +501,7 @@ struct TAlterConsumerSettings {
423501
TAlterTopicSettings& Parent_;
424502
};
425503

426-
504+
struct TPartitioningSettingsBuilder;
427505
struct TCreateTopicSettings : public TOperationRequestSettings<TCreateTopicSettings> {
428506

429507
using TSelf = TCreateTopicSettings;
@@ -445,7 +523,6 @@ struct TCreateTopicSettings : public TOperationRequestSettings<TCreateTopicSetti
445523

446524
FLUENT_SETTING(TAttributes, Attributes);
447525

448-
449526
TCreateTopicSettings& SetSupportedCodecs(std::vector<ECodec>&& codecs) {
450527
SupportedCodecs_ = std::move(codecs);
451528
return *this;
@@ -481,20 +558,80 @@ struct TCreateTopicSettings : public TOperationRequestSettings<TCreateTopicSetti
481558
return *this;
482559
}
483560

484-
TCreateTopicSettings& PartitioningSettings(ui64 minActivePartitions, ui64 partitionCountLimit) {
485-
PartitioningSettings_ = TPartitioningSettings(minActivePartitions, partitionCountLimit);
561+
TCreateTopicSettings& PartitioningSettings(ui64 minActivePartitions, ui64 maxActivePartitions, TAutoscalingSettings autoscalingSettings = {}) {
562+
PartitioningSettings_ = TPartitioningSettings(minActivePartitions, maxActivePartitions, autoscalingSettings);
563+
return *this;
564+
}
565+
566+
TPartitioningSettingsBuilder BeginConfigurePartitioningSettings();
567+
};
568+
569+
struct TAutoscalingSettingsBuilder {
570+
using TSelf = TAutoscalingSettingsBuilder;
571+
public:
572+
TAutoscalingSettingsBuilder(TPartitioningSettingsBuilder& parent, TAutoscalingSettings& settings): Parent_(parent), Settings_(settings) {}
573+
574+
TSelf Strategy(EAutoscalingStrategy value) {
575+
Settings_.Strategy_ = value;
576+
return *this;
577+
}
578+
579+
TSelf ThresholdTime(TDuration value) {
580+
Settings_.ThresholdTime_ = value;
581+
return *this;
582+
}
583+
584+
TSelf ScaleDownThresholdPercent(ui32 value) {
585+
Settings_.ScaleDownThresholdPercent_ = value;
486586
return *this;
487587
}
588+
589+
TSelf ScaleUpThresholdPercent(ui32 value) {
590+
Settings_.ScaleUpThresholdPercent_ = value;
591+
return *this;
592+
}
593+
594+
TPartitioningSettingsBuilder& EndConfigureAutoscalingSettings() {
595+
return Parent_;
596+
}
597+
598+
private:
599+
TPartitioningSettingsBuilder& Parent_;
600+
TAutoscalingSettings& Settings_;
488601
};
489602

603+
struct TPartitioningSettingsBuilder {
604+
using TSelf = TPartitioningSettingsBuilder;
605+
public:
606+
TPartitioningSettingsBuilder(TCreateTopicSettings& parent): Parent_(parent) {}
607+
608+
TSelf MinActivePartitions(ui64 value) {
609+
Parent_.PartitioningSettings_.MinActivePartitions_ = value;
610+
return *this;
611+
}
612+
613+
TSelf MaxActivePartitions(ui64 value) {
614+
Parent_.PartitioningSettings_.MaxActivePartitions_ = value;
615+
return *this;
616+
}
617+
618+
TAutoscalingSettingsBuilder BeginConfigureAutoscalingSettings() {
619+
return {*this, Parent_.PartitioningSettings_.AutoscalingSettings_};
620+
}
621+
622+
TCreateTopicSettings& EndConfigurePartitioningSettings() {
623+
return Parent_;
624+
}
625+
626+
private:
627+
TCreateTopicSettings& Parent_;
628+
};
490629

491630
struct TAlterTopicSettings : public TOperationRequestSettings<TAlterTopicSettings> {
492631

493632
using TSelf = TAlterTopicSettings;
494633
using TAlterAttributes = std::map<std::string, std::string>;
495634

496-
FLUENT_SETTING_OPTIONAL(TPartitioningSettings, AlterPartitioningSettings);
497-
498635
FLUENT_SETTING_OPTIONAL(TDuration, SetRetentionPeriod);
499636

500637
FLUENT_SETTING_OPTIONAL_VECTOR(ECodec, SetSupportedCodecs);
@@ -546,12 +683,25 @@ struct TAlterTopicSettings : public TOperationRequestSettings<TAlterTopicSetting
546683
return AlterConsumers_.back();
547684
}
548685

549-
TAlterTopicSettings& AlterPartitioningSettings(ui64 minActivePartitions, ui64 partitionCountLimit) {
550-
AlterPartitioningSettings_ = TPartitioningSettings(minActivePartitions, partitionCountLimit);
686+
TAlterPartitioningSettings& BeginAlterPartitioningSettings() {
687+
AlterPartitioningSettings_.emplace(*this);
688+
return *AlterPartitioningSettings_;
689+
}
690+
691+
TAlterTopicSettings& AlterPartitioningSettings(ui64 minActivePartitions, ui64 maxActivePartitions) {
692+
AlterPartitioningSettings_.emplace(*this);
693+
AlterPartitioningSettings_->MinActivePartitions(minActivePartitions);
694+
AlterPartitioningSettings_->MaxActivePartitions(maxActivePartitions);
551695
return *this;
552696
}
697+
698+
std::optional<TAlterPartitioningSettings> AlterPartitioningSettings_;
553699
};
554700

701+
inline TPartitioningSettingsBuilder TCreateTopicSettings::BeginConfigurePartitioningSettings() {
702+
return {*this};
703+
}
704+
555705
// Settings for drop resource request.
556706
struct TDropTopicSettings : public TOperationRequestSettings<TDropTopicSettings> {
557707
using TOperationRequestSettings<TDropTopicSettings>::TOperationRequestSettings;
@@ -587,4 +737,4 @@ struct TDescribePartitionSettings: public TOperationRequestSettings<TDescribePar
587737
// Settings for commit offset request.
588738
struct TCommitOffsetSettings : public TOperationRequestSettings<TCommitOffsetSettings> {};
589739

590-
}
740+
} // namespace NYdb::NTopic

src/api/protos/ydb_topic.proto

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -871,7 +871,7 @@ message AlterPartitioningSettings {
871871
optional int64 set_min_active_partitions = 1 [(Ydb.value) = ">= 0"];
872872
// Maximum partition count auto merge would stop working at.
873873
// Zero value means default - 1.
874-
optional int64 max_active_partitions = 3 [(Ydb.value) = ">= 0"];
874+
optional int64 set_max_active_partitions = 3 [(Ydb.value) = ">= 0"];
875875
// Limit for total partition count, including active (open for write) and read-only partitions.
876876
// Zero value means default - 100.
877877
// Use set_max_active_partitions

src/client/topic/impl/topic.cpp

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -222,17 +222,50 @@ const std::vector<NScheme::TPermissions>& TTopicDescription::GetEffectivePermiss
222222

223223
TPartitioningSettings::TPartitioningSettings(const Ydb::Topic::PartitioningSettings& settings)
224224
: MinActivePartitions_(settings.min_active_partitions())
225+
, MaxActivePartitions_(settings.max_active_partitions())
225226
, PartitionCountLimit_(settings.partition_count_limit())
227+
, AutoscalingSettings_(settings.autoscaling_settings())
226228
{}
227229

228230
ui64 TPartitioningSettings::GetMinActivePartitions() const {
229231
return MinActivePartitions_;
230232
}
231233

234+
ui64 TPartitioningSettings::GetMaxActivePartitions() const {
235+
return MaxActivePartitions_;
236+
}
237+
232238
ui64 TPartitioningSettings::GetPartitionCountLimit() const {
233239
return PartitionCountLimit_;
234240
}
235241

242+
TAutoscalingSettings TPartitioningSettings::GetAutoscalingSettings() const {
243+
return AutoscalingSettings_;
244+
}
245+
246+
TAutoscalingSettings::TAutoscalingSettings(const Ydb::Topic::AutoscalingSettings& settings)
247+
: Strategy_(static_cast<EAutoscalingStrategy>(settings.strategy()))
248+
, ThresholdTime_(TDuration::Seconds(settings.partition_write_speed().threshold_time().seconds()))
249+
, ScaleDownThresholdPercent_(settings.partition_write_speed().scale_down_threshold_percent())
250+
, ScaleUpThresholdPercent_(settings.partition_write_speed().scale_up_threshold_percent())
251+
{}
252+
253+
EAutoscalingStrategy TAutoscalingSettings::GetStrategy() const {
254+
return Strategy_;
255+
}
256+
257+
TDuration TAutoscalingSettings::GetThresholdTime() const {
258+
return ThresholdTime_;
259+
}
260+
261+
ui32 TAutoscalingSettings::GetScaleUpThresholdPercent() const {
262+
return ScaleUpThresholdPercent_;
263+
}
264+
265+
ui32 TAutoscalingSettings::GetScaleDownThresholdPercent() const {
266+
return ScaleDownThresholdPercent_;
267+
}
268+
236269
TTopicStats::TTopicStats(const Ydb::Topic::DescribeTopicResult::TopicStats& topicStats)
237270
: StoreSizeBytes_(topicStats.store_size_bytes())
238271
, MinLastWriteTime_(TInstant::Seconds(topicStats.min_last_write_time().seconds()))

src/client/topic/impl/topic_impl.h

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,11 @@ class TTopicClient::TImpl : public TClientImplCommon<TTopicClient::TImpl> {
7979

8080
request.mutable_partitioning_settings()->set_min_active_partitions(settings.PartitioningSettings_.GetMinActivePartitions());
8181
request.mutable_partitioning_settings()->set_partition_count_limit(settings.PartitioningSettings_.GetPartitionCountLimit());
82+
request.mutable_partitioning_settings()->set_max_active_partitions(settings.PartitioningSettings_.GetMaxActivePartitions());
83+
request.mutable_partitioning_settings()->mutable_autoscaling_settings()->set_strategy(static_cast<Ydb::Topic::AutoscalingStrategy>(settings.PartitioningSettings_.GetAutoscalingSettings().GetStrategy()));
84+
request.mutable_partitioning_settings()->mutable_autoscaling_settings()->mutable_partition_write_speed()->mutable_threshold_time()->set_seconds(settings.PartitioningSettings_.GetAutoscalingSettings().GetThresholdTime().Seconds());
85+
request.mutable_partitioning_settings()->mutable_autoscaling_settings()->mutable_partition_write_speed()->set_scale_up_threshold_percent(settings.PartitioningSettings_.GetAutoscalingSettings().GetScaleUpThresholdPercent());
86+
request.mutable_partitioning_settings()->mutable_autoscaling_settings()->mutable_partition_write_speed()->set_scale_down_threshold_percent(settings.PartitioningSettings_.GetAutoscalingSettings().GetScaleDownThresholdPercent());
8287

8388
request.mutable_retention_period()->set_seconds(settings.RetentionPeriod_.Seconds());
8489

@@ -117,8 +122,26 @@ class TTopicClient::TImpl : public TClientImplCommon<TTopicClient::TImpl> {
117122
request.set_path(path);
118123

119124
if (settings.AlterPartitioningSettings_) {
120-
request.mutable_alter_partitioning_settings()->set_set_min_active_partitions(settings.AlterPartitioningSettings_->GetMinActivePartitions());
121-
request.mutable_alter_partitioning_settings()->set_set_partition_count_limit(settings.AlterPartitioningSettings_->GetPartitionCountLimit());
125+
if (settings.AlterPartitioningSettings_->MinActivePartitions_) {
126+
request.mutable_alter_partitioning_settings()->set_set_min_active_partitions(*settings.AlterPartitioningSettings_->MinActivePartitions_);
127+
}
128+
if (settings.AlterPartitioningSettings_->MaxActivePartitions_) {
129+
request.mutable_alter_partitioning_settings()->set_set_max_active_partitions(*settings.AlterPartitioningSettings_->MaxActivePartitions_);
130+
}
131+
if (settings.AlterPartitioningSettings_->AutoscalingSettings_) {
132+
if (settings.AlterPartitioningSettings_->AutoscalingSettings_->Strategy_) {
133+
request.mutable_alter_partitioning_settings()->mutable_alter_autoscaling_settings()->set_set_strategy(static_cast<Ydb::Topic::AutoscalingStrategy>(*settings.AlterPartitioningSettings_->AutoscalingSettings_->Strategy_));
134+
}
135+
if (settings.AlterPartitioningSettings_->AutoscalingSettings_->ScaleDownThresholdPercent_) {
136+
request.mutable_alter_partitioning_settings()->mutable_alter_autoscaling_settings()->mutable_set_partition_write_speed()->set_set_scale_down_threshold_percent(*settings.AlterPartitioningSettings_->AutoscalingSettings_->ScaleDownThresholdPercent_);
137+
}
138+
if (settings.AlterPartitioningSettings_->AutoscalingSettings_->ScaleUpThresholdPercent_) {
139+
request.mutable_alter_partitioning_settings()->mutable_alter_autoscaling_settings()->mutable_set_partition_write_speed()->set_set_scale_up_threshold_percent(*settings.AlterPartitioningSettings_->AutoscalingSettings_->ScaleUpThresholdPercent_);
140+
}
141+
if (settings.AlterPartitioningSettings_->AutoscalingSettings_->ThresholdTime_) {
142+
request.mutable_alter_partitioning_settings()->mutable_alter_autoscaling_settings()->mutable_set_partition_write_speed()->mutable_set_threshold_time()->set_seconds(settings.AlterPartitioningSettings_->AutoscalingSettings_->ThresholdTime_->Seconds());
143+
}
144+
}
122145
}
123146
if (settings.SetRetentionPeriod_) {
124147
request.mutable_set_retention_period()->set_seconds(settings.SetRetentionPeriod_->Seconds());

0 commit comments

Comments
 (0)