Skip to content

Commit 37b6f07

Browse files
authored
Timestamp topic options: parse in different formats, describe which timestamp is in option (#11387)
1 parent f4a2a01 commit 37b6f07

File tree

2 files changed

+54
-29
lines changed

2 files changed

+54
-29
lines changed

ydb/public/lib/ydb_cli/commands/ydb_service_topic.cpp

Lines changed: 51 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,13 @@
1212

1313
#include <util/generic/set.h>
1414
#include <util/stream/str.h>
15+
#include <util/string/cast.h>
1516
#include <util/string/hex.h>
1617
#include <util/string/vector.h>
1718
#include <util/string/join.h>
1819

20+
#define TIMESTAMP_FORMAT_OPTION_DESCRIPTION "Timestamp may be specified in unix time format (seconds from 1970.01.01) or in ISO-8601 format (like 2020-07-10T15:00:00Z)"
21+
1922
namespace NYdb::NConsoleClient {
2023
namespace {
2124
THashMap<NYdb::NTopic::ECodec, TString> CodecsDescriptions = {
@@ -98,36 +101,55 @@ namespace NYdb::NConsoleClient {
98101
}
99102
} // namespace
100103

104+
std::function<void(const TString& opt)> TimestampOptionHandler(TMaybe<TInstant>* destination) {
105+
return [destination](const TString& opt) {
106+
ui64 seconds = 0;
107+
if (TryFromString(opt, seconds)) { // unix time
108+
destination->ConstructInPlace(TInstant::Seconds(seconds));
109+
return;
110+
}
101111

102-
TString PrepareAllowedCodecsDescription(const TString& descriptionPrefix, const TVector<NTopic::ECodec>& codecs) {
103-
TStringStream description;
104-
description << descriptionPrefix << ". Available codecs: ";
105-
NColorizer::TColors colors = NColorizer::AutoColors(Cout);
106-
for (const auto& codec : codecs) {
107-
auto findResult = CodecsDescriptions.find(codec);
108-
Y_ABORT_UNLESS(findResult != CodecsDescriptions.end(),
109-
"Couldn't find description for %s codec", (TStringBuilder() << codec).c_str());
110-
description << "\n " << colors.BoldColor() << codec << colors.OldColor()
111-
<< "\n " << findResult->second;
112+
TInstant time;
113+
if (TInstant::TryParseIso8601(opt, time)) {
114+
destination->ConstructInPlace(time);
115+
return;
112116
}
113117

114-
return description.Str();
118+
TStringBuilder err;
119+
err << "failed to parse \"" << opt << "\" as a timestamp. It must be either unix time format or ISO-8601 format";
120+
throw std::runtime_error(err);
121+
};
122+
}
123+
124+
TString PrepareAllowedCodecsDescription(const TString& descriptionPrefix, const TVector<NTopic::ECodec>& codecs) {
125+
TStringStream description;
126+
description << descriptionPrefix << ". Available codecs: ";
127+
NColorizer::TColors colors = NColorizer::AutoColors(Cout);
128+
for (const auto& codec : codecs) {
129+
auto findResult = CodecsDescriptions.find(codec);
130+
Y_ABORT_UNLESS(findResult != CodecsDescriptions.end(),
131+
"Couldn't find description for %s codec", (TStringBuilder() << codec).c_str());
132+
description << "\n " << colors.BoldColor() << codec << colors.OldColor()
133+
<< "\n " << findResult->second;
115134
}
116135

117-
namespace {
118-
NTopic::ECodec ParseCodec(const TString& codecStr, const TVector<NTopic::ECodec>& allowedCodecs) {
119-
auto exists = ExistingCodecs.find(to_lower(codecStr));
120-
if (exists == ExistingCodecs.end()) {
121-
throw TMisuseException() << "Codec " << codecStr << " is not available for this command";
122-
}
136+
return description.Str();
137+
}
123138

124-
if (std::find(allowedCodecs.begin(), allowedCodecs.end(), exists->second) == allowedCodecs.end()) {
125-
throw TMisuseException() << "Codec " << codecStr << " is not available for this command";
126-
}
139+
namespace {
140+
NTopic::ECodec ParseCodec(const TString& codecStr, const TVector<NTopic::ECodec>& allowedCodecs) {
141+
auto exists = ExistingCodecs.find(to_lower(codecStr));
142+
if (exists == ExistingCodecs.end()) {
143+
throw TMisuseException() << "Codec " << codecStr << " is not available for this command";
144+
}
127145

128-
return exists->second;
146+
if (std::find(allowedCodecs.begin(), allowedCodecs.end(), exists->second) == allowedCodecs.end()) {
147+
throw TMisuseException() << "Codec " << codecStr << " is not available for this command";
129148
}
149+
150+
return exists->second;
130151
}
152+
}
131153

132154
void TCommandWithSupportedCodecs::AddAllowedCodecs(TClientCommand::TConfig& config, const TVector<NYdb::NTopic::ECodec>& supportedCodecs) {
133155
TString description = PrepareAllowedCodecsDescription("Comma-separated list of supported codecs", supportedCodecs);
@@ -524,9 +546,10 @@ namespace {
524546
config.Opts->AddLongOption("consumer", "New consumer for topic")
525547
.Required()
526548
.StoreResult(&ConsumerName_);
527-
config.Opts->AddLongOption("starting-message-timestamp", "Unix timestamp starting from '1970-01-01 00:00:00' from which read is allowed")
549+
config.Opts->AddLongOption("starting-message-timestamp", "'Written_at' timestamp from which read is allowed. " TIMESTAMP_FORMAT_OPTION_DESCRIPTION)
550+
.RequiredArgument("TIMESTAMP")
528551
.Optional()
529-
.StoreResult(&StartingMessageTimestamp_);
552+
.Handler1T<TString>(TimestampOptionHandler(&StartingMessageTimestamp_));
530553
config.Opts->AddLongOption("important", "Is consumer important")
531554
.Optional()
532555
.DefaultValue(false)
@@ -553,7 +576,7 @@ namespace {
553576
NYdb::NTopic::TConsumerSettings<NYdb::NTopic::TAlterTopicSettings> consumerSettings(readRuleSettings);
554577
consumerSettings.ConsumerName(ConsumerName_);
555578
if (StartingMessageTimestamp_.Defined()) {
556-
consumerSettings.ReadFrom(TInstant::Seconds(*StartingMessageTimestamp_));
579+
consumerSettings.ReadFrom(*StartingMessageTimestamp_);
557580
}
558581

559582
auto codecs = GetCodecs();
@@ -780,9 +803,10 @@ namespace {
780803
.Optional()
781804
.NoArgument()
782805
.StoreValue(&Wait_, true);
783-
config.Opts->AddLongOption("timestamp", "Timestamp from which messages will be read. If not specified, messages are read from the last commit point for the chosen consumer.")
806+
config.Opts->AddLongOption("timestamp", "'Written_at' timestamp from which messages will be read. If not specified, messages are read from the last commit point for the chosen consumer. " TIMESTAMP_FORMAT_OPTION_DESCRIPTION)
807+
.RequiredArgument("TIMESTAMP")
784808
.Optional()
785-
.StoreResult(&Timestamp_);
809+
.Handler1T<TString>(TimestampOptionHandler(&Timestamp_));
786810
config.Opts->AddLongOption("partition-ids", "Comma separated list of partition ids to read from. If not specified, messages are read from all partitions.")
787811
.Optional()
788812
.SplitHandler(&PartitionIds_, ',');
@@ -841,7 +865,7 @@ namespace {
841865
settings.ConsumerName(Consumer_);
842866
// settings.ReadAll(); // TODO(shmel1k@): change to read only original?
843867
if (Timestamp_.Defined()) {
844-
settings.ReadFromTimestamp(TInstant::Seconds(*(Timestamp_.Get())));
868+
settings.ReadFromTimestamp(*Timestamp_);
845869
}
846870

847871
// TODO(shmel1k@): partition can be added here.

ydb/public/lib/ydb_cli/commands/ydb_service_topic.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ namespace NYdb::NConsoleClient {
1313
TString PrepareAllowedCodecsDescription(const TString& descriptionPrefix, const TVector<NTopic::ECodec>& codecs);
1414
TVector<NTopic::ECodec> InitAllowedCodecs();
1515
const TVector<NTopic::ECodec> AllowedCodecs = InitAllowedCodecs();
16+
std::function<void(const TString& opt)> TimestampOptionHandler(TMaybe<TInstant>* destination); // parses timestamp in the following formats: unix time, ISO-8601
1617

1718
class TCommandWithSupportedCodecs {
1819
protected:
@@ -123,7 +124,7 @@ namespace NYdb::NConsoleClient {
123124
private:
124125
TString ConsumerName_;
125126
bool IsImportant_;
126-
TMaybe<ui64> StartingMessageTimestamp_;
127+
TMaybe<TInstant> StartingMessageTimestamp_;
127128
};
128129

129130
class TCommandTopicConsumerDrop: public TYdbCommand, public TCommandWithTopicName {
@@ -193,7 +194,7 @@ namespace NYdb::NConsoleClient {
193194
TVector<ui64> PartitionIds_;
194195
TMaybe<uint32_t> Offset_;
195196
TMaybe<uint32_t> Partition_;
196-
TMaybe<ui64> Timestamp_;
197+
TMaybe<TInstant> Timestamp_;
197198
TMaybe<TString> File_;
198199
TMaybe<TString> TransformStr_;
199200

0 commit comments

Comments
 (0)