Skip to content

Commit 512da08

Browse files
jepett0Gazizonoki
authored andcommitted
topic: preliminary refactoring in the restore tool code (#14083)
1 parent a926349 commit 512da08

File tree

3 files changed

+136
-46
lines changed

3 files changed

+136
-46
lines changed

include/ydb-cpp-sdk/client/topic/control_plane.h

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ namespace NYdb::inline V3 {
1919
}
2020

2121
namespace NYdb::inline V3::NTopic {
22-
22+
2323
enum class EMeteringMode : uint32_t {
2424
Unspecified = 0,
2525
ReservedCapacity = 1,
@@ -187,6 +187,8 @@ friend struct TAutoPartitioningSettingsBuilder;
187187
, DownUtilizationPercent_(downUtilizationPercent)
188188
, UpUtilizationPercent_(upUtilizationPercent) {}
189189

190+
void SerializeTo(Ydb::Topic::AutoPartitioningSettings& proto) const;
191+
190192
EAutoPartitioningStrategy GetStrategy() const;
191193
TDuration GetStabilizationWindow() const;
192194
ui32 GetDownUtilizationPercent() const;
@@ -228,6 +230,8 @@ class TPartitioningSettings {
228230
{
229231
}
230232

233+
void SerializeTo(Ydb::Topic::PartitioningSettings& proto) const;
234+
231235
uint64_t GetMinActivePartitions() const;
232236
uint64_t GetMaxActivePartitions() const;
233237
uint64_t GetPartitionCountLimit() const;
@@ -437,8 +441,11 @@ struct TConsumerSettings {
437441

438442
using TAttributes = std::map<std::string, std::string>;
439443

440-
TConsumerSettings(TSettings& parent): Parent_(parent) {}
444+
TConsumerSettings(TSettings& parent) : Parent_(parent) {}
441445
TConsumerSettings(TSettings& parent, const std::string& name) : ConsumerName_(name), Parent_(parent) {}
446+
TConsumerSettings(TSettings& parent, const Ydb::Topic::Consumer& proto);
447+
448+
void SerializeTo(Ydb::Topic::Consumer& proto) const;
442449

443450
FLUENT_SETTING(std::string, ConsumerName);
444451
FLUENT_SETTING_DEFAULT(bool, Important, false);
@@ -526,6 +533,11 @@ struct TCreateTopicSettings : public TOperationRequestSettings<TCreateTopicSetti
526533
using TSelf = TCreateTopicSettings;
527534
using TAttributes = std::map<std::string, std::string>;
528535

536+
TCreateTopicSettings() = default;
537+
TCreateTopicSettings(const Ydb::Topic::CreateTopicRequest& proto);
538+
539+
void SerializeTo(Ydb::Topic::CreateTopicRequest& proto) const;
540+
529541
FLUENT_SETTING(TPartitioningSettings, PartitioningSettings);
530542

531543
FLUENT_SETTING_DEFAULT(TDuration, RetentionPeriod, TDuration::Hours(24));

src/client/topic/impl/topic.cpp

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,13 @@ TPartitioningSettings::TPartitioningSettings(const Ydb::Topic::PartitioningSetti
234234
, AutoPartitioningSettings_(settings.auto_partitioning_settings())
235235
{}
236236

237+
void TPartitioningSettings::SerializeTo(Ydb::Topic::PartitioningSettings& proto) const {
238+
proto.set_min_active_partitions(MinActivePartitions_);
239+
proto.set_max_active_partitions(MaxActivePartitions_);
240+
proto.set_partition_count_limit(PartitionCountLimit_);
241+
AutoPartitioningSettings_.SerializeTo(*proto.mutable_auto_partitioning_settings());
242+
}
243+
237244
uint64_t TPartitioningSettings::GetMinActivePartitions() const {
238245
return MinActivePartitions_;
239246
}
@@ -257,6 +264,14 @@ TAutoPartitioningSettings::TAutoPartitioningSettings(const Ydb::Topic::AutoParti
257264
, UpUtilizationPercent_(settings.partition_write_speed().up_utilization_percent())
258265
{}
259266

267+
void TAutoPartitioningSettings::SerializeTo(Ydb::Topic::AutoPartitioningSettings& proto) const {
268+
proto.set_strategy(static_cast<Ydb::Topic::AutoPartitioningStrategy>(Strategy_));
269+
auto& writeSpeed = *proto.mutable_partition_write_speed();
270+
writeSpeed.mutable_stabilization_window()->set_seconds(StabilizationWindow_.Seconds());
271+
writeSpeed.set_down_utilization_percent(DownUtilizationPercent_);
272+
writeSpeed.set_up_utilization_percent(UpUtilizationPercent_);
273+
}
274+
260275
EAutoPartitioningStrategy TAutoPartitioningSettings::GetStrategy() const {
261276
return Strategy_;
262277
}
@@ -542,4 +557,109 @@ TAsyncStatus TTopicClient::CommitOffset(const std::string& path, uint64_t partit
542557
return Impl_->CommitOffset(path, partitionId, consumerName, offset, settings);
543558
}
544559

560+
namespace {
561+
562+
Ydb::Topic::SupportedCodecs SerializeCodecs(const std::vector<ECodec>& codecs) {
563+
Ydb::Topic::SupportedCodecs proto;
564+
for (ECodec codec : codecs) {
565+
proto.add_codecs(static_cast<Ydb::Topic::Codec>(codec));
566+
}
567+
return proto;
568+
}
569+
570+
std::vector<ECodec> DeserializeCodecs(const Ydb::Topic::SupportedCodecs& proto) {
571+
std::vector<ECodec> codecs;
572+
codecs.reserve(proto.codecs_size());
573+
for (int codec : proto.codecs()) {
574+
codecs.emplace_back(static_cast<ECodec>(codec));
575+
}
576+
return codecs;
577+
}
578+
579+
google::protobuf::Map<TProtoStringType, TProtoStringType> SerializeAttributes(const std::map<std::string, std::string>& attributes) {
580+
google::protobuf::Map<TProtoStringType, TProtoStringType> proto;
581+
for (const auto& [key, value] : attributes) {
582+
proto.emplace(key, value);
583+
}
584+
return proto;
585+
}
586+
587+
std::map<std::string, std::string> DeserializeAttributes(const google::protobuf::Map<TProtoStringType, TProtoStringType>& proto) {
588+
std::map<std::string, std::string> attributes;
589+
for (const auto& [key, value] : proto) {
590+
attributes.emplace(key, value);
591+
}
592+
return attributes;
593+
}
594+
595+
template <typename TSettings>
596+
google::protobuf::RepeatedPtrField<Ydb::Topic::Consumer> SerializeConsumers(const std::vector<TConsumerSettings<TSettings>>& consumers) {
597+
google::protobuf::RepeatedPtrField<Ydb::Topic::Consumer> proto;
598+
proto.Reserve(consumers.size());
599+
for (const auto& consumer : consumers) {
600+
consumer.SerializeTo(*proto.Add());
601+
}
602+
return proto;
603+
}
604+
605+
template <typename TSettings>
606+
std::vector<TConsumerSettings<TSettings>> DeserializeConsumers(TSettings& parent, const google::protobuf::RepeatedPtrField<Ydb::Topic::Consumer>& proto) {
607+
std::vector<TConsumerSettings<TSettings>> consumers;
608+
consumers.reserve(proto.size());
609+
for (const auto& consumer : proto) {
610+
consumers.emplace_back(TConsumerSettings<TSettings>(parent, consumer));
611+
}
612+
return consumers;
613+
}
614+
615+
}
616+
617+
template <typename TSettings>
618+
TConsumerSettings<TSettings>::TConsumerSettings(TSettings& parent, const Ydb::Topic::Consumer& proto)
619+
: ConsumerName_(proto.name())
620+
, Important_(proto.important())
621+
, ReadFrom_(TInstant::Seconds(proto.read_from().seconds()))
622+
, SupportedCodecs_(DeserializeCodecs(proto.supported_codecs()))
623+
, Attributes_(DeserializeAttributes(proto.attributes()))
624+
, Parent_(parent)
625+
{
626+
}
627+
628+
template <typename TSettings>
629+
void TConsumerSettings<TSettings>::SerializeTo(Ydb::Topic::Consumer& proto) const {
630+
proto.set_name(ConsumerName_);
631+
proto.set_important(Important_);
632+
proto.mutable_read_from()->set_seconds(ReadFrom_.Seconds());
633+
*proto.mutable_supported_codecs() = SerializeCodecs(SupportedCodecs_);
634+
*proto.mutable_attributes() = SerializeAttributes(Attributes_);
635+
}
636+
637+
template struct TConsumerSettings<TCreateTopicSettings>;
638+
template struct TConsumerSettings<TAlterTopicSettings>;
639+
640+
TCreateTopicSettings::TCreateTopicSettings(const Ydb::Topic::CreateTopicRequest& proto)
641+
: PartitioningSettings_(TPartitioningSettings(proto.partitioning_settings()))
642+
, RetentionPeriod_(TDuration::Seconds(proto.retention_period().seconds()))
643+
, SupportedCodecs_(DeserializeCodecs(proto.supported_codecs()))
644+
, RetentionStorageMb_(proto.retention_storage_mb())
645+
, MeteringMode_(TProtoAccessor::FromProto(proto.metering_mode()))
646+
, PartitionWriteSpeedBytesPerSecond_(proto.partition_write_speed_bytes_per_second())
647+
, PartitionWriteBurstBytes_(proto.partition_write_burst_bytes())
648+
, Attributes_(DeserializeAttributes(proto.attributes()))
649+
{
650+
Consumers_ = DeserializeConsumers(*this, proto.consumers());
651+
}
652+
653+
void TCreateTopicSettings::SerializeTo(Ydb::Topic::CreateTopicRequest& request) const {
654+
PartitioningSettings_.SerializeTo(*request.mutable_partitioning_settings());
655+
request.mutable_retention_period()->set_seconds(RetentionPeriod_.Seconds());
656+
*request.mutable_supported_codecs() = SerializeCodecs(SupportedCodecs_);
657+
request.set_retention_storage_mb(RetentionStorageMb_);
658+
request.set_metering_mode(TProtoAccessor::GetProto(MeteringMode_));
659+
request.set_partition_write_speed_bytes_per_second(PartitionWriteSpeedBytesPerSecond_);
660+
request.set_partition_write_burst_bytes(PartitionWriteBurstBytes_);
661+
*request.mutable_consumers() = SerializeConsumers(Consumers_);
662+
*request.mutable_attributes() = SerializeAttributes(Attributes_);
663+
}
664+
545665
} // namespace NYdb::NTopic

src/client/topic/impl/topic_impl.h

Lines changed: 2 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -42,20 +42,6 @@ class TTopicClient::TImpl : public TClientImplCommon<TTopicClient::TImpl> {
4242
{
4343
}
4444

45-
template <class TSettings>
46-
static void ConvertConsumerToProto(const TConsumerSettings<TSettings>& settings, Ydb::Topic::Consumer& consumerProto) {
47-
consumerProto.set_name(TStringType{settings.ConsumerName_});
48-
consumerProto.set_important(settings.Important_);
49-
consumerProto.mutable_read_from()->set_seconds(settings.ReadFrom_.Seconds());
50-
51-
for (const auto& codec : settings.SupportedCodecs_) {
52-
consumerProto.mutable_supported_codecs()->add_codecs((static_cast<Ydb::Topic::Codec>(codec)));
53-
}
54-
for (auto& pair : settings.Attributes_) {
55-
(*consumerProto.mutable_attributes())[pair.first] = pair.second;
56-
}
57-
}
58-
5945
static void ConvertAlterConsumerToProto(const TAlterConsumerSettings& settings, Ydb::Topic::AlterConsumer& consumerProto) {
6046
consumerProto.set_name(TStringType{settings.ConsumerName_});
6147
if (settings.SetImportant_)
@@ -78,34 +64,7 @@ class TTopicClient::TImpl : public TClientImplCommon<TTopicClient::TImpl> {
7864
static Ydb::Topic::CreateTopicRequest MakePropsCreateRequest(const std::string& path, const TCreateTopicSettings& settings) {
7965
Ydb::Topic::CreateTopicRequest request = MakeOperationRequest<Ydb::Topic::CreateTopicRequest>(settings);
8066
request.set_path(TStringType{path});
81-
82-
request.mutable_partitioning_settings()->set_min_active_partitions(settings.PartitioningSettings_.GetMinActivePartitions());
83-
request.mutable_partitioning_settings()->set_partition_count_limit(settings.PartitioningSettings_.GetPartitionCountLimit());
84-
request.mutable_partitioning_settings()->set_max_active_partitions(settings.PartitioningSettings_.GetMaxActivePartitions());
85-
request.mutable_partitioning_settings()->mutable_auto_partitioning_settings()->set_strategy(static_cast<Ydb::Topic::AutoPartitioningStrategy>(settings.PartitioningSettings_.GetAutoPartitioningSettings().GetStrategy()));
86-
request.mutable_partitioning_settings()->mutable_auto_partitioning_settings()->mutable_partition_write_speed()->mutable_stabilization_window()->set_seconds(settings.PartitioningSettings_.GetAutoPartitioningSettings().GetStabilizationWindow().Seconds());
87-
request.mutable_partitioning_settings()->mutable_auto_partitioning_settings()->mutable_partition_write_speed()->set_up_utilization_percent(settings.PartitioningSettings_.GetAutoPartitioningSettings().GetUpUtilizationPercent());
88-
request.mutable_partitioning_settings()->mutable_auto_partitioning_settings()->mutable_partition_write_speed()->set_down_utilization_percent(settings.PartitioningSettings_.GetAutoPartitioningSettings().GetDownUtilizationPercent());
89-
90-
request.mutable_retention_period()->set_seconds(settings.RetentionPeriod_.Seconds());
91-
92-
for (const auto& codec : settings.SupportedCodecs_) {
93-
request.mutable_supported_codecs()->add_codecs((static_cast<Ydb::Topic::Codec>(codec)));
94-
}
95-
request.set_partition_write_speed_bytes_per_second(settings.PartitionWriteSpeedBytesPerSecond_);
96-
request.set_partition_write_burst_bytes(settings.PartitionWriteBurstBytes_);
97-
request.set_retention_storage_mb(settings.RetentionStorageMb_);
98-
request.set_metering_mode(TProtoAccessor::GetProto(settings.MeteringMode_));
99-
100-
for (auto& pair : settings.Attributes_) {
101-
(*request.mutable_attributes())[pair.first] = pair.second;
102-
}
103-
104-
for (const auto& consumer : settings.Consumers_) {
105-
Ydb::Topic::Consumer& consumerProto = *request.add_consumers();
106-
ConvertConsumerToProto(consumer, consumerProto);
107-
}
108-
67+
settings.SerializeTo(request);
10968
return request;
11069
}
11170

@@ -171,8 +130,7 @@ class TTopicClient::TImpl : public TClientImplCommon<TTopicClient::TImpl> {
171130
}
172131

173132
for (const auto& consumer : settings.AddConsumers_) {
174-
Ydb::Topic::Consumer& consumerProto = *request.add_add_consumers();
175-
ConvertConsumerToProto(consumer, consumerProto);
133+
consumer.SerializeTo(*request.add_add_consumers());
176134
}
177135

178136
for (const auto& consumer : settings.DropConsumers_) {

0 commit comments

Comments
 (0)