Skip to content

Commit d07e5ec

Browse files
committed
Support all rate limiter's properties in cpp sdk (#14405)
1 parent 2706831 commit d07e5ec

File tree

2 files changed

+304
-3
lines changed

2 files changed

+304
-3
lines changed

include/ydb-cpp-sdk/client/rate_limiter/rate_limiter.h

Lines changed: 142 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,52 @@
22

33
#include <ydb-cpp-sdk/client/driver/driver.h>
44

5+
#include <chrono>
6+
#include <unordered_map>
7+
#include <variant>
8+
59
namespace Ydb::RateLimiter {
6-
class CreateResourceRequest;
7-
class DescribeResourceResult;
8-
class HierarchicalDrrSettings;
10+
class CreateResourceRequest;
11+
class DescribeResourceResult;
12+
class HierarchicalDrrSettings;
13+
class ReplicatedBucketSettings;
14+
class MeteringConfig;
15+
class MeteringConfig_Metric;
916
} // namespace Ydb::RateLimiter
1017

1118
namespace NYdb::inline V3::NRateLimiter {
1219

20+
struct TReplicatedBucketSettings {
21+
using TSelf = TReplicatedBucketSettings;
22+
23+
TReplicatedBucketSettings() = default;
24+
TReplicatedBucketSettings(const Ydb::RateLimiter::ReplicatedBucketSettings&);
25+
26+
void SerializeTo(Ydb::RateLimiter::ReplicatedBucketSettings&) const;
27+
28+
// Interval between syncs from kesus and between consumption reports.
29+
// Default value equals 5000 ms and not inherited.
30+
FLUENT_SETTING_OPTIONAL(std::chrono::milliseconds, ReportInterval);
31+
};
32+
33+
class TLeafBehavior {
34+
public:
35+
enum EBehavior {
36+
REPLICATED_BUCKET,
37+
};
38+
39+
EBehavior GetBehavior() const;
40+
41+
TLeafBehavior(const TReplicatedBucketSettings&);
42+
TLeafBehavior(const Ydb::RateLimiter::ReplicatedBucketSettings&);
43+
const TReplicatedBucketSettings& GetReplicatedBucket() const;
44+
45+
void SerializeTo(Ydb::RateLimiter::HierarchicalDrrSettings&) const;
46+
47+
private:
48+
std::variant<TReplicatedBucketSettings> BehaviorSettings_;
49+
};
50+
1351
// Settings for hierarchical deficit round robin (HDRR) algorithm.
1452
template <class TDerived>
1553
struct THierarchicalDrrSettings {
@@ -46,6 +84,91 @@ struct THierarchicalDrrSettings {
4684
// Default value is inherited from parent or 0.75 for root.
4785
// Must be nonnegative and less than or equal to 1.
4886
FLUENT_SETTING_OPTIONAL(double, PrefetchWatermark);
87+
88+
// Prevents bucket from going too deep in negative values. If somebody reports value that will exceed
89+
// this limit the final amount in bucket will be equal to this limit.
90+
// Should be negative value.
91+
// Unset means no limit.
92+
FLUENT_SETTING_OPTIONAL(double, ImmediatelyFillUpTo);
93+
94+
// Behavior of leafs in tree.
95+
// Not inherited.
96+
FLUENT_SETTING_OPTIONAL(TLeafBehavior, LeafBehavior);
97+
};
98+
99+
struct TMetric {
100+
using TSelf = TMetric;
101+
using TLabels = std::unordered_map<std::string, std::string>;
102+
103+
TMetric() = default;
104+
TMetric(const Ydb::RateLimiter::MeteringConfig_Metric&);
105+
106+
void SerializeTo(Ydb::RateLimiter::MeteringConfig_Metric&) const;
107+
108+
// Send this metric to billing.
109+
// Default value is false (not inherited).
110+
FLUENT_SETTING_DEFAULT(bool, Enabled, false);
111+
112+
// Billing metric period (aligned to hour boundary).
113+
// Default value is inherited from parent or equals 60 seconds for root.
114+
FLUENT_SETTING_OPTIONAL(std::chrono::seconds, BillingPeriod);
115+
116+
// User-defined labels.
117+
FLUENT_SETTING(TLabels, Labels);
118+
119+
// Billing metric JSON fields (inherited from parent if not set)
120+
FLUENT_SETTING(std::string, MetricFieldsJson);
121+
};
122+
123+
struct TMeteringConfig {
124+
using TSelf = TMeteringConfig;
125+
126+
TMeteringConfig() = default;
127+
TMeteringConfig(const Ydb::RateLimiter::MeteringConfig&);
128+
129+
void SerializeTo(Ydb::RateLimiter::MeteringConfig&) const;
130+
131+
// Meter consumed resources and send billing metrics.
132+
FLUENT_SETTING_DEFAULT(bool, Enabled, false);
133+
134+
// Period to report consumption history from clients to kesus
135+
// Default value is inherited from parent or equals 5000 ms for root.
136+
FLUENT_SETTING_OPTIONAL(std::chrono::milliseconds, ReportPeriod);
137+
138+
// Consumption history period that is sent in one message to metering actor.
139+
// Default value is inherited from parent or equals 1000 ms for root.
140+
FLUENT_SETTING_OPTIONAL(std::chrono::milliseconds, MeterPeriod);
141+
142+
// Time window to collect data from every client.
143+
// Any client metering message that is `collect_period` late is discarded (not metered or billed).
144+
// Default value is inherited from parent or equals 30 seconds for root.
145+
FLUENT_SETTING_OPTIONAL(std::chrono::seconds, CollectPeriod);
146+
147+
// Provisioned consumption limit in units per second.
148+
// Effective value is limited by corresponding `max_units_per_second`.
149+
// Default value is 0 (not inherited).
150+
FLUENT_SETTING_OPTIONAL(double, ProvisionedUnitsPerSecond);
151+
152+
// Provisioned allowed burst equals `provisioned_coefficient * provisioned_units_per_second` units.
153+
// Effective value is limited by corresponding PrefetchCoefficient.
154+
// Default value is inherited from parent or equals 60 for root.
155+
FLUENT_SETTING_OPTIONAL(double, ProvisionedCoefficient);
156+
157+
// On-demand allowed burst equals `overshoot_coefficient * prefetch_coefficient * max_units_per_second` units.
158+
// Should be greater or equal to 1.0
159+
// Default value is inherited from parent or equals 1.1 for root
160+
FLUENT_SETTING_OPTIONAL(double, OvershootCoefficient);
161+
162+
// Consumption within provisioned limit.
163+
// Informative metric that should be sent to billing (not billed).
164+
FLUENT_SETTING_OPTIONAL(TMetric, Provisioned);
165+
166+
// Consumption that exceeds provisioned limit is billed as on-demand.
167+
FLUENT_SETTING_OPTIONAL(TMetric, OnDemand);
168+
169+
// Consumption that exceeds even on-demand limit.
170+
// Normally it is free and should not be billed.
171+
FLUENT_SETTING_OPTIONAL(TMetric, Overshoot);
49172
};
50173

51174
// Settings for create resource request.
@@ -55,13 +178,16 @@ struct TCreateResourceSettings
55178
{
56179
TCreateResourceSettings() = default;
57180
TCreateResourceSettings(const Ydb::RateLimiter::CreateResourceRequest&);
181+
182+
FLUENT_SETTING_OPTIONAL(TMeteringConfig, MeteringConfig);
58183
};
59184

60185
// Settings for alter resource request.
61186
struct TAlterResourceSettings
62187
: public TOperationRequestSettings<TAlterResourceSettings>
63188
, public THierarchicalDrrSettings<TAlterResourceSettings>
64189
{
190+
FLUENT_SETTING_OPTIONAL(TMeteringConfig, MeteringConfig);
65191
};
66192

67193
// Settings for drop resource request.
@@ -128,6 +254,14 @@ struct TDescribeResourceResult : public TStatus {
128254
std::optional<double> GetPrefetchWatermark() const {
129255
return PrefetchWatermark_;
130256
}
257+
258+
std::optional<double> GetImmediatelyFillUpTo() const {
259+
return ImmediatelyFillUpTo_;
260+
}
261+
262+
const std::optional<TLeafBehavior>& GetLeafBehavior() const {
263+
return LeafBehavior_;
264+
}
131265
};
132266

133267
TDescribeResourceResult(TStatus status, const Ydb::RateLimiter::DescribeResourceResult& result);
@@ -141,9 +275,14 @@ struct TDescribeResourceResult : public TStatus {
141275
return HierarchicalDrrProps_;
142276
}
143277

278+
const TMeteringConfig& GetMeteringConfig() const {
279+
return MeteringConfig_;
280+
}
281+
144282
private:
145283
std::string ResourcePath_;
146284
THierarchicalDrrProps HierarchicalDrrProps_;
285+
TMeteringConfig MeteringConfig_;
147286
};
148287

149288
using TAsyncDescribeResourceResult = NThreading::TFuture<TDescribeResourceResult>;

src/client/rate_limiter/rate_limiter.cpp

Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,47 @@
77
#include <src/api/grpc/ydb_rate_limiter_v1.grpc.pb.h>
88
#include <src/client/common_client/impl/client.h>
99

10+
#include <google/protobuf/util/json_util.h>
11+
1012
namespace NYdb::inline V3::NRateLimiter {
1113

14+
TReplicatedBucketSettings::TReplicatedBucketSettings(const Ydb::RateLimiter::ReplicatedBucketSettings& proto) {
15+
if (proto.has_report_interval_ms()) {
16+
ReportInterval_ = std::chrono::milliseconds(proto.report_interval_ms());
17+
}
18+
}
19+
20+
void TReplicatedBucketSettings::SerializeTo(Ydb::RateLimiter::ReplicatedBucketSettings& proto) const {
21+
if (ReportInterval_) {
22+
proto.set_report_interval_ms(ReportInterval_->count());
23+
}
24+
}
25+
26+
TLeafBehavior::EBehavior TLeafBehavior::GetBehavior() const {
27+
return static_cast<EBehavior>(BehaviorSettings_.index());
28+
}
29+
30+
TLeafBehavior::TLeafBehavior(const TReplicatedBucketSettings& replicatedBucket)
31+
: BehaviorSettings_(replicatedBucket)
32+
{
33+
}
34+
35+
TLeafBehavior::TLeafBehavior(const Ydb::RateLimiter::ReplicatedBucketSettings& replicatedBucket)
36+
: BehaviorSettings_(replicatedBucket)
37+
{
38+
}
39+
40+
const TReplicatedBucketSettings& TLeafBehavior::GetReplicatedBucket() const {
41+
return std::get<TReplicatedBucketSettings>(BehaviorSettings_);
42+
}
43+
44+
void TLeafBehavior::SerializeTo(Ydb::RateLimiter::HierarchicalDrrSettings& proto) const {
45+
switch (GetBehavior()) {
46+
case REPLICATED_BUCKET:
47+
return GetReplicatedBucket().SerializeTo(*proto.mutable_replicated_bucket());
48+
}
49+
}
50+
1251
template <class TDerived>
1352
THierarchicalDrrSettings<TDerived>::THierarchicalDrrSettings(const Ydb::RateLimiter::HierarchicalDrrSettings& proto) {
1453
if (proto.max_units_per_second()) {
@@ -26,6 +65,18 @@ THierarchicalDrrSettings<TDerived>::THierarchicalDrrSettings(const Ydb::RateLimi
2665
if (proto.prefetch_watermark()) {
2766
PrefetchWatermark_ = proto.prefetch_watermark();
2867
}
68+
69+
if (proto.has_immediately_fill_up_to()) {
70+
ImmediatelyFillUpTo_ = proto.immediately_fill_up_to();
71+
}
72+
73+
switch (proto.leaf_behavior_case()) {
74+
case Ydb::RateLimiter::HierarchicalDrrSettings::kReplicatedBucket:
75+
LeafBehavior_.emplace(proto.replicated_bucket());
76+
break;
77+
case Ydb::RateLimiter::HierarchicalDrrSettings::LEAF_BEHAVIOR_NOT_SET:
78+
break;
79+
}
2980
}
3081

3182
template <class TDerived>
@@ -45,6 +96,105 @@ void THierarchicalDrrSettings<TDerived>::SerializeTo(Ydb::RateLimiter::Hierarchi
4596
if (PrefetchWatermark_) {
4697
proto.set_prefetch_watermark(*PrefetchWatermark_);
4798
}
99+
100+
if (ImmediatelyFillUpTo_) {
101+
proto.set_immediately_fill_up_to(*ImmediatelyFillUpTo_);
102+
}
103+
104+
if (LeafBehavior_) {
105+
LeafBehavior_->SerializeTo(proto);
106+
}
107+
}
108+
109+
TMetric::TMetric(const Ydb::RateLimiter::MeteringConfig_Metric& proto) {
110+
Enabled_ = proto.enabled();
111+
if (proto.billing_period_sec()) {
112+
BillingPeriod_ = std::chrono::seconds(proto.billing_period_sec());
113+
}
114+
for (const auto& [k, v] : proto.labels()) {
115+
Labels_[k] = v;
116+
}
117+
if (proto.has_metric_fields()) {
118+
TString jsonStr;
119+
if (auto st = google::protobuf::util::MessageToJsonString(proto.metric_fields(), &jsonStr); st.ok()) {
120+
MetricFieldsJson_ = jsonStr;
121+
}
122+
}
123+
}
124+
125+
void TMetric::SerializeTo(Ydb::RateLimiter::MeteringConfig_Metric& proto) const {
126+
proto.set_enabled(Enabled_);
127+
if (BillingPeriod_) {
128+
proto.set_billing_period_sec(BillingPeriod_->count());
129+
}
130+
for (const auto& [k, v] : Labels_) {
131+
(*proto.mutable_labels())[k] = v;
132+
}
133+
if (!MetricFieldsJson_.empty()) {
134+
google::protobuf::util::JsonStringToMessage(MetricFieldsJson_, proto.mutable_metric_fields());
135+
}
136+
}
137+
138+
TMeteringConfig::TMeteringConfig(const Ydb::RateLimiter::MeteringConfig& proto) {
139+
Enabled_ = proto.enabled();
140+
if (proto.report_period_ms()) {
141+
ReportPeriod_ = std::chrono::milliseconds(proto.report_period_ms());
142+
}
143+
if (proto.meter_period_ms()) {
144+
MeterPeriod_ = std::chrono::milliseconds(proto.meter_period_ms());
145+
}
146+
if (proto.collect_period_sec()) {
147+
CollectPeriod_ = std::chrono::seconds(proto.collect_period_sec());
148+
}
149+
if (proto.provisioned_units_per_second()) {
150+
ProvisionedUnitsPerSecond_ = proto.provisioned_units_per_second();
151+
}
152+
if (proto.provisioned_coefficient()) {
153+
ProvisionedCoefficient_ = proto.provisioned_coefficient();
154+
}
155+
if (proto.overshoot_coefficient()) {
156+
OvershootCoefficient_ = proto.overshoot_coefficient();
157+
}
158+
if (proto.has_provisioned()) {
159+
Provisioned_.emplace(proto.provisioned());
160+
}
161+
if (proto.has_on_demand()) {
162+
OnDemand_.emplace(proto.on_demand());
163+
}
164+
if (proto.has_overshoot()) {
165+
Overshoot_.emplace(proto.overshoot());
166+
}
167+
}
168+
169+
void TMeteringConfig::SerializeTo(Ydb::RateLimiter::MeteringConfig& proto) const {
170+
proto.set_enabled(Enabled_);
171+
if (ReportPeriod_) {
172+
proto.set_report_period_ms(ReportPeriod_->count());
173+
}
174+
if (MeterPeriod_) {
175+
proto.set_meter_period_ms(MeterPeriod_->count());
176+
}
177+
if (CollectPeriod_) {
178+
proto.set_collect_period_sec(CollectPeriod_->count());
179+
}
180+
if (ProvisionedUnitsPerSecond_) {
181+
proto.set_provisioned_units_per_second(*ProvisionedUnitsPerSecond_);
182+
}
183+
if (ProvisionedCoefficient_) {
184+
proto.set_provisioned_coefficient(*ProvisionedCoefficient_);
185+
}
186+
if (OvershootCoefficient_) {
187+
proto.set_overshoot_coefficient(*OvershootCoefficient_);
188+
}
189+
if (Provisioned_) {
190+
Provisioned_->SerializeTo(*proto.mutable_provisioned());
191+
}
192+
if (OnDemand_) {
193+
OnDemand_->SerializeTo(*proto.mutable_on_demand());
194+
}
195+
if (Overshoot_) {
196+
Overshoot_->SerializeTo(*proto.mutable_overshoot());
197+
}
48198
}
49199

50200
template struct THierarchicalDrrSettings<TCreateResourceSettings>;
@@ -67,6 +217,9 @@ TDescribeResourceResult::TDescribeResourceResult(TStatus status, const Ydb::Rate
67217
, ResourcePath_(result.resource().resource_path())
68218
, HierarchicalDrrProps_(result.resource().hierarchical_drr())
69219
{
220+
if (result.resource().has_metering_config()) {
221+
MeteringConfig_ = result.resource().metering_config();
222+
}
70223
}
71224

72225
TDescribeResourceResult::THierarchicalDrrProps::THierarchicalDrrProps(const Ydb::RateLimiter::HierarchicalDrrSettings& settings)
@@ -102,6 +255,15 @@ class TRateLimiterClient::TImpl : public TClientImplCommon<TRateLimiterClient::T
102255
if (settings.PrefetchWatermark_) {
103256
hdrr.set_prefetch_watermark(*settings.PrefetchWatermark_);
104257
}
258+
if (settings.ImmediatelyFillUpTo_) {
259+
hdrr.set_immediately_fill_up_to(*settings.ImmediatelyFillUpTo_);
260+
}
261+
if (settings.LeafBehavior_) {
262+
settings.LeafBehavior_->SerializeTo(hdrr);
263+
}
264+
if (settings.MeteringConfig_) {
265+
settings.MeteringConfig_->SerializeTo(*resource.mutable_metering_config());
266+
}
105267

106268
return request;
107269
}

0 commit comments

Comments
 (0)