Skip to content

Commit 3cfb9bb

Browse files
authored
Consumer describe command in ydb cli (#8221)
1 parent 457afaa commit 3cfb9bb

File tree

6 files changed

+243
-49
lines changed

6 files changed

+243
-49
lines changed

ydb/public/lib/ydb_cli/commands/ydb_service_scheme.cpp

Lines changed: 127 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,8 @@
22

33
#include <ydb/public/lib/json_value/ydb_json_value.h>
44
#include <ydb/public/lib/ydb_cli/common/pretty_table.h>
5-
#include <ydb/public/lib/ydb_cli/common/print_utils.h>
65
#include <ydb/public/lib/ydb_cli/common/scheme_printers.h>
7-
#include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h>
8-
#include <google/protobuf/port_def.inc>
6+
#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h>
97

108
#include <util/string/join.h>
119

@@ -125,6 +123,90 @@ void PrintAllPermissions(
125123
PrintPermissions(effectivePermissions);
126124
}
127125

126+
int PrintPrettyDescribeConsumerResult(const NYdb::NTopic::TConsumerDescription& description, bool withPartitionsStats) {
127+
// Consumer info
128+
const NYdb::NTopic::TConsumer& consumer = description.GetConsumer();
129+
Cout << "Consumer " << consumer.GetConsumerName() << ": " << Endl;
130+
Cout << "Important: " << (consumer.GetImportant() ? "Yes" : "No") << Endl;
131+
if (const TInstant& readFrom = consumer.GetReadFrom()) {
132+
Cout << "Read from: " << readFrom.ToRfc822StringLocal() << Endl;
133+
} else {
134+
Cout << "Read from: 0" << Endl;
135+
}
136+
Cout << "Supported codecs: " << JoinSeq(", ", consumer.GetSupportedCodecs()) << Endl;
137+
138+
if (const auto& attrs = consumer.GetAttributes(); !attrs.empty()) {
139+
TPrettyTable attrTable({ "Attribute", "Value" }, TPrettyTableConfig().WithoutRowDelimiters());
140+
for (const auto& [k, v] : attrs) {
141+
attrTable.AddRow()
142+
.Column(0, k)
143+
.Column(1, v);
144+
}
145+
Cout << "Attributes:" << Endl << attrTable;
146+
}
147+
148+
// Partitions
149+
TVector<TString> columnNames = {
150+
"#",
151+
"Active",
152+
"ChildIds",
153+
"ParentIds"
154+
};
155+
156+
size_t statsBase = columnNames.size();
157+
if (withPartitionsStats) {
158+
columnNames.insert(columnNames.end(),
159+
{
160+
"Start offset",
161+
"End offset",
162+
"Size",
163+
"Last write time",
164+
"Max write time lag",
165+
"Written size per minute",
166+
"Written size per hour",
167+
"Written size per day",
168+
"Committed offset",
169+
"Last read offset",
170+
"Reader name",
171+
"Read session id"
172+
}
173+
);
174+
}
175+
176+
TPrettyTable partitionsTable(columnNames, TPrettyTableConfig().WithoutRowDelimiters());
177+
for (const NYdb::NTopic::TPartitionInfo& partition : description.GetPartitions()) {
178+
auto& row = partitionsTable.AddRow();
179+
row
180+
.Column(0, partition.GetPartitionId())
181+
.Column(1, partition.GetActive())
182+
.Column(2, JoinSeq(",", partition.GetChildPartitionIds()))
183+
.Column(3, JoinSeq(",", partition.GetParentPartitionIds()));
184+
if (withPartitionsStats) {
185+
if (const auto& maybeStats = partition.GetPartitionStats()) {
186+
row
187+
.Column(statsBase + 0, maybeStats->GetStartOffset())
188+
.Column(statsBase + 1, maybeStats->GetEndOffset())
189+
.Column(statsBase + 2, PrettySize(maybeStats->GetStoreSizeBytes()))
190+
.Column(statsBase + 3, FormatTime(maybeStats->GetLastWriteTime()))
191+
.Column(statsBase + 4, FormatDuration(maybeStats->GetMaxWriteTimeLag()))
192+
.Column(statsBase + 5, PrettySize(maybeStats->GetBytesWrittenPerMinute()))
193+
.Column(statsBase + 6, PrettySize(maybeStats->GetBytesWrittenPerHour()))
194+
.Column(statsBase + 7, PrettySize(maybeStats->GetBytesWrittenPerDay()));
195+
}
196+
197+
if (const auto& maybeStats = partition.GetPartitionConsumerStats()) {
198+
row
199+
.Column(statsBase + 8, maybeStats->GetCommittedOffset())
200+
.Column(statsBase + 9, maybeStats->GetLastReadOffset())
201+
.Column(statsBase + 10, maybeStats->GetReaderName())
202+
.Column(statsBase + 11, maybeStats->GetReadSessionId());
203+
}
204+
}
205+
}
206+
Cout << "Partitions:" << Endl << partitionsTable;
207+
return EXIT_SUCCESS;
208+
}
209+
128210
TCommandDescribe::TCommandDescribe()
129211
: TYdbOperationCommand("describe", std::initializer_list<TString>(), "Show information about object at given object")
130212
{}
@@ -138,14 +220,14 @@ void TCommandDescribe::Config(TConfig& config) {
138220
config.Opts->AddLongOption("partition-boundaries", "[Table] Show partition key boundaries").StoreTrue(&ShowKeyShardBoundaries)
139221
.AddLongName("shard-boundaries");
140222
config.Opts->AddLongOption("stats", "[Table|Topic|Replication] Show table/topic/replication statistics").StoreTrue(&ShowStats);
141-
config.Opts->AddLongOption("partition-stats", "[Table|Topic] Show partition statistics").StoreTrue(&ShowPartitionStats);
223+
config.Opts->AddLongOption("partition-stats", "[Table|Topic|Consumer] Show partition statistics").StoreTrue(&ShowPartitionStats);
142224

143225
AddDeprecatedJsonOption(config, "(Deprecated, will be removed soon. Use --format option instead) [Table] Output in json format");
144226
AddFormats(config, { EOutputFormat::Pretty, EOutputFormat::ProtoJsonBase64 });
145227
config.Opts->MutuallyExclusive("json", "format");
146228

147229
config.SetFreeArgsNum(1);
148-
SetFreeArgTitle(0, "<path>", "Path to an object to describe");
230+
SetFreeArgTitle(0, "<path>", "Path to an object to describe. If object is topic consumer, it must be specified as <topic_path>/<consumer_name>");
149231
}
150232

151233
void TCommandDescribe::Parse(TConfig& config) {
@@ -161,6 +243,9 @@ int TCommandDescribe::Run(TConfig& config) {
161243
Path,
162244
FillSettings(NScheme::TDescribePathSettings())
163245
).GetValueSync();
246+
if (!result.IsSuccess()) {
247+
return TryTopicConsumerDescribeOrFail(driver, result);
248+
}
164249
ThrowOnError(result);
165250
return PrintPathResponse(driver, result);
166251
}
@@ -294,50 +379,6 @@ int TCommandDescribe::PrintTopicResponsePretty(const NYdb::NTopic::TTopicDescrip
294379
return EXIT_SUCCESS;
295380
}
296381

297-
template <typename T>
298-
static int PrintProtoJsonBase64(const T& msg) {
299-
using namespace google::protobuf::util;
300-
301-
TString json;
302-
JsonPrintOptions opts;
303-
opts.preserve_proto_field_names = true;
304-
const auto status = MessageToJsonString(msg, &json, opts);
305-
306-
if (!status.ok()) {
307-
#if PROTOBUF_VERSION >= 4022005
308-
Cerr << "Error occurred while converting proto to json: " << status.message() << Endl;
309-
#else
310-
Cerr << "Error occurred while converting proto to json: " << status.message().ToString() << Endl;
311-
#endif
312-
return EXIT_FAILURE;
313-
}
314-
315-
Cout << json << Endl;
316-
return EXIT_SUCCESS;
317-
}
318-
319-
template <typename T>
320-
using TPrettyPrinter = int(TCommandDescribe::*)(const T&) const;
321-
322-
template <typename T>
323-
static int PrintDescription(TCommandDescribe* self, EOutputFormat format, const T& value, TPrettyPrinter<T> prettyFunc) {
324-
switch (format) {
325-
case EOutputFormat::Default:
326-
case EOutputFormat::Pretty:
327-
return std::invoke(prettyFunc, self, value);
328-
case EOutputFormat::Json:
329-
Cerr << "Warning! Option --json is deprecated and will be removed soon. "
330-
<< "Use \"--format proto-json-base64\" option instead." << Endl;
331-
[[fallthrough]];
332-
case EOutputFormat::ProtoJsonBase64:
333-
return PrintProtoJsonBase64(TProtoAccessor::GetProto(value));
334-
default:
335-
throw TMisuseException() << "This command doesn't support " << format << " output format";
336-
}
337-
338-
return EXIT_SUCCESS;
339-
}
340-
341382
int TCommandDescribe::DescribeTopic(TDriver& driver) {
342383
NYdb::NTopic::TTopicClient topicClient(driver);
343384
NYdb::NTopic::TDescribeTopicSettings settings;
@@ -861,6 +902,43 @@ int TCommandDescribe::PrintTableResponsePretty(const NTable::TTableDescription&
861902
return EXIT_SUCCESS;
862903
}
863904

905+
std::pair<TString, TString> TCommandDescribe::ParseTopicConsumer() const {
906+
const size_t slashPos = Path.find_last_of('/');
907+
std::pair<TString, TString> result;
908+
if (slashPos != TString::npos && slashPos != Path.size() - 1) {
909+
result.first = Path.substr(0, slashPos);
910+
result.second = Path.substr(slashPos + 1);
911+
}
912+
return result;
913+
}
914+
915+
int TCommandDescribe::TryTopicConsumerDescribeOrFail(TDriver& driver, const NScheme::TDescribePathResult& result) {
916+
auto [topic, consumer] = ParseTopicConsumer();
917+
if (!topic || !consumer) {
918+
ThrowOnError(result); // no consumer can be found
919+
}
920+
921+
NScheme::TSchemeClient client(driver);
922+
NScheme::TDescribePathResult topicDescribeResult = client.DescribePath(
923+
topic,
924+
FillSettings(NScheme::TDescribePathSettings())
925+
).GetValueSync();
926+
if (!topicDescribeResult.IsSuccess() || topicDescribeResult.GetEntry().Type != NScheme::ESchemeEntryType::Topic && topicDescribeResult.GetEntry().Type != NScheme::ESchemeEntryType::PqGroup) {
927+
ThrowOnError(result); // return previous error, this is not topic
928+
}
929+
930+
// OK, this is topic, check the consumer
931+
NYdb::NTopic::TTopicClient topicClient(driver);
932+
auto consumerDescription = topicClient.DescribeConsumer(topic, consumer, NYdb::NTopic::TDescribeConsumerSettings().IncludeStats(ShowPartitionStats)).GetValueSync();
933+
ThrowOnError(consumerDescription);
934+
935+
return PrintDescription(this, OutputFormat, consumerDescription.GetConsumerDescription(), &TCommandDescribe::PrintConsumerResponsePretty);
936+
}
937+
938+
int TCommandDescribe::PrintConsumerResponsePretty(const NYdb::NTopic::TConsumerDescription& description) const {
939+
return PrintPrettyDescribeConsumerResult(description, ShowPartitionStats);
940+
}
941+
864942
void TCommandDescribe::WarnAboutTableOptions() {
865943
if (ShowKeyShardBoundaries || ShowStats || ShowPartitionStats || OutputFormat != EOutputFormat::Default) {
866944
TVector<TString> options;

ydb/public/lib/ydb_cli/commands/ydb_service_scheme.h

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,20 @@
44
#include "ydb_common.h"
55

66
#include <ydb/public/lib/ydb_cli/common/format.h>
7+
#include <ydb/public/lib/ydb_cli/common/print_utils.h>
78
#include <ydb/public/lib/ydb_cli/common/recursive_remove.h>
89
#include <ydb/public/sdk/cpp/client/draft/ydb_replication.h>
910
#include <ydb/public/sdk/cpp/client/ydb_coordination/coordination.h>
11+
#include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h>
1012
#include <ydb/public/sdk/cpp/client/ydb_scheme/scheme.h>
1113
#include <ydb/public/sdk/cpp/client/ydb_table/table.h>
1214
#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h>
1315

1416
namespace NYdb {
17+
18+
namespace NTopic {
19+
struct TDescribeConsumerResult;
20+
} // namespace NTopic
1521
namespace NConsoleClient {
1622

1723
class TCommandScheme : public TClientCommandTree {
@@ -45,6 +51,31 @@ void PrintAllPermissions(
4551
const TVector<NScheme::TPermissions>& effectivePermissions
4652
);
4753

54+
// Pretty print consumer info ('scheme describe' and 'topic consumer describe' commands)
55+
int PrintPrettyDescribeConsumerResult(const NYdb::NTopic::TConsumerDescription& description, bool withPartitionsStats);
56+
57+
template <typename TCommand, typename TValue>
58+
using TPrettyPrinter = int(TCommand::*)(const TValue&) const;
59+
60+
template <typename TCommand, typename TValue>
61+
static int PrintDescription(TCommand* self, EOutputFormat format, const TValue& value, TPrettyPrinter<TCommand, TValue> prettyFunc) {
62+
switch (format) {
63+
case EOutputFormat::Default:
64+
case EOutputFormat::Pretty:
65+
return std::invoke(prettyFunc, self, value);
66+
case EOutputFormat::Json:
67+
Cerr << "Warning! Option --json is deprecated and will be removed soon. "
68+
<< "Use \"--format proto-json-base64\" option instead." << Endl;
69+
[[fallthrough]];
70+
case EOutputFormat::ProtoJsonBase64:
71+
return PrintProtoJsonBase64(TProtoAccessor::GetProto(value));
72+
default:
73+
throw TMisuseException() << "This command doesn't support " << format << " output format";
74+
}
75+
76+
return EXIT_SUCCESS;
77+
}
78+
4879
class TCommandDescribe : public TYdbOperationCommand, public TCommandWithPath, public TCommandWithFormat {
4980
public:
5081
TCommandDescribe();
@@ -69,6 +100,10 @@ class TCommandDescribe : public TYdbOperationCommand, public TCommandWithPath, p
69100
int DescribeReplication(const TDriver& driver);
70101
int PrintReplicationResponsePretty(const NYdb::NReplication::TDescribeReplicationResult& result) const;
71102

103+
int TryTopicConsumerDescribeOrFail(NYdb::TDriver& driver, const NScheme::TDescribePathResult& result);
104+
std::pair<TString, TString> ParseTopicConsumer() const;
105+
int PrintConsumerResponsePretty(const NYdb::NTopic::TConsumerDescription& description) const;
106+
72107
template<typename TDescriptionType>
73108
void PrintPermissionsIfNeeded(const TDescriptionType& description) const {
74109
if (ShowPermissions) {

ydb/public/lib/ydb_cli/commands/ydb_service_topic.cpp

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,13 @@
22

33
#include "ydb_service_topic.h"
44
#include <ydb/public/lib/ydb_cli/commands/ydb_command.h>
5+
#include <ydb/public/lib/ydb_cli/commands/ydb_service_scheme.h>
56
#include <ydb/public/lib/ydb_cli/common/command.h>
7+
#include <ydb/public/lib/ydb_cli/common/pretty_table.h>
8+
#include <ydb/public/lib/ydb_cli/common/print_utils.h>
69
#include <ydb/public/lib/ydb_cli/topic/topic_read.h>
710
#include <ydb/public/lib/ydb_cli/topic/topic_write.h>
11+
#include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h>
812

913
#include <util/generic/set.h>
1014
#include <util/stream/str.h>
@@ -488,6 +492,7 @@ namespace {
488492
: TClientCommandTree("consumer", {}, "Consumer operations") {
489493
AddCommand(std::make_unique<TCommandTopicConsumerAdd>());
490494
AddCommand(std::make_unique<TCommandTopicConsumerDrop>());
495+
AddCommand(std::make_unique<TCommandTopicConsumerDescribe>());
491496
AddCommand(std::make_unique<TCommandTopicConsumerOffset>());
492497
}
493498

@@ -592,6 +597,41 @@ namespace {
592597
return EXIT_SUCCESS;
593598
}
594599

600+
TCommandTopicConsumerDescribe::TCommandTopicConsumerDescribe()
601+
: TYdbCommand("describe", {}, "Consumer describe operation") {
602+
}
603+
604+
void TCommandTopicConsumerDescribe::Config(TConfig& config) {
605+
TYdbCommand::Config(config);
606+
config.Opts->AddLongOption("consumer", "Consumer to describe")
607+
.Required()
608+
.StoreResult(&ConsumerName_);
609+
config.Opts->AddLongOption("partition-stats", "Show partition statistics")
610+
.StoreTrue(&ShowPartitionStats_);
611+
config.Opts->SetFreeArgsNum(1);
612+
AddFormats(config, { EOutputFormat::Pretty, EOutputFormat::ProtoJsonBase64 });
613+
SetFreeArgTitle(0, "<topic-path>", "Topic path");
614+
}
615+
616+
void TCommandTopicConsumerDescribe::Parse(TConfig& config) {
617+
TYdbCommand::Parse(config);
618+
ParseFormats();
619+
ParseTopicName(config, 0);
620+
}
621+
622+
int TCommandTopicConsumerDescribe::Run(TConfig& config) {
623+
TDriver driver = CreateDriver(config);
624+
NYdb::NTopic::TTopicClient topicClient(driver);
625+
626+
auto consumerDescription = topicClient.DescribeConsumer(TopicName, ConsumerName_, NYdb::NTopic::TDescribeConsumerSettings().IncludeStats(ShowPartitionStats_)).GetValueSync();
627+
ThrowOnError(consumerDescription);
628+
629+
return PrintDescription(this, OutputFormat, consumerDescription.GetConsumerDescription(), &TCommandTopicConsumerDescribe::PrintPrettyResult);
630+
}
631+
632+
int TCommandTopicConsumerDescribe::PrintPrettyResult(const NYdb::NTopic::TConsumerDescription& description) const {
633+
return PrintPrettyDescribeConsumerResult(description, ShowPartitionStats_);
634+
}
595635

596636
TCommandTopicConsumerCommitOffset::TCommandTopicConsumerCommitOffset()
597637
: TYdbCommand("commit", {}, "Commit offset for consumer") {

ydb/public/lib/ydb_cli/commands/ydb_service_topic.h

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,21 @@ namespace NYdb::NConsoleClient {
137137
TString ConsumerName_;
138138
};
139139

140+
class TCommandTopicConsumerDescribe: public TYdbCommand, public TCommandWithFormat, public TCommandWithTopicName {
141+
public:
142+
TCommandTopicConsumerDescribe();
143+
void Config(TConfig& config) override;
144+
void Parse(TConfig& config) override;
145+
int Run(TConfig& config) override;
146+
147+
private:
148+
int PrintPrettyResult(const NYdb::NTopic::TConsumerDescription& description) const;
149+
150+
private:
151+
TString ConsumerName_;
152+
bool ShowPartitionStats_ = false;
153+
};
154+
140155
class TCommandTopicConsumerCommitOffset: public TYdbCommand, public TCommandWithTopicName {
141156
public:
142157
TCommandTopicConsumerCommitOffset();

0 commit comments

Comments
 (0)