Skip to content

Commit c65fe0f

Browse files
committed
Moved commit "Replication stats (total & per item): lag, initial scan progress" from ydb repo
1 parent 6984ae6 commit c65fe0f

File tree

4 files changed

+61
-13
lines changed

4 files changed

+61
-13
lines changed

include/ydb-cpp-sdk/client/draft/ydb_replication.h

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
namespace Ydb::Replication {
1111
class ConnectionParams;
1212
class DescribeReplicationResult;
13+
class DescribeReplicationResult_Stats;
1314
}
1415

1516
namespace NYdb {
@@ -24,7 +25,11 @@ namespace NYdb::NReplication {
2425

2526
class TDescribeReplicationResult;
2627
using TAsyncDescribeReplicationResult = NThreading::TFuture<TDescribeReplicationResult>;
27-
struct TDescribeReplicationSettings: public TOperationRequestSettings<TDescribeReplicationSettings> {};
28+
29+
struct TDescribeReplicationSettings: public TOperationRequestSettings<TDescribeReplicationSettings> {
30+
using TSelf = TDescribeReplicationSettings;
31+
FLUENT_SETTING_DEFAULT(bool, IncludeStats, false);
32+
};
2833

2934
struct TStaticCredentials {
3035
std::string User;
@@ -58,15 +63,28 @@ class TConnectionParams: private TCommonClientSettings {
5863
> Credentials_;
5964
};
6065

61-
struct TRunningState {
66+
class TStats {
6267
public:
63-
TRunningState() = default;
64-
explicit TRunningState(const std::optional<TDuration>& lag);
68+
TStats() = default;
69+
TStats(const Ydb::Replication::DescribeReplicationResult_Stats& stats);
6570

6671
const std::optional<TDuration>& GetLag() const;
72+
const std::optional<float>& GetInitialScanProgress() const;
6773

6874
private:
6975
std::optional<TDuration> Lag_;
76+
std::optional<float> InitialScanProgress_;
77+
};
78+
79+
class TRunningState {
80+
public:
81+
TRunningState() = default;
82+
explicit TRunningState(const TStats& stats);
83+
84+
const TStats& GetStats() const;
85+
86+
private:
87+
TStats Stats_;
7088
};
7189

7290
struct TDoneState {};
@@ -89,6 +107,7 @@ class TReplicationDescription {
89107
uint64_t Id;
90108
std::string SrcPath;
91109
std::string DstPath;
110+
TStats Stats;
92111
std::optional<std::string> SrcChangefeedName;
93112
};
94113

include/ydb-cpp-sdk/client/table/table.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -271,8 +271,11 @@ class TChangefeedDescription {
271271
public:
272272
class TInitialScanProgress {
273273
public:
274+
TInitialScanProgress();
274275
explicit TInitialScanProgress(uint32_t total, uint32_t completed);
275276

277+
TInitialScanProgress& operator+=(const TInitialScanProgress& other);
278+
276279
uint32_t GetPartsTotal() const;
277280
uint32_t GetPartsCompleted() const;
278281
float GetProgress() const; // percentage

src/client/draft/ydb_replication.cpp

Lines changed: 24 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -59,15 +59,33 @@ const TOAuthCredentials& TConnectionParams::GetOAuthCredentials() const {
5959
return std::get<TOAuthCredentials>(Credentials_);
6060
}
6161

62-
TRunningState::TRunningState(const std::optional<TDuration>& lag)
63-
: Lag_(lag)
62+
static TDuration DurationToDuration(const google::protobuf::Duration& value) {
63+
return TDuration::MilliSeconds(google::protobuf::util::TimeUtil::DurationToMilliseconds(value));
64+
}
65+
66+
TStats::TStats(const Ydb::Replication::DescribeReplicationResult_Stats& stats)
67+
: Lag_(stats.has_lag() ? std::make_optional(DurationToDuration(stats.lag())) : std::nullopt)
68+
, InitialScanProgress_(stats.has_initial_scan_progress() ? std::make_optional(stats.initial_scan_progress()) : std::nullopt)
6469
{
6570
}
6671

67-
const std::optional<TDuration>& TRunningState::GetLag() const {
72+
const std::optional<TDuration>& TStats::GetLag() const {
6873
return Lag_;
6974
}
7075

76+
const std::optional<float>& TStats::GetInitialScanProgress() const {
77+
return InitialScanProgress_;
78+
}
79+
80+
TRunningState::TRunningState(const TStats& stats)
81+
: Stats_(stats)
82+
{
83+
}
84+
85+
const TStats& TRunningState::GetStats() const {
86+
return Stats_;
87+
}
88+
7189
class TErrorState::TImpl {
7290
public:
7391
NYql::TIssues Issues;
@@ -87,10 +105,6 @@ const NYql::TIssues& TErrorState::GetIssues() const {
87105
return Impl_->Issues;
88106
}
89107

90-
TDuration DurationToDuration(const google::protobuf::Duration& value) {
91-
return TDuration::MilliSeconds(google::protobuf::util::TimeUtil::DurationToMilliseconds(value));
92-
}
93-
94108
template <typename T>
95109
NYql::TIssues IssuesFromMessage(const ::google::protobuf::RepeatedPtrField<T>& message) {
96110
NYql::TIssues issues;
@@ -107,15 +121,15 @@ TReplicationDescription::TReplicationDescription(const Ydb::Replication::Describ
107121
.Id = item.id(),
108122
.SrcPath = item.source_path(),
109123
.DstPath = item.destination_path(),
124+
.Stats = TStats(item.stats()),
110125
.SrcChangefeedName = item.has_source_changefeed_name()
111126
? std::make_optional(item.source_changefeed_name()) : std::nullopt,
112127
});
113128
}
114129

115130
switch (desc.state_case()) {
116131
case Ydb::Replication::DescribeReplicationResult::kRunning:
117-
State_ = TRunningState(desc.running().has_lag()
118-
? std::make_optional(DurationToDuration(desc.running().lag())) : std::nullopt);
132+
State_ = TRunningState(desc.running().stats());
119133
break;
120134

121135
case Ydb::Replication::DescribeReplicationResult::kError:
@@ -183,6 +197,7 @@ class TReplicationClient::TImpl: public TClientImplCommon<TReplicationClient::TI
183197

184198
auto request = MakeOperationRequest<DescribeReplicationRequest>(settings);
185199
request.set_path(TStringType{path});
200+
request.set_include_stats(settings.IncludeStats_);
186201

187202
auto promise = NThreading::NewPromise<TDescribeReplicationResult>();
188203

src/client/table/table.cpp

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2398,11 +2398,22 @@ TChangefeedDescription::TChangefeedDescription(const Ydb::Table::ChangefeedDescr
23982398
: TChangefeedDescription(FromProto(proto))
23992399
{}
24002400

2401+
TChangefeedDescription::TInitialScanProgress::TInitialScanProgress()
2402+
: PartsTotal(0)
2403+
, PartsCompleted(0)
2404+
{}
2405+
24012406
TChangefeedDescription::TInitialScanProgress::TInitialScanProgress(uint32_t total, uint32_t completed)
24022407
: PartsTotal(total)
24032408
, PartsCompleted(completed)
24042409
{}
24052410

2411+
TChangefeedDescription::TInitialScanProgress& TChangefeedDescription::TInitialScanProgress::operator+=(const TInitialScanProgress& other) {
2412+
PartsTotal += other.PartsTotal;
2413+
PartsCompleted += other.PartsCompleted;
2414+
return *this;
2415+
}
2416+
24062417
uint32_t TChangefeedDescription::TInitialScanProgress::GetPartsTotal() const {
24072418
return PartsTotal;
24082419
}

0 commit comments

Comments
 (0)