Skip to content

Commit e584d37

Browse files
committed
Moved commit "PQ_V1 SDK auto partitioning support" from ydb repo
1 parent 24f1641 commit e584d37

File tree

4 files changed

+66
-3
lines changed

4 files changed

+66
-3
lines changed

include/ydb-cpp-sdk/client/topic/control_plane.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ struct TAlterTopicSettings;
160160
struct TAutoPartitioningSettings {
161161
friend struct TAutoPartitioningSettingsBuilder;
162162
public:
163-
TAutoPartitioningSettings()
163+
TAutoPartitioningSettings()
164164
: Strategy_(EAutoPartitioningStrategy::Disabled)
165165
, StabilizationWindow_(TDuration::Seconds(0))
166166
, DownUtilizationPercent_(0)

src/client/persqueue_public/impl/persqueue.cpp

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,11 +85,20 @@ TDescribeTopicResult::TDescribeTopicResult(TStatus status, const Ydb::PersQueue:
8585
}
8686

8787
TDescribeTopicResult::TTopicSettings::TTopicSettings(const Ydb::PersQueue::V1::TopicSettings& settings) {
88-
89-
PartitionsCount_ = settings.partitions_count();
9088
RetentionPeriod_ = TDuration::MilliSeconds(settings.retention_period_ms());
9189
SupportedFormat_ = static_cast<EFormat>(settings.supported_format());
9290

91+
if (settings.has_auto_partitioning_settings()) {
92+
PartitionsCount_ = settings.auto_partitioning_settings().min_active_partitions();
93+
MaxPartitionsCount_ = settings.auto_partitioning_settings().max_active_partitions();
94+
StabilizationWindow_ = TDuration::Seconds(settings.auto_partitioning_settings().partition_write_speed().stabilization_window().seconds());
95+
UpUtilizationPercent_ = settings.auto_partitioning_settings().partition_write_speed().up_utilization_percent();
96+
DownUtilizationPercent_ = settings.auto_partitioning_settings().partition_write_speed().down_utilization_percent();
97+
AutoPartitioningStrategy_ = settings.auto_partitioning_settings().strategy();
98+
} else {
99+
PartitionsCount_ = settings.partitions_count();
100+
}
101+
93102
for (const auto& codec : settings.supported_codecs()) {
94103
SupportedCodecs_.push_back(static_cast<ECodec>(codec));
95104
}

src/client/persqueue_public/impl/persqueue_impl.h

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,34 @@ class TPersQueueClient::TImpl : public TClientImplCommon<TPersQueueClient::TImpl
5555

5656
props.set_partitions_count(settings.PartitionsCount_);
5757

58+
bool autoPartitioningSettingsDefined = false;
59+
if (settings.MaxPartitionsCount_.has_value()) {
60+
props.mutable_auto_partitioning_settings()->set_max_active_partitions(settings.PartitionsCount_);
61+
autoPartitioningSettingsDefined = true;
62+
}
63+
if (settings.AutoPartitioningStrategy_.has_value()) {
64+
props.mutable_auto_partitioning_settings()->set_strategy(*settings.AutoPartitioningStrategy_);
65+
autoPartitioningSettingsDefined = true;
66+
}
67+
if (settings.DownUtilizationPercent_.has_value()) {
68+
props.mutable_auto_partitioning_settings()->mutable_partition_write_speed()->set_down_utilization_percent(*settings.DownUtilizationPercent_);
69+
autoPartitioningSettingsDefined = true;
70+
}
71+
if (settings.UpUtilizationPercent_.has_value()) {
72+
props.mutable_auto_partitioning_settings()->mutable_partition_write_speed()->set_up_utilization_percent(*settings.UpUtilizationPercent_);
73+
autoPartitioningSettingsDefined = true;
74+
}
75+
if (settings.StabilizationWindow_.has_value()) {
76+
props.mutable_auto_partitioning_settings()->mutable_partition_write_speed()->mutable_stabilization_window()->set_seconds((*settings.StabilizationWindow_).Seconds());
77+
autoPartitioningSettingsDefined = true;
78+
}
79+
80+
if (!autoPartitioningSettingsDefined) {
81+
props.set_partitions_count(settings.PartitionsCount_);
82+
} else {
83+
props.mutable_auto_partitioning_settings()->set_min_active_partitions(settings.PartitionsCount_);
84+
}
85+
5886
props.set_retention_period_ms(settings.RetentionPeriod_.MilliSeconds());
5987
props.set_supported_format(static_cast<Ydb::PersQueue::V1::TopicSettings::Format>(settings.SupportedFormat_));
6088
for (const auto& codec : settings.SupportedCodecs_) {

src/client/persqueue_public/include/control_plane.h

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,12 @@ struct TDescribeTopicResult : public TStatus {
117117
}
118118
GETTER(std::optional<TRemoteMirrorRule>, RemoteMirrorRule);
119119

120+
GETTER(std::optional<uint64_t>, MaxPartitionsCount);
121+
GETTER(std::optional<TDuration>, StabilizationWindow);
122+
GETTER(std::optional<uint64_t>, UpUtilizationPercent);
123+
GETTER(std::optional<uint64_t>, DownUtilizationPercent);
124+
GETTER(std::optional<Ydb::PersQueue::V1::AutoPartitioningStrategy>, AutoPartitioningStrategy);
125+
120126

121127
#undef GETTER
122128

@@ -138,6 +144,12 @@ struct TDescribeTopicResult : public TStatus {
138144
std::optional<ui32> AbcId_;
139145
std::optional<std::string> AbcSlug_;
140146
std::string FederationAccount_;
147+
148+
std::optional<uint64_t> MaxPartitionsCount_;
149+
std::optional<TDuration> StabilizationWindow_;
150+
std::optional<uint64_t> UpUtilizationPercent_;
151+
std::optional<uint64_t> DownUtilizationPercent_;
152+
std::optional<Ydb::PersQueue::V1::AutoPartitioningStrategy> AutoPartitioningStrategy_;
141153
};
142154

143155
TDescribeTopicResult(TStatus status, const Ydb::PersQueue::V1::DescribeTopicResult& result);
@@ -191,6 +203,7 @@ struct TReadRuleSettings {
191203
// Settings for topic.
192204
template <class TDerived>
193205
struct TTopicSettings : public TOperationRequestSettings<TDerived> {
206+
friend class TPersQueueClient;
194207

195208
struct TRemoteMirrorRuleSettings {
196209
TRemoteMirrorRuleSettings() {}
@@ -266,9 +279,22 @@ struct TTopicSettings : public TOperationRequestSettings<TDerived> {
266279
if (settings.RemoteMirrorRule()) {
267280
RemoteMirrorRule_ = TRemoteMirrorRuleSettings().SetSettings(settings.RemoteMirrorRule().value());
268281
}
282+
283+
MaxPartitionsCount_ = settings.MaxPartitionsCount();
284+
StabilizationWindow_ = settings.StabilizationWindow();
285+
UpUtilizationPercent_ = settings.UpUtilizationPercent();
286+
DownUtilizationPercent_ = settings.DownUtilizationPercent();
287+
AutoPartitioningStrategy_ = settings.AutoPartitioningStrategy();
288+
269289
return static_cast<TDerived&>(*this);
270290
}
271291

292+
private:
293+
std::optional<uint64_t> MaxPartitionsCount_;
294+
std::optional<TDuration> StabilizationWindow_;
295+
std::optional<uint64_t> UpUtilizationPercent_;
296+
std::optional<uint64_t> DownUtilizationPercent_;
297+
std::optional<Ydb::PersQueue::V1::AutoPartitioningStrategy> AutoPartitioningStrategy_;
272298
};
273299

274300

0 commit comments

Comments
 (0)