Skip to content

Commit 6ba38eb

Browse files
committed
Moved commit "autopartitioning for kinesis" from ydb repo
1 parent f4b2e42 commit 6ba38eb

File tree

2 files changed

+190
-2
lines changed

2 files changed

+190
-2
lines changed

include/ydb-cpp-sdk/client/datastreams/datastreams.h

Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,13 +101,157 @@ namespace NYdb::NDataStreams::V1 {
101101
std::string ExplicitHashDecimal;
102102
};
103103

104+
enum class EAutoPartitioningStrategy: uint32_t {
105+
Unspecified = 0,
106+
Disabled = 1,
107+
ScaleUp = 2,
108+
ScaleUpAndDown = 3,
109+
};
110+
111+
struct TCreateStreamSettings;
112+
struct TUpdateStreamSettings;
113+
114+
115+
template<typename TSettings>
116+
struct TPartitioningSettingsBuilder;
117+
template<typename TSettings>
118+
struct TAutoPartitioningSettingsBuilder;
119+
120+
struct TAutoPartitioningSettings {
121+
friend struct TAutoPartitioningSettingsBuilder<TCreateStreamSettings>;
122+
friend struct TAutoPartitioningSettingsBuilder<TUpdateStreamSettings>;
123+
public:
124+
TAutoPartitioningSettings()
125+
: Strategy_(EAutoPartitioningStrategy::Disabled)
126+
, StabilizationWindow_(TDuration::Seconds(0))
127+
, DownUtilizationPercent_(0)
128+
, UpUtilizationPercent_(0) {
129+
}
130+
TAutoPartitioningSettings(const Ydb::DataStreams::V1::AutoPartitioningSettings& settings);
131+
TAutoPartitioningSettings(EAutoPartitioningStrategy strategy, TDuration stabilizationWindow, uint64_t downUtilizationPercent, uint64_t upUtilizationPercent)
132+
: Strategy_(strategy)
133+
, StabilizationWindow_(stabilizationWindow)
134+
, DownUtilizationPercent_(downUtilizationPercent)
135+
, UpUtilizationPercent_(upUtilizationPercent) {}
136+
137+
EAutoPartitioningStrategy GetStrategy() const { return Strategy_; };
138+
TDuration GetStabilizationWindow() const { return StabilizationWindow_; };
139+
uint32_t GetDownUtilizationPercent() const { return DownUtilizationPercent_; };
140+
uint32_t GetUpUtilizationPercent() const { return UpUtilizationPercent_; };
141+
private:
142+
EAutoPartitioningStrategy Strategy_;
143+
TDuration StabilizationWindow_;
144+
uint32_t DownUtilizationPercent_;
145+
uint32_t UpUtilizationPercent_;
146+
};
147+
148+
149+
class TPartitioningSettings {
150+
using TSelf = TPartitioningSettings;
151+
friend struct TPartitioningSettingsBuilder<TCreateStreamSettings>;
152+
friend struct TPartitioningSettingsBuilder<TUpdateStreamSettings>;
153+
public:
154+
TPartitioningSettings() : MinActivePartitions_(0), MaxActivePartitions_(0), AutoPartitioningSettings_(){}
155+
TPartitioningSettings(const Ydb::DataStreams::V1::PartitioningSettings& settings);
156+
TPartitioningSettings(uint64_t minActivePartitions, uint64_t maxActivePartitions, TAutoPartitioningSettings autoscalingSettings = {})
157+
: MinActivePartitions_(minActivePartitions)
158+
, MaxActivePartitions_(maxActivePartitions)
159+
, AutoPartitioningSettings_(autoscalingSettings) {
160+
}
161+
162+
uint64_t GetMinActivePartitions() const { return MinActivePartitions_; };
163+
uint64_t GetMaxActivePartitions() const { return MaxActivePartitions_; };
164+
TAutoPartitioningSettings GetAutoPartitioningSettings() const { return AutoPartitioningSettings_; };
165+
private:
166+
uint64_t MinActivePartitions_;
167+
uint64_t MaxActivePartitions_;
168+
TAutoPartitioningSettings AutoPartitioningSettings_;
169+
};
170+
104171
struct TCreateStreamSettings : public NYdb::TOperationRequestSettings<TCreateStreamSettings> {
105172
FLUENT_SETTING(uint32_t, ShardCount);
106173
FLUENT_SETTING_OPTIONAL(uint32_t, RetentionPeriodHours);
107174
FLUENT_SETTING_OPTIONAL(uint32_t, RetentionStorageMegabytes);
108175
FLUENT_SETTING(uint64_t, WriteQuotaKbPerSec);
109176
FLUENT_SETTING_OPTIONAL(EStreamMode, StreamMode);
177+
178+
179+
FLUENT_SETTING_OPTIONAL(TPartitioningSettings, PartitioningSettings);
180+
TPartitioningSettingsBuilder<TCreateStreamSettings> BeginConfigurePartitioningSettings();
110181
};
182+
183+
template<typename TSettings>
184+
struct TAutoPartitioningSettingsBuilder {
185+
using TSelf = TAutoPartitioningSettingsBuilder<TSettings>;
186+
public:
187+
TAutoPartitioningSettingsBuilder(TPartitioningSettingsBuilder<TSettings>& parent, TAutoPartitioningSettings& settings): Parent_(parent), Settings_(settings) {}
188+
189+
TSelf Strategy(EAutoPartitioningStrategy value) {
190+
Settings_.Strategy_ = value;
191+
return *this;
192+
}
193+
194+
TSelf StabilizationWindow(TDuration value) {
195+
Settings_.StabilizationWindow_ = value;
196+
return *this;
197+
}
198+
199+
TSelf DownUtilizationPercent(uint32_t value) {
200+
Settings_.DownUtilizationPercent_ = value;
201+
return *this;
202+
}
203+
204+
TSelf UpUtilizationPercent(uint32_t value) {
205+
Settings_.UpUtilizationPercent_ = value;
206+
return *this;
207+
}
208+
209+
TPartitioningSettingsBuilder<TSettings>& EndConfigureAutoPartitioningSettings() {
210+
return Parent_;
211+
}
212+
213+
private:
214+
TPartitioningSettingsBuilder<TSettings>& Parent_;
215+
TAutoPartitioningSettings& Settings_;
216+
};
217+
218+
template<typename TSettings>
219+
struct TPartitioningSettingsBuilder {
220+
using TSelf = TPartitioningSettingsBuilder;
221+
public:
222+
TPartitioningSettingsBuilder(TSettings& parent): Parent_(parent) {}
223+
224+
TSelf MinActivePartitions(uint64_t value) {
225+
if (!Parent_.PartitioningSettings_.has_value()) {
226+
Parent_.PartitioningSettings_.emplace();
227+
}
228+
(*Parent_.PartitioningSettings_).MinActivePartitions_ = value;
229+
return *this;
230+
}
231+
232+
TSelf MaxActivePartitions(uint64_t value) {
233+
if (!Parent_.PartitioningSettings_.has_value()) {
234+
Parent_.PartitioningSettings_.emplace();
235+
}
236+
(*Parent_.PartitioningSettings_).MaxActivePartitions_ = value;
237+
return *this;
238+
}
239+
240+
TAutoPartitioningSettingsBuilder<TSettings> BeginConfigureAutoPartitioningSettings() {
241+
if (!Parent_.PartitioningSettings_.has_value()) {
242+
Parent_.PartitioningSettings_.emplace();
243+
}
244+
return {*this, (*Parent_.PartitioningSettings_).AutoPartitioningSettings_};
245+
}
246+
247+
TSettings& EndConfigurePartitioningSettings() {
248+
return Parent_;
249+
}
250+
251+
private:
252+
TSettings& Parent_;
253+
};
254+
111255
struct TListStreamsSettings : public NYdb::TOperationRequestSettings<TListStreamsSettings> {
112256
FLUENT_SETTING(uint32_t, Limit);
113257
FLUENT_SETTING(std::string, ExclusiveStartStreamName);
@@ -155,6 +299,8 @@ namespace NYdb::NDataStreams::V1 {
155299
FLUENT_SETTING(uint64_t, WriteQuotaKbPerSec);
156300
FLUENT_SETTING_OPTIONAL(EStreamMode, StreamMode);
157301

302+
FLUENT_SETTING_OPTIONAL(TPartitioningSettings, PartitioningSettings);
303+
TPartitioningSettingsBuilder<TUpdateStreamSettings> BeginConfigurePartitioningSettings();
158304
};
159305
struct TPutRecordSettings : public NYdb::TOperationRequestSettings<TPutRecordSettings> {};
160306
struct TPutRecordsSettings : public NYdb::TOperationRequestSettings<TPutRecordsSettings> {};

src/client/datastreams/datastreams.cpp

Lines changed: 44 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,47 @@
55
#undef INCLUDE_YDB_INTERNAL_H
66

77
#include <ydb-cpp-sdk/library/yql_common/issue/yql_issue.h>
8-
//#include <src/library/yql_common/issue/yql_issue_message.h>
8+
#include <src/library/yql_common/issue/yql_issue_message.h>
99

1010
#include <src/client/common_client/impl/client.h>
1111

1212
namespace NYdb::NDataStreams::V1 {
1313

14+
TPartitioningSettingsBuilder<TCreateStreamSettings> TCreateStreamSettings::BeginConfigurePartitioningSettings() {
15+
return { *this };
16+
}
17+
18+
TPartitioningSettingsBuilder<TUpdateStreamSettings> TUpdateStreamSettings::BeginConfigurePartitioningSettings() {
19+
return { *this };
20+
}
21+
22+
void SetPartitionSettings(const TPartitioningSettings& ps, ::Ydb::DataStreams::V1::PartitioningSettings* pt) {
23+
pt->set_max_active_partitions(ps.GetMaxActivePartitions());
24+
pt->set_min_active_partitions(ps.GetMinActivePartitions());
25+
26+
::Ydb::DataStreams::V1::AutoPartitioningStrategy strategy;
27+
switch (ps.GetAutoPartitioningSettings().GetStrategy()) {
28+
case EAutoPartitioningStrategy::Unspecified:
29+
case EAutoPartitioningStrategy::Disabled:
30+
strategy = ::Ydb::DataStreams::V1::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_DISABLED;
31+
break;
32+
case EAutoPartitioningStrategy::ScaleUp:
33+
strategy = ::Ydb::DataStreams::V1::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_SCALE_UP;
34+
break;
35+
case EAutoPartitioningStrategy::ScaleUpAndDown:
36+
strategy = ::Ydb::DataStreams::V1::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_SCALE_UP_AND_DOWN;
37+
break;
38+
}
39+
40+
pt->mutable_auto_partitioning_settings()->set_strategy(strategy);
41+
pt->mutable_auto_partitioning_settings()->mutable_partition_write_speed()
42+
->mutable_stabilization_window()->set_seconds(ps.GetAutoPartitioningSettings().GetStabilizationWindow().Seconds());
43+
pt->mutable_auto_partitioning_settings()->mutable_partition_write_speed()
44+
->set_up_utilization_percent(ps.GetAutoPartitioningSettings().GetUpUtilizationPercent());
45+
pt->mutable_auto_partitioning_settings()->mutable_partition_write_speed()
46+
->set_down_utilization_percent(ps.GetAutoPartitioningSettings().GetDownUtilizationPercent());
47+
}
48+
1449
class TDataStreamsClient::TImpl : public TClientImplCommon<TDataStreamsClient::TImpl> {
1550
public:
1651
TImpl(std::shared_ptr <TGRpcConnectionsImpl> &&connections, const TCommonClientSettings &settings)
@@ -88,6 +123,10 @@ namespace NYdb::NDataStreams::V1 {
88123
*settings.StreamMode_ == ESM_PROVISIONED ? Ydb::DataStreams::V1::StreamMode::PROVISIONED
89124
: Ydb::DataStreams::V1::StreamMode::ON_DEMAND);
90125
}
126+
127+
if (settings.PartitioningSettings_.has_value()) {
128+
SetPartitionSettings(*settings.PartitioningSettings_, req.mutable_partitioning_settings());
129+
}
91130
});
92131
}
93132

@@ -372,6 +411,10 @@ namespace NYdb::NDataStreams::V1 {
372411
*settings.StreamMode_ == ESM_PROVISIONED ? Ydb::DataStreams::V1::StreamMode::PROVISIONED
373412
: Ydb::DataStreams::V1::StreamMode::ON_DEMAND);
374413
}
414+
415+
if (settings.PartitioningSettings_.has_value()) {
416+
SetPartitionSettings(*settings.PartitioningSettings_, req.mutable_partitioning_settings());
417+
}
375418
});
376419
}
377420

@@ -907,4 +950,3 @@ namespace NYdb::NDataStreams::V1 {
907950
TProtoRequestSettings settings
908951
);
909952
}
910-

0 commit comments

Comments
 (0)