Skip to content

Commit b221825

Browse files
authored
Added support for the transfer describe to YDB CLI (#20401)
1 parent 580f810 commit b221825

File tree

18 files changed

+439
-69
lines changed

18 files changed

+439
-69
lines changed

ydb/core/grpc_services/rpc_replication.cpp

Lines changed: 91 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,12 @@ namespace NKikimr::NGRpcService {
1818
using namespace Ydb;
1919

2020
using TEvDescribeReplication = TGrpcRequestOperationCall<Replication::DescribeReplicationRequest, Replication::DescribeReplicationResponse>;
21+
using TEvDescribeTransfer = TGrpcRequestOperationCall<Replication::DescribeTransferRequest, Replication::DescribeTransferResponse>;
2122

22-
class TDescribeReplicationRPC: public TRpcSchemeRequestActor<TDescribeReplicationRPC, TEvDescribeReplication> {
23-
using TBase = TRpcSchemeRequestActor<TDescribeReplicationRPC, TEvDescribeReplication>;
23+
template <typename TReq, typename TResp, typename TResult>
24+
class TDescribeReplicationRPC: public TRpcSchemeRequestActor<TDescribeReplicationRPC<TReq, TResp, TResult>, TGrpcRequestOperationCall<TReq, TResp>> {
25+
using TBase = TRpcSchemeRequestActor<TDescribeReplicationRPC<TReq, TResp, TResult>, TGrpcRequestOperationCall<TReq, TResp>>;
26+
using TThis = TDescribeReplicationRPC<TReq, TResp, TResult>;
2427

2528
public:
2629
using TBase::TBase;
@@ -31,7 +34,7 @@ class TDescribeReplicationRPC: public TRpcSchemeRequestActor<TDescribeReplicatio
3134

3235
void PassAway() override {
3336
if (ControllerPipeClient) {
34-
NTabletPipe::CloseAndForgetClient(SelfId(), ControllerPipeClient);
37+
NTabletPipe::CloseAndForgetClient(TBase::SelfId(), ControllerPipeClient);
3538
}
3639

3740
TBase::PassAway();
@@ -40,12 +43,12 @@ class TDescribeReplicationRPC: public TRpcSchemeRequestActor<TDescribeReplicatio
4043
private:
4144
void DescribeScheme() {
4245
auto ev = std::make_unique<TEvTxUserProxy::TEvNavigate>();
43-
SetAuthToken(ev, *Request_);
44-
SetDatabase(ev.get(), *Request_);
45-
ev->Record.MutableDescribePath()->SetPath(GetProtoRequest()->path());
46+
SetAuthToken(ev, *TBase::Request_);
47+
SetDatabase(ev.get(), *TBase::Request_);
48+
ev->Record.MutableDescribePath()->SetPath(TBase::GetProtoRequest()->path());
4649

47-
Send(MakeTxProxyID(), ev.release());
48-
Become(&TDescribeReplicationRPC::StateDescribeScheme);
50+
TBase::Send(MakeTxProxyID(), ev.release());
51+
TBase::Become(&TThis::StateDescribeScheme);
4952
}
5053

5154
STATEFN(StateDescribeScheme) {
@@ -62,7 +65,7 @@ class TDescribeReplicationRPC: public TRpcSchemeRequestActor<TDescribeReplicatio
6265

6366
if (record.HasReason()) {
6467
auto issue = NYql::TIssue(record.GetReason());
65-
Request_->RaiseIssue(issue);
68+
TBase::Request_->RaiseIssue(issue);
6669
}
6770

6871
switch (record.GetStatus()) {
@@ -73,8 +76,8 @@ class TDescribeReplicationRPC: public TRpcSchemeRequestActor<TDescribeReplicatio
7376
break;
7477
default: {
7578
auto issue = NYql::TIssue("Is not a replication");
76-
Request_->RaiseIssue(issue);
77-
return Reply(Ydb::StatusIds::SCHEME_ERROR, ctx);
79+
TBase::Request_->RaiseIssue(issue);
80+
return TBase::Reply(Ydb::StatusIds::SCHEME_ERROR, ctx);
7881
}
7982
}
8083

@@ -84,16 +87,16 @@ class TDescribeReplicationRPC: public TRpcSchemeRequestActor<TDescribeReplicatio
8487

8588
case NKikimrScheme::StatusPathDoesNotExist:
8689
case NKikimrScheme::StatusSchemeError:
87-
return Reply(Ydb::StatusIds::SCHEME_ERROR, ctx);
90+
return TBase::Reply(Ydb::StatusIds::SCHEME_ERROR, ctx);
8891

8992
case NKikimrScheme::StatusAccessDenied:
90-
return Reply(Ydb::StatusIds::UNAUTHORIZED, ctx);
93+
return TBase::Reply(Ydb::StatusIds::UNAUTHORIZED, ctx);
9194

9295
case NKikimrScheme::StatusNotAvailable:
93-
return Reply(Ydb::StatusIds::UNAVAILABLE, ctx);
96+
return TBase::Reply(Ydb::StatusIds::UNAVAILABLE, ctx);
9497

9598
default: {
96-
return Reply(Ydb::StatusIds::GENERIC_ERROR, ctx);
99+
return TBase::Reply(Ydb::StatusIds::GENERIC_ERROR, ctx);
97100
}
98101
}
99102
}
@@ -104,15 +107,15 @@ class TDescribeReplicationRPC: public TRpcSchemeRequestActor<TDescribeReplicatio
104107
config.RetryPolicy = {
105108
.RetryLimitCount = 3,
106109
};
107-
ControllerPipeClient = Register(NTabletPipe::CreateClient(SelfId(), tabletId, config));
110+
ControllerPipeClient = TBase::Register(NTabletPipe::CreateClient(TBase::SelfId(), tabletId, config));
108111
}
109112

110113
auto ev = std::make_unique<NReplication::TEvController::TEvDescribeReplication>();
111114
pathId.ToProto(ev->Record.MutablePathId());
112-
ev->Record.SetIncludeStats(GetProtoRequest()->include_stats());
115+
BuildRequest(TBase::GetProtoRequest(), ev->Record);
113116

114-
NTabletPipe::SendData(SelfId(), ControllerPipeClient, ev.release());
115-
Become(&TDescribeReplicationRPC::StateDescribeReplication);
117+
NTabletPipe::SendData(TBase::SelfId(), ControllerPipeClient, ev.release());
118+
TBase::Become(&TThis::StateDescribeReplication);
116119
}
117120

118121
STATEFN(StateDescribeReplication) {
@@ -130,20 +133,14 @@ class TDescribeReplicationRPC: public TRpcSchemeRequestActor<TDescribeReplicatio
130133
case NKikimrReplication::TEvDescribeReplicationResult::SUCCESS:
131134
break;
132135
case NKikimrReplication::TEvDescribeReplicationResult::NOT_FOUND:
133-
return Reply(Ydb::StatusIds::SCHEME_ERROR, ctx);
136+
return TBase::Reply(Ydb::StatusIds::SCHEME_ERROR, ctx);
134137
default:
135-
return Reply(Ydb::StatusIds::INTERNAL_ERROR, ctx);
138+
return TBase::Reply(Ydb::StatusIds::INTERNAL_ERROR, ctx);
136139
}
137140

138-
ConvertConnectionParams(record.GetConnectionParams(), *Result.mutable_connection_params());
139-
ConvertConsistencySettings(record.GetConsistencySettings(), Result);
140-
ConvertState(*record.MutableState(), Result);
141+
Convert(record, Result);
141142

142-
for (const auto& target : record.GetTargets()) {
143-
ConvertItem(target, *Result.add_items());
144-
}
145-
146-
return ReplyWithResult(Ydb::StatusIds::SUCCESS, Result, ctx);
143+
return TBase::ReplyWithResult(Ydb::StatusIds::SUCCESS, Result, ctx);
147144
}
148145

149146
static TString BuildConnectionString(const NKikimrReplication::TConnectionParams& params) {
@@ -214,17 +211,26 @@ class TDescribeReplicationRPC: public TRpcSchemeRequestActor<TDescribeReplicatio
214211
}
215212
}
216213

217-
static void ConvertState(NKikimrReplication::TReplicationState& from, Ydb::Replication::DescribeReplicationResult& to) {
214+
static void ConvertStats(NKikimrReplication::TReplicationState& from, Ydb::Replication::DescribeReplicationResult& to) {
215+
if (from.GetStandBy().HasLagMilliSeconds()) {
216+
*to.mutable_running()->mutable_stats()->mutable_lag() = google::protobuf::util::TimeUtil::MillisecondsToDuration(
217+
from.GetStandBy().GetLagMilliSeconds());
218+
}
219+
if (from.GetStandBy().HasInitialScanProgress()) {
220+
to.mutable_running()->mutable_stats()->set_initial_scan_progress(from.GetStandBy().GetInitialScanProgress());
221+
}
222+
}
223+
224+
static void ConvertStats(NKikimrReplication::TReplicationState&, Ydb::Replication::DescribeTransferResult&) {
225+
// nop
226+
}
227+
228+
template<typename T>
229+
static void ConvertState(NKikimrReplication::TReplicationState& from, T& to) {
218230
switch (from.GetStateCase()) {
219231
case NKikimrReplication::TReplicationState::kStandBy:
220232
to.mutable_running();
221-
if (from.GetStandBy().HasLagMilliSeconds()) {
222-
*to.mutable_running()->mutable_stats()->mutable_lag() = google::protobuf::util::TimeUtil::MillisecondsToDuration(
223-
from.GetStandBy().GetLagMilliSeconds());
224-
}
225-
if (from.GetStandBy().HasInitialScanProgress()) {
226-
to.mutable_running()->mutable_stats()->set_initial_scan_progress(from.GetStandBy().GetInitialScanProgress());
227-
}
233+
ConvertStats(from, to);
228234
break;
229235
case NKikimrReplication::TReplicationState::kError:
230236
*to.mutable_error()->mutable_issues() = std::move(*from.MutableError()->MutableIssues());
@@ -240,20 +246,65 @@ class TDescribeReplicationRPC: public TRpcSchemeRequestActor<TDescribeReplicatio
240246
}
241247
}
242248

249+
static void Convert(NKikimrReplication::TEvDescribeReplicationResult& record, Replication::DescribeReplicationResult& result) {
250+
ConvertConnectionParams(record.GetConnectionParams(), *result.mutable_connection_params());
251+
ConvertConsistencySettings(record.GetConsistencySettings(), result);
252+
ConvertState(*record.MutableState(), result);
253+
254+
for (const auto& target : record.GetTargets()) {
255+
ConvertItem(target, *result.add_items());
256+
}
257+
}
258+
259+
static void Convert(NKikimrReplication::TEvDescribeReplicationResult& record, Replication::DescribeTransferResult& result) {
260+
ConvertConnectionParams(record.GetConnectionParams(), *result.mutable_connection_params());
261+
ConvertState(*record.MutableState(), result);
262+
263+
const auto& transferSpecific = record.GetTransferSpecific();
264+
result.set_source_path(transferSpecific.GetTarget().GetSrcPath());
265+
result.set_destination_path(transferSpecific.GetTarget().GetDstPath());
266+
result.set_consumer_name(transferSpecific.GetTarget().GetConsumerName());
267+
result.set_transformation_lambda(transferSpecific.GetTarget().GetTransformLambda());
268+
result.mutable_batch_settings()->set_size_bytes(transferSpecific.GetBatching().GetBatchSizeBytes());
269+
result.mutable_batch_settings()->mutable_flush_interval()->set_seconds(transferSpecific.GetBatching().GetFlushIntervalMilliSeconds() / 1000);
270+
}
271+
272+
static void BuildRequest(const Replication::DescribeReplicationRequest* from, NKikimrReplication::TEvDescribeReplication& to) {
273+
to.SetIncludeStats(from->include_stats());
274+
}
275+
276+
static void BuildRequest(const Replication::DescribeTransferRequest*, NKikimrReplication::TEvDescribeReplication&) {
277+
// nop
278+
}
279+
243280
private:
244-
Ydb::Replication::DescribeReplicationResult Result;
281+
TResult Result;
245282
TActorId ControllerPipeClient;
246283
};
247284

285+
using TDescribeReplicationActor = TDescribeReplicationRPC<Replication::DescribeReplicationRequest, Replication::DescribeReplicationResponse, Replication::DescribeReplicationResult>;
286+
using TDescribeTransferActor = TDescribeReplicationRPC<Replication::DescribeTransferRequest, Replication::DescribeTransferResponse, Replication::DescribeTransferResult>;
287+
248288
void DoDescribeReplication(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f) {
249-
f.RegisterActor(new TDescribeReplicationRPC(p.release()));
289+
f.RegisterActor(new TDescribeReplicationActor(p.release()));
290+
}
291+
292+
void DoDescribeTransfer(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f) {
293+
f.RegisterActor(new TDescribeTransferActor(p.release()));
250294
}
251295

252296
using TEvDescribeReplicationRequest = TGrpcRequestOperationCall<Ydb::Replication::DescribeReplicationRequest, Ydb::Replication::DescribeReplicationResponse>;
253297

254298
template<>
255299
IActor* TEvDescribeReplicationRequest::CreateRpcActor(NKikimr::NGRpcService::IRequestOpCtx* msg) {
256-
return new TDescribeReplicationRPC(msg);
300+
return new TDescribeReplicationActor(msg);
301+
}
302+
303+
using TEvDescribeTransferRequest = TGrpcRequestOperationCall<Ydb::Replication::DescribeTransferRequest, Ydb::Replication::DescribeTransferResponse>;
304+
305+
template<>
306+
IActor* TEvDescribeTransferRequest::CreateRpcActor(NKikimr::NGRpcService::IRequestOpCtx* msg) {
307+
return new TDescribeTransferActor(msg);
257308
}
258309

259310
}

ydb/core/grpc_services/service_replication.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,5 +8,6 @@ class IRequestOpCtx;
88
class IFacilityProvider;
99

1010
void DoDescribeReplication(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
11+
void DoDescribeTransfer(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
1112

1213
}

ydb/core/protos/replication.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,7 @@ message TEvDescribeReplicationResult {
224224
optional TConsistencySettings ConsistencySettings = 5;
225225
repeated TReplicationConfig.TTargetSpecific.TTarget Targets = 3;
226226
optional TReplicationState State = 4;
227+
optional TReplicationConfig.TTransferSpecific TransferSpecific = 6;
227228
}
228229

229230
message TControllerIdentity {

ydb/core/transfer/ut/common/utils.h

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -524,10 +524,7 @@ struct MainTestCase {
524524
auto DescribeTransfer() {
525525
TReplicationClient client(Driver);
526526

527-
TDescribeReplicationSettings settings;
528-
settings.IncludeStats(true);
529-
530-
return client.DescribeReplication(TString("/") + GetEnv("YDB_DATABASE") + "/" + TransferName, settings).ExtractValueSync();
527+
return client.DescribeTransfer(TString("/") + GetEnv("YDB_DATABASE") + "/" + TransferName).ExtractValueSync();
531528
}
532529

533530
auto DescribeConsumer(const std::string& consumerName) {
@@ -690,19 +687,19 @@ struct MainTestCase {
690687
Sleep(TDuration::Seconds(1));
691688
}
692689

693-
CheckTransferState(TReplicationDescription::EState::Running);
690+
CheckTransferState(TTransferDescription::EState::Running);
694691
UNIT_ASSERT_C(false, "Unable to wait transfer result");
695692
}
696693

697-
TReplicationDescription CheckTransferState(TReplicationDescription::EState expected) {
694+
TTransferDescription CheckTransferState(TTransferDescription::EState expected) {
698695
for (size_t i = 20; i--;) {
699-
auto result = DescribeTransfer().GetReplicationDescription();
696+
auto result = DescribeTransfer().GetTransferDescription();
700697
if (expected == result.GetState()) {
701698
return result;
702699
}
703700

704701
std::string issues;
705-
if (result.GetState() == TReplicationDescription::EState::Error) {
702+
if (result.GetState() == TTransferDescription::EState::Error) {
706703
issues = result.GetErrorState().GetIssues().ToOneLineString();
707704
}
708705

@@ -714,7 +711,7 @@ struct MainTestCase {
714711
}
715712

716713
void CheckTransferStateError(const std::string& expectedMessage) {
717-
auto result = CheckTransferState(TReplicationDescription::EState::Error);
714+
auto result = CheckTransferState(TTransferDescription::EState::Error);
718715
Cerr << ">>>>> ACTUAL: " << result.GetErrorState().GetIssues().ToOneLineString() << Endl << Flush;
719716
Cerr << ">>>>> EXPECTED: " << expectedMessage << Endl << Flush;
720717
UNIT_ASSERT(result.GetErrorState().GetIssues().ToOneLineString().contains(expectedMessage));

ydb/core/transfer/ut/functional/transfer_ut.cpp

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -845,7 +845,7 @@ Y_UNIT_TEST_SUITE(Transfer)
845845
_C("Message", TString("Message-1"))
846846
}});
847847

848-
testCase.CheckTransferState(TReplicationDescription::EState::Running);
848+
testCase.CheckTransferState(TTransferDescription::EState::Running);
849849

850850
testCase.DropTopic();
851851

@@ -984,14 +984,14 @@ Y_UNIT_TEST_SUITE(Transfer)
984984
_C("Message", TString("Message-1"))
985985
}});
986986

987-
testCase.CheckTransferState(TReplicationDescription::EState::Running);
987+
testCase.CheckTransferState(TTransferDescription::EState::Running);
988988

989989
Cerr << "State: Paused" << Endl << Flush;
990990

991991
testCase.PauseTransfer();
992992

993993
Sleep(TDuration::Seconds(1));
994-
testCase.CheckTransferState(TReplicationDescription::EState::Paused);
994+
testCase.CheckTransferState(TTransferDescription::EState::Paused);
995995

996996
testCase.Write({"Message-2"});
997997

@@ -1006,7 +1006,7 @@ Y_UNIT_TEST_SUITE(Transfer)
10061006
testCase.ResumeTransfer();
10071007

10081008
// Transfer is resumed. New messages are added to the table.
1009-
testCase.CheckTransferState(TReplicationDescription::EState::Running);
1009+
testCase.CheckTransferState(TTransferDescription::EState::Running);
10101010
testCase.CheckResult({{
10111011
_C("Message", TString("Message-1"))
10121012
}, {
@@ -1015,10 +1015,10 @@ Y_UNIT_TEST_SUITE(Transfer)
10151015

10161016
// More cycles for pause/resume
10171017
testCase.PauseTransfer();
1018-
testCase.CheckTransferState(TReplicationDescription::EState::Paused);
1018+
testCase.CheckTransferState(TTransferDescription::EState::Paused);
10191019

10201020
testCase.ResumeTransfer();
1021-
testCase.CheckTransferState(TReplicationDescription::EState::Running);
1021+
testCase.CheckTransferState(TTransferDescription::EState::Running);
10221022

10231023
testCase.DropTransfer();
10241024
testCase.DropTable();

ydb/core/tx/replication/controller/tx_describe_replication.cpp

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,12 +197,25 @@ class TController::TTxDescribeReplication: public TTxBase {
197197
totalScanProgress = std::make_optional<TInitialScanProgress>();
198198
}
199199

200+
bool isTransfer = replication->GetConfig().HasTransferSpecific();
200201
for (ui64 tid = 0; tid < replication->GetNextTargetId(); ++tid) {
201202
auto* target = replication->FindTarget(tid);
202203
if (!target) {
203204
continue;
204205
}
205206

207+
if (isTransfer) {
208+
// transfer always has one target
209+
auto& specific = replication->GetConfig().GetTransferSpecific();
210+
211+
auto& transferSpecific = *Result->Record.MutableTransferSpecific();
212+
transferSpecific.MutableTarget()->SetSrcPath(target->GetSrcPath());
213+
transferSpecific.MutableTarget()->SetDstPath(target->GetDstPath());
214+
transferSpecific.MutableTarget()->SetConsumerName(target->GetStreamConsumerName() ? target->GetStreamConsumerName() : specific.GetTarget().GetConsumerName());
215+
transferSpecific.MutableTarget()->SetTransformLambda(specific.GetTarget().GetTransformLambda());
216+
transferSpecific.MutableBatching()->CopyFrom(specific.GetBatching());
217+
}
218+
206219
auto& item = *Result->Record.AddTargets();
207220
item.SetId(target->GetId());
208221
item.SetSrcPath(target->GetSrcPath());

ydb/public/api/grpc/draft/ydb_replication_v1.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,4 +7,5 @@ option java_package = "com.yandex.ydb.replication.v1";
77

88
service ReplicationService {
99
rpc DescribeReplication(Replication.DescribeReplicationRequest) returns (Replication.DescribeReplicationResponse);
10+
rpc DescribeTransfer(Replication.DescribeTransferRequest) returns (Replication.DescribeTransferResponse);
1011
}

0 commit comments

Comments
 (0)