Skip to content

Commit 8e9470d

Browse files
CyberROFLgithub-actions[bot]
authored andcommitted
New CHANGEFEED's option: SCHEMA_CHANGES (#19303)
1 parent 76b415e commit 8e9470d

File tree

4 files changed

+25
-2
lines changed

4 files changed

+25
-2
lines changed

.github/last_commit.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
a19b867b82af456bce938a0506364f0ba85214a5
1+
8bbfc1965ae5de65c96c36238b45ec33dd84f0a0

include/ydb-cpp-sdk/client/table/table.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -397,6 +397,8 @@ class TChangefeedDescription {
397397

398398
// Enable virtual timestamps
399399
TChangefeedDescription& WithVirtualTimestamps();
400+
// Enable schema changes
401+
TChangefeedDescription& WithSchemaChanges();
400402
// Enable resolved timestamps
401403
TChangefeedDescription& WithResolvedTimestamps(const TDuration& interval);
402404
// Customise retention period of underlying topic (24h by default).
@@ -415,6 +417,7 @@ class TChangefeedDescription {
415417
EChangefeedFormat GetFormat() const;
416418
EChangefeedState GetState() const;
417419
bool GetVirtualTimestamps() const;
420+
bool GetSchemaChanges() const;
418421
const std::optional<TDuration>& GetResolvedTimestamps() const;
419422
bool GetInitialScan() const;
420423
const std::unordered_map<std::string, std::string>& GetAttributes() const;
@@ -442,6 +445,7 @@ class TChangefeedDescription {
442445
EChangefeedFormat Format_;
443446
EChangefeedState State_ = EChangefeedState::Unknown;
444447
bool VirtualTimestamps_ = false;
448+
bool SchemaChanges_ = false;
445449
std::optional<TDuration> ResolvedTimestamps_;
446450
std::optional<TDuration> RetentionPeriod_;
447451
bool InitialScan_ = false;

src/api/protos/ydb_table.proto

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,8 @@ message Changefeed {
233233
google.protobuf.Duration resolved_timestamps_interval = 9;
234234
// Partitioning settings of underlying topic.
235235
Topic.PartitioningSettings topic_partitioning_settings = 10;
236+
// Emit schema change events or not
237+
bool schema_changes = 11;
236238
}
237239

238240
message ChangefeedDescription {
@@ -274,6 +276,8 @@ message ChangefeedDescription {
274276
google.protobuf.Duration resolved_timestamps_interval = 8;
275277
// Progress of initial scan. If unspecified, initial scan was not launched.
276278
InitialScanProgress initial_scan_progress = 9;
279+
// State of emitting of schema change events
280+
bool schema_changes = 10;
277281
}
278282

279283
message StoragePool {

src/client/table/table.cpp

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2687,6 +2687,11 @@ TChangefeedDescription& TChangefeedDescription::WithVirtualTimestamps() {
26872687
return *this;
26882688
}
26892689

2690+
TChangefeedDescription& TChangefeedDescription::WithSchemaChanges() {
2691+
SchemaChanges_ = true;
2692+
return *this;
2693+
}
2694+
26902695
TChangefeedDescription& TChangefeedDescription::WithResolvedTimestamps(const TDuration& value) {
26912696
ResolvedTimestamps_ = value;
26922697
return *this;
@@ -2742,6 +2747,10 @@ bool TChangefeedDescription::GetVirtualTimestamps() const {
27422747
return VirtualTimestamps_;
27432748
}
27442749

2750+
bool TChangefeedDescription::GetSchemaChanges() const {
2751+
return SchemaChanges_;
2752+
}
2753+
27452754
const std::optional<TDuration>& TChangefeedDescription::GetResolvedTimestamps() const {
27462755
return ResolvedTimestamps_;
27472756
}
@@ -2806,6 +2815,9 @@ TChangefeedDescription TChangefeedDescription::FromProto(const TProto& proto) {
28062815
if (proto.virtual_timestamps()) {
28072816
ret.WithVirtualTimestamps();
28082817
}
2818+
if (proto.schema_changes()) {
2819+
ret.WithSchemaChanges();
2820+
}
28092821
if (proto.has_resolved_timestamps_interval()) {
28102822
ret.WithResolvedTimestamps(TDuration::MilliSeconds(
28112823
::google::protobuf::util::TimeUtil::DurationToMilliseconds(proto.resolved_timestamps_interval())));
@@ -2849,6 +2861,7 @@ template <typename TProto>
28492861
void TChangefeedDescription::SerializeCommonFields(TProto& proto) const {
28502862
proto.set_name(TStringType{Name_});
28512863
proto.set_virtual_timestamps(VirtualTimestamps_);
2864+
proto.set_schema_changes(SchemaChanges_);
28522865
proto.set_aws_region(TStringType{AwsRegion_});
28532866

28542867
switch (Mode_) {
@@ -2932,7 +2945,8 @@ void TChangefeedDescription::Out(IOutputStream& o) const {
29322945
o << "{ name: \"" << Name_ << "\""
29332946
<< ", mode: " << Mode_ << ""
29342947
<< ", format: " << Format_ << ""
2935-
<< ", virtual_timestamps: " << (VirtualTimestamps_ ? "on": "off") << "";
2948+
<< ", virtual_timestamps: " << (VirtualTimestamps_ ? "on": "off") << ""
2949+
<< ", schema_changes: " << (SchemaChanges_ ? "on": "off") << "";
29362950

29372951
if (ResolvedTimestamps_) {
29382952
o << ", resolved_timestamps: " << *ResolvedTimestamps_;
@@ -2958,6 +2972,7 @@ bool operator==(const TChangefeedDescription& lhs, const TChangefeedDescription&
29582972
&& lhs.GetMode() == rhs.GetMode()
29592973
&& lhs.GetFormat() == rhs.GetFormat()
29602974
&& lhs.GetVirtualTimestamps() == rhs.GetVirtualTimestamps()
2975+
&& lhs.GetSchemaChanges() == rhs.GetSchemaChanges()
29612976
&& lhs.GetResolvedTimestamps() == rhs.GetResolvedTimestamps()
29622977
&& lhs.GetAwsRegion() == rhs.GetAwsRegion();
29632978
}

0 commit comments

Comments
 (0)