Skip to content

Commit 7c79b50

Browse files
CyberROFLGazizonoki
authored andcommitted
Moved commit "Replication consistency levels support in C++ SDK" from ydb repo
1 parent a1a226d commit 7c79b50

File tree

3 files changed

+67
-0
lines changed

3 files changed

+67
-0
lines changed

include/ydb-cpp-sdk/client/draft/ydb_replication.h

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
namespace Ydb::Replication {
1111
class ConnectionParams;
12+
class ConsistencyLevelGlobal;
1213
class DescribeReplicationResult;
1314
class DescribeReplicationResult_Stats;
1415
}
@@ -60,6 +61,19 @@ class TConnectionParams: private TCommonClientSettings {
6061
> Credentials_;
6162
};
6263

64+
struct TRowConsistency {
65+
};
66+
67+
class TGlobalConsistency {
68+
public:
69+
explicit TGlobalConsistency(const Ydb::Replication::ConsistencyLevelGlobal& proto);
70+
71+
const TDuration& GetCommitInterval() const;
72+
73+
private:
74+
TDuration CommitInterval_;
75+
};
76+
6377
class TStats {
6478
public:
6579
TStats() = default;
@@ -108,6 +122,11 @@ class TReplicationDescription {
108122
std::optional<std::string> SrcChangefeedName;
109123
};
110124

125+
enum class EConsistencyLevel {
126+
Row,
127+
Global,
128+
};
129+
111130
enum class EState {
112131
Running,
113132
Error,
@@ -119,6 +138,9 @@ class TReplicationDescription {
119138
const TConnectionParams& GetConnectionParams() const;
120139
const std::vector<TItem> GetItems() const;
121140

141+
EConsistencyLevel GetConsistencyLevel() const;
142+
const TGlobalConsistency& GetGlobalConsistency() const;
143+
122144
EState GetState() const;
123145
const TRunningState& GetRunningState() const;
124146
const TErrorState& GetErrorState() const;
@@ -127,6 +149,12 @@ class TReplicationDescription {
127149
private:
128150
TConnectionParams ConnectionParams_;
129151
std::vector<TItem> Items_;
152+
153+
std::variant<
154+
TRowConsistency,
155+
TGlobalConsistency
156+
> ConsistencyLevel_;
157+
130158
std::variant<
131159
TRunningState,
132160
TErrorState,

src/api/protos/draft/ydb_replication.proto

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,13 @@ message ConnectionParams {
4545
}
4646
}
4747

48+
message ConsistencyLevelRow {
49+
}
50+
51+
message ConsistencyLevelGlobal {
52+
google.protobuf.Duration commit_interval = 1;
53+
}
54+
4855
message DescribeReplicationResult {
4956
message Stats {
5057
optional google.protobuf.Duration lag = 1;
@@ -72,7 +79,12 @@ message DescribeReplicationResult {
7279

7380
// Description of scheme object.
7481
Ydb.Scheme.Entry self = 1;
82+
7583
ConnectionParams connection_params = 2;
84+
oneof consistency_level {
85+
ConsistencyLevelRow row_consistency = 7;
86+
ConsistencyLevelGlobal global_consistency = 8;
87+
}
7688
repeated Item items = 3;
7789
oneof state {
7890
RunningState running = 4;

src/client/draft/ydb_replication.cpp

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,15 @@ static TDuration DurationToDuration(const google::protobuf::Duration& value) {
6868
return TDuration::MilliSeconds(google::protobuf::util::TimeUtil::DurationToMilliseconds(value));
6969
}
7070

71+
TGlobalConsistency::TGlobalConsistency(const Ydb::Replication::ConsistencyLevelGlobal& proto)
72+
: CommitInterval_(DurationToDuration(proto.commit_interval()))
73+
{
74+
}
75+
76+
const TDuration& TGlobalConsistency::GetCommitInterval() const {
77+
return CommitInterval_;
78+
}
79+
7180
TStats::TStats(const Ydb::Replication::DescribeReplicationResult_Stats& stats)
7281
: Lag_(stats.has_lag() ? std::make_optional(DurationToDuration(stats.lag())) : std::nullopt)
7382
, InitialScanProgress_(stats.has_initial_scan_progress() ? std::make_optional(stats.initial_scan_progress()) : std::nullopt)
@@ -131,6 +140,15 @@ TReplicationDescription::TReplicationDescription(const Ydb::Replication::Describ
131140
});
132141
}
133142

143+
switch (desc.consistency_level_case()) {
144+
case Ydb::Replication::DescribeReplicationResult::kGlobalConsistency:
145+
ConsistencyLevel_ = TGlobalConsistency(desc.global_consistency());
146+
break;
147+
148+
default:
149+
break;
150+
}
151+
134152
switch (desc.state_case()) {
135153
case Ydb::Replication::DescribeReplicationResult::kRunning:
136154
State_ = TRunningState(desc.running().stats());
@@ -157,6 +175,15 @@ const std::vector<TReplicationDescription::TItem> TReplicationDescription::GetIt
157175
return Items_;
158176
}
159177

178+
TReplicationDescription::EConsistencyLevel TReplicationDescription::GetConsistencyLevel() const {
179+
return static_cast<EConsistencyLevel>(ConsistencyLevel_.index());
180+
}
181+
182+
const TGlobalConsistency& TReplicationDescription::GetGlobalConsistency() const {
183+
return std::get<TGlobalConsistency>(ConsistencyLevel_);
184+
}
185+
186+
160187
TReplicationDescription::EState TReplicationDescription::GetState() const {
161188
return static_cast<EState>(State_.index());
162189
}

0 commit comments

Comments
 (0)