Skip to content

Commit 852697f

Browse files
Refactoring FillTopicDescription & FillChangefeedDescription (#13077)
1 parent 6a230d5 commit 852697f

File tree

7 files changed

+279
-203
lines changed

7 files changed

+279
-203
lines changed

ydb/core/ydb_convert/table_description.cpp

Lines changed: 55 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -1136,64 +1136,67 @@ void FillAttributesImpl(TOutProto& out, const TInProto& in) {
11361136
outAttrs[inAttr.GetKey()] = inAttr.GetValue();
11371137
}
11381138
}
1139+
void FillChangefeedDescription(Ydb::Table::ChangefeedDescription& out,
1140+
const NKikimrSchemeOp::TCdcStreamDescription& in) {
1141+
1142+
out.set_name(in.GetName());
1143+
out.set_virtual_timestamps(in.GetVirtualTimestamps());
1144+
out.set_aws_region(in.GetAwsRegion());
1145+
1146+
if (const auto value = in.GetResolvedTimestampsIntervalMs()) {
1147+
out.mutable_resolved_timestamps_interval()->set_seconds(TDuration::MilliSeconds(value).Seconds());
1148+
}
1149+
1150+
switch (in.GetMode()) {
1151+
case NKikimrSchemeOp::ECdcStreamMode::ECdcStreamModeKeysOnly:
1152+
case NKikimrSchemeOp::ECdcStreamMode::ECdcStreamModeUpdate:
1153+
case NKikimrSchemeOp::ECdcStreamMode::ECdcStreamModeNewImage:
1154+
case NKikimrSchemeOp::ECdcStreamMode::ECdcStreamModeOldImage:
1155+
case NKikimrSchemeOp::ECdcStreamMode::ECdcStreamModeNewAndOldImages:
1156+
out.set_mode(static_cast<Ydb::Table::ChangefeedMode::Mode>(in.GetMode()));
1157+
break;
1158+
default:
1159+
break;
1160+
}
11391161

1140-
void FillChangefeedDescription(Ydb::Table::DescribeTableResult& out,
1141-
const NKikimrSchemeOp::TTableDescription& in) {
1142-
1143-
for (const auto& stream : in.GetCdcStreams()) {
1144-
auto changefeed = out.add_changefeeds();
1145-
1146-
changefeed->set_name(stream.GetName());
1147-
changefeed->set_virtual_timestamps(stream.GetVirtualTimestamps());
1148-
changefeed->set_aws_region(stream.GetAwsRegion());
1149-
1150-
if (const auto value = stream.GetResolvedTimestampsIntervalMs()) {
1151-
changefeed->mutable_resolved_timestamps_interval()->set_seconds(TDuration::MilliSeconds(value).Seconds());
1152-
}
1162+
switch (in.GetFormat()) {
1163+
case NKikimrSchemeOp::ECdcStreamFormat::ECdcStreamFormatJson:
1164+
out.set_format(Ydb::Table::ChangefeedFormat::FORMAT_JSON);
1165+
break;
1166+
case NKikimrSchemeOp::ECdcStreamFormat::ECdcStreamFormatDynamoDBStreamsJson:
1167+
out.set_format(Ydb::Table::ChangefeedFormat::FORMAT_DYNAMODB_STREAMS_JSON);
1168+
break;
1169+
case NKikimrSchemeOp::ECdcStreamFormat::ECdcStreamFormatDebeziumJson:
1170+
out.set_format(Ydb::Table::ChangefeedFormat::FORMAT_DEBEZIUM_JSON);
1171+
break;
1172+
default:
1173+
break;
1174+
}
11531175

1154-
switch (stream.GetMode()) {
1155-
case NKikimrSchemeOp::ECdcStreamMode::ECdcStreamModeKeysOnly:
1156-
case NKikimrSchemeOp::ECdcStreamMode::ECdcStreamModeUpdate:
1157-
case NKikimrSchemeOp::ECdcStreamMode::ECdcStreamModeNewImage:
1158-
case NKikimrSchemeOp::ECdcStreamMode::ECdcStreamModeOldImage:
1159-
case NKikimrSchemeOp::ECdcStreamMode::ECdcStreamModeNewAndOldImages:
1160-
changefeed->set_mode(static_cast<Ydb::Table::ChangefeedMode::Mode>(stream.GetMode()));
1161-
break;
1162-
default:
1163-
break;
1164-
}
1176+
switch (in.GetState()) {
1177+
case NKikimrSchemeOp::ECdcStreamState::ECdcStreamStateReady:
1178+
case NKikimrSchemeOp::ECdcStreamState::ECdcStreamStateDisabled:
1179+
case NKikimrSchemeOp::ECdcStreamState::ECdcStreamStateScan:
1180+
out.set_state(static_cast<Ydb::Table::ChangefeedDescription::State>(in.GetState()));
1181+
break;
1182+
default:
1183+
break;
1184+
}
11651185

1166-
switch (stream.GetFormat()) {
1167-
case NKikimrSchemeOp::ECdcStreamFormat::ECdcStreamFormatJson:
1168-
changefeed->set_format(Ydb::Table::ChangefeedFormat::FORMAT_JSON);
1169-
break;
1170-
case NKikimrSchemeOp::ECdcStreamFormat::ECdcStreamFormatDynamoDBStreamsJson:
1171-
changefeed->set_format(Ydb::Table::ChangefeedFormat::FORMAT_DYNAMODB_STREAMS_JSON);
1172-
break;
1173-
case NKikimrSchemeOp::ECdcStreamFormat::ECdcStreamFormatDebeziumJson:
1174-
changefeed->set_format(Ydb::Table::ChangefeedFormat::FORMAT_DEBEZIUM_JSON);
1175-
break;
1176-
default:
1177-
break;
1178-
}
1186+
if (in.HasScanProgress()) {
1187+
auto& scanProgress = *out.mutable_initial_scan_progress();
1188+
scanProgress.set_parts_total(in.GetScanProgress().GetShardsTotal());
1189+
scanProgress.set_parts_completed(in.GetScanProgress().GetShardsCompleted());
1190+
}
11791191

1180-
switch (stream.GetState()) {
1181-
case NKikimrSchemeOp::ECdcStreamState::ECdcStreamStateReady:
1182-
case NKikimrSchemeOp::ECdcStreamState::ECdcStreamStateDisabled:
1183-
case NKikimrSchemeOp::ECdcStreamState::ECdcStreamStateScan:
1184-
changefeed->set_state(static_cast<Ydb::Table::ChangefeedDescription::State>(stream.GetState()));
1185-
break;
1186-
default:
1187-
break;
1188-
}
1192+
FillAttributesImpl(out, in);
11891193

1190-
if (stream.HasScanProgress()) {
1191-
auto& scanProgress = *changefeed->mutable_initial_scan_progress();
1192-
scanProgress.set_parts_total(stream.GetScanProgress().GetShardsTotal());
1193-
scanProgress.set_parts_completed(stream.GetScanProgress().GetShardsCompleted());
1194-
}
1194+
}
1195+
void FillChangefeedDescription(Ydb::Table::DescribeTableResult& out,
1196+
const NKikimrSchemeOp::TTableDescription& in) {
11951197

1196-
FillAttributesImpl(*changefeed, stream);
1198+
for (const auto& stream : in.GetCdcStreams()) {
1199+
FillChangefeedDescription(*out.add_changefeeds(), stream);
11971200
}
11981201
}
11991202

ydb/core/ydb_convert/table_description.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,8 @@ bool FillIndexDescription(NKikimrSchemeOp::TIndexedTableCreationConfig& out,
8282
const Ydb::Table::CreateTableRequest& in, Ydb::StatusIds::StatusCode& status, TString& error);
8383

8484
// out
85+
void FillChangefeedDescription(Ydb::Table::ChangefeedDescription& out,
86+
const NKikimrSchemeOp::TCdcStreamDescription& in);
8587
void FillChangefeedDescription(Ydb::Table::DescribeTableResult& out,
8688
const NKikimrSchemeOp::TTableDescription& in);
8789
// in
Lines changed: 171 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,171 @@
1+
#include "topic_description.h"
2+
#include "ydb_convert.h"
3+
4+
#include <ydb/core/base/appdata_fwd.h>
5+
#include <ydb/core/base/feature_flags.h>
6+
#include <ydb/core/persqueue/utils.h>
7+
#include <ydb/core/protos/feature_flags.pb.h>
8+
#include <ydb/core/protos/pqconfig.pb.h>
9+
#include <ydb/library/persqueue/topic_parser/topic_parser.h>
10+
11+
namespace NKikimr {
12+
13+
bool FillConsumer(Ydb::Topic::Consumer& out, const NKikimrPQ::TPQTabletConfig_TConsumer& in,
14+
Ydb::StatusIds_StatusCode& status, TString& error)
15+
{
16+
const NKikimrPQ::TPQConfig pqConfig = AppData()->PQConfig;
17+
auto consumerName = NPersQueue::ConvertOldConsumerName(in.GetName(), pqConfig);
18+
out.set_name(consumerName);
19+
out.mutable_read_from()->set_seconds(in.GetReadFromTimestampsMs() / 1000);
20+
auto version = in.GetVersion();
21+
if (version != 0)
22+
(*out.mutable_attributes())["_version"] = TStringBuilder() << version;
23+
for (const auto &codec : in.GetCodec().GetIds()) {
24+
out.mutable_supported_codecs()->add_codecs((Ydb::Topic::Codec) (codec + 1));
25+
}
26+
27+
out.set_important(in.GetImportant());
28+
TString serviceType = "";
29+
if (in.HasServiceType()) {
30+
serviceType = in.GetServiceType();
31+
} else {
32+
if (pqConfig.GetDisallowDefaultClientServiceType()) {
33+
error = "service type must be set for all read rules";
34+
status = Ydb::StatusIds::INTERNAL_ERROR;
35+
return false;
36+
}
37+
serviceType = pqConfig.GetDefaultClientServiceType().GetName();
38+
}
39+
(*out.mutable_attributes())["_service_type"] = serviceType;
40+
return true;
41+
}
42+
43+
bool FillTopicDescription(Ydb::Topic::DescribeTopicResult& out, const NKikimrSchemeOp::TPersQueueGroupDescription& inDesc,
44+
const NKikimrSchemeOp::TDirEntry& inDirEntry, const TMaybe<TString>& cdcName,
45+
Ydb::StatusIds_StatusCode& status, TString& error) {
46+
47+
const NKikimrPQ::TPQConfig pqConfig = AppData()->PQConfig;
48+
49+
Ydb::Scheme::Entry *selfEntry = out.mutable_self();
50+
ConvertDirectoryEntry(inDirEntry, selfEntry, true);
51+
if (cdcName) {
52+
selfEntry->set_name(*cdcName);
53+
}
54+
55+
for (auto& sourcePart: inDesc.GetPartitions()) {
56+
auto destPart = out.add_partitions();
57+
destPart->set_partition_id(sourcePart.GetPartitionId());
58+
destPart->set_active(sourcePart.GetStatus() == ::NKikimrPQ::ETopicPartitionStatus::Active);
59+
if (sourcePart.HasKeyRange()) {
60+
if (sourcePart.GetKeyRange().HasFromBound()) {
61+
destPart->mutable_key_range()->set_from_bound(sourcePart.GetKeyRange().GetFromBound());
62+
}
63+
if (sourcePart.GetKeyRange().HasToBound()) {
64+
destPart->mutable_key_range()->set_to_bound(sourcePart.GetKeyRange().GetToBound());
65+
}
66+
}
67+
68+
for (size_t i = 0; i < sourcePart.ChildPartitionIdsSize(); ++i) {
69+
destPart->add_child_partition_ids(static_cast<int64_t>(sourcePart.GetChildPartitionIds(i)));
70+
}
71+
72+
for (size_t i = 0; i < sourcePart.ParentPartitionIdsSize(); ++i) {
73+
destPart->add_parent_partition_ids(static_cast<int64_t>(sourcePart.GetParentPartitionIds(i)));
74+
}
75+
}
76+
77+
const auto &config = inDesc.GetPQTabletConfig();
78+
if (AppData()->FeatureFlags.GetEnableTopicSplitMerge() && NPQ::SplitMergeEnabled(config)) {
79+
out.mutable_partitioning_settings()->set_min_active_partitions(config.GetPartitionStrategy().GetMinPartitionCount());
80+
} else {
81+
out.mutable_partitioning_settings()->set_min_active_partitions(inDesc.GetTotalGroupCount());
82+
}
83+
84+
out.mutable_partitioning_settings()->set_max_active_partitions(config.GetPartitionStrategy().GetMaxPartitionCount());
85+
switch(config.GetPartitionStrategy().GetPartitionStrategyType()) {
86+
case ::NKikimrPQ::TPQTabletConfig_TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_CAN_SPLIT:
87+
out.mutable_partitioning_settings()->mutable_auto_partitioning_settings()->set_strategy(Ydb::Topic::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_SCALE_UP);
88+
break;
89+
case ::NKikimrPQ::TPQTabletConfig_TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_CAN_SPLIT_AND_MERGE:
90+
out.mutable_partitioning_settings()->mutable_auto_partitioning_settings()->set_strategy(Ydb::Topic::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_SCALE_UP_AND_DOWN);
91+
break;
92+
case ::NKikimrPQ::TPQTabletConfig_TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_PAUSED:
93+
out.mutable_partitioning_settings()->mutable_auto_partitioning_settings()->set_strategy(Ydb::Topic::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_PAUSED);
94+
break;
95+
default:
96+
out.mutable_partitioning_settings()->mutable_auto_partitioning_settings()->set_strategy(Ydb::Topic::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_DISABLED);
97+
break;
98+
}
99+
out.mutable_partitioning_settings()->mutable_auto_partitioning_settings()->mutable_partition_write_speed()->mutable_stabilization_window()->set_seconds(config.GetPartitionStrategy().GetScaleThresholdSeconds());
100+
out.mutable_partitioning_settings()->mutable_auto_partitioning_settings()->mutable_partition_write_speed()->set_down_utilization_percent(config.GetPartitionStrategy().GetScaleDownPartitionWriteSpeedThresholdPercent());
101+
out.mutable_partitioning_settings()->mutable_auto_partitioning_settings()->mutable_partition_write_speed()->set_up_utilization_percent(config.GetPartitionStrategy().GetScaleUpPartitionWriteSpeedThresholdPercent());
102+
103+
if (!config.GetRequireAuthWrite()) {
104+
(*out.mutable_attributes())["_allow_unauthenticated_write"] = "true";
105+
}
106+
107+
if (!config.GetRequireAuthRead()) {
108+
(*out.mutable_attributes())["_allow_unauthenticated_read"] = "true";
109+
}
110+
111+
if (inDesc.GetPartitionPerTablet() != 2) {
112+
(*out.mutable_attributes())["_partitions_per_tablet"] =
113+
TStringBuilder() << inDesc.GetPartitionPerTablet();
114+
}
115+
if (config.HasAbcId()) {
116+
(*out.mutable_attributes())["_abc_id"] = TStringBuilder() << config.GetAbcId();
117+
}
118+
if (config.HasAbcSlug()) {
119+
(*out.mutable_attributes())["_abc_slug"] = config.GetAbcSlug();
120+
}
121+
if (config.HasFederationAccount()) {
122+
(*out.mutable_attributes())["_federation_account"] = config.GetFederationAccount();
123+
}
124+
bool local = config.GetLocalDC();
125+
const auto &partConfig = config.GetPartitionConfig();
126+
i64 msip = partConfig.GetMaxSizeInPartition();
127+
if (partConfig.HasMaxSizeInPartition() && msip != Max<i64>()) {
128+
(*out.mutable_attributes())["_max_partition_storage_size"] = TStringBuilder() << msip;
129+
}
130+
out.mutable_retention_period()->set_seconds(partConfig.GetLifetimeSeconds());
131+
out.set_retention_storage_mb(partConfig.GetStorageLimitBytes() / 1024 / 1024);
132+
(*out.mutable_attributes())["_message_group_seqno_retention_period_ms"] = TStringBuilder() << (partConfig.GetSourceIdLifetimeSeconds() * 1000);
133+
(*out.mutable_attributes())["__max_partition_message_groups_seqno_stored"] = TStringBuilder() << partConfig.GetSourceIdMaxCounts();
134+
135+
if (local || pqConfig.GetTopicsAreFirstClassCitizen()) {
136+
out.set_partition_write_speed_bytes_per_second(partConfig.GetWriteSpeedInBytesPerSecond());
137+
out.set_partition_write_burst_bytes(partConfig.GetBurstSize());
138+
}
139+
140+
if (pqConfig.GetQuotingConfig().GetPartitionReadQuotaIsTwiceWriteQuota()) {
141+
auto readSpeedPerConsumer = partConfig.GetWriteSpeedInBytesPerSecond() * 2;
142+
out.set_partition_total_read_speed_bytes_per_second(readSpeedPerConsumer * pqConfig.GetQuotingConfig().GetMaxParallelConsumersPerPartition());
143+
out.set_partition_consumer_read_speed_bytes_per_second(readSpeedPerConsumer);
144+
}
145+
146+
for (const auto &codec : config.GetCodecs().GetIds()) {
147+
out.mutable_supported_codecs()->add_codecs((Ydb::Topic::Codec)(codec + 1));
148+
}
149+
150+
if (pqConfig.GetBillingMeteringConfig().GetEnabled()) {
151+
switch (config.GetMeteringMode()) {
152+
case NKikimrPQ::TPQTabletConfig::METERING_MODE_RESERVED_CAPACITY:
153+
out.set_metering_mode(Ydb::Topic::METERING_MODE_RESERVED_CAPACITY);
154+
break;
155+
case NKikimrPQ::TPQTabletConfig::METERING_MODE_REQUEST_UNITS:
156+
out.set_metering_mode(Ydb::Topic::METERING_MODE_REQUEST_UNITS);
157+
break;
158+
default:
159+
break;
160+
}
161+
}
162+
163+
for (const auto& consumer : config.GetConsumers()) {
164+
if (!FillConsumer(*out.add_consumers(), consumer, status, error)) {
165+
return false;
166+
}
167+
}
168+
return true;
169+
}
170+
171+
} // namespace NKikimr
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
#pragma once
2+
3+
#include <util/generic/fwd.h>
4+
5+
namespace Ydb {
6+
namespace Topic {
7+
class Consumer;
8+
class DescribeTopicResult;
9+
}
10+
class StatusIds;
11+
enum StatusIds_StatusCode : int;
12+
}
13+
14+
namespace NKikimrSchemeOp {
15+
class TPersQueueGroupDescription;
16+
class TDirEntry;
17+
}
18+
19+
namespace NYql {
20+
class TIssue;
21+
}
22+
23+
namespace NKikimrPQ {
24+
class TPQTabletConfig_TConsumer;
25+
class TPQConfig;
26+
}
27+
28+
namespace NKikimr {
29+
30+
bool FillConsumer(Ydb::Topic::Consumer& out, const NKikimrPQ::TPQTabletConfig_TConsumer& in, Ydb::StatusIds_StatusCode& status, TString& error);
31+
bool FillTopicDescription(Ydb::Topic::DescribeTopicResult& out, const NKikimrSchemeOp::TPersQueueGroupDescription& inDesc,
32+
const NKikimrSchemeOp::TDirEntry& inDirEntry, const TMaybe<TString>& cdcName,
33+
Ydb::StatusIds_StatusCode& status, TString& error);
34+
35+
} // namespace NKikimr

ydb/core/ydb_convert/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ SRCS(
66
table_settings.cpp
77
table_description.cpp
88
table_profiles.cpp
9+
topic_description.cpp
910
ydb_convert.cpp
1011
tx_proxy_status.cpp
1112
)

0 commit comments

Comments
 (0)