Skip to content

Commit f1e5868

Browse files
committed
Added validation of the existence of the target transfer table (#17677)
1 parent 8c3f4c1 commit f1e5868

File tree

5 files changed

+140
-31
lines changed

5 files changed

+140
-31
lines changed

ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8114,6 +8114,22 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
81148114
UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToOneLineString(), "consumer must be not empty");
81158115
}
81168116

8117+
{
8118+
auto query = Sprintf(R"(
8119+
--!syntax_v1
8120+
CREATE TRANSFER `/Root/transfer`
8121+
FROM `/Root/topic` TO `/Root/table_not_exists` USING ($x) -> { RETURN <| id:$x._offset |>; }
8122+
WITH (
8123+
ENDPOINT = "%s",
8124+
DATABASE = "/Root"
8125+
);
8126+
)", kikimr.GetEndpoint().c_str());
8127+
8128+
const auto result = session.ExecuteSchemeQuery(query).GetValueSync();
8129+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::GENERIC_ERROR, result.GetIssues().ToString());
8130+
UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToOneLineString(), "The transfer destination path '/Root/table_not_exists' not found");
8131+
}
8132+
81178133
// positive
81188134
{
81198135
auto query = R"(
@@ -8372,6 +8388,22 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
83728388
UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToOneLineString(), "consumer must be not empty");
83738389
}
83748390

8391+
{
8392+
auto query = Sprintf(R"(
8393+
--!syntax_v1
8394+
CREATE TRANSFER `/Root/transfer`
8395+
FROM `/Root/topic` TO `/Root/table_not_exists` USING ($x) -> { RETURN <| id:$x._offset |>; }
8396+
WITH (
8397+
ENDPOINT = "%s",
8398+
DATABASE = "/Root"
8399+
);
8400+
)", kikimr.GetEndpoint().c_str());
8401+
8402+
const auto result = session.ExecuteQuery(query, NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
8403+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::GENERIC_ERROR, result.GetIssues().ToString());
8404+
UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToOneLineString(), "The transfer destination path '/Root/table_not_exists' not found");
8405+
}
8406+
83758407
// positive
83768408
{
83778409
auto query = R"(

ydb/core/tx/schemeshard/schemeshard__operation_create_replication.cpp

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,15 @@ namespace {
1717

1818
struct IStrategy {
1919
virtual TPathElement::EPathType GetPathType() const = 0;
20-
virtual bool Validate(TProposeResponse& result, const NKikimrSchemeOp::TReplicationDescription& desc) const = 0;
20+
virtual bool Validate(TProposeResponse& result, const NKikimrSchemeOp::TReplicationDescription& desc, const TOperationContext& context) const = 0;
2121
};
2222

2323
struct TReplicationStrategy : public IStrategy {
2424
TPathElement::EPathType GetPathType() const override {
2525
return TPathElement::EPathType::EPathTypeReplication;
2626
};
2727

28-
bool Validate(TProposeResponse& result, const NKikimrSchemeOp::TReplicationDescription& desc) const override {
28+
bool Validate(TProposeResponse& result, const NKikimrSchemeOp::TReplicationDescription& desc, const TOperationContext&) const override {
2929
if (desc.GetConfig().HasTransferSpecific()) {
3030
result.SetError(NKikimrScheme::StatusInvalidParameter, "Wrong replication configuration");
3131
return true;
@@ -44,7 +44,7 @@ struct TTransferStrategy : public IStrategy {
4444
return TPathElement::EPathType::EPathTypeTransfer;
4545
};
4646

47-
bool Validate(TProposeResponse& result, const NKikimrSchemeOp::TReplicationDescription& desc) const override {
47+
bool Validate(TProposeResponse& result, const NKikimrSchemeOp::TReplicationDescription& desc, const TOperationContext& context) const override {
4848
if (!AppData()->FeatureFlags.GetEnableTopicTransfer()) {
4949
result.SetError(NKikimrScheme::StatusInvalidParameter, "Topic transfer creation is disabled");
5050
return true;
@@ -72,6 +72,17 @@ struct TTransferStrategy : public IStrategy {
7272
return true;
7373
}
7474

75+
const auto& target = desc.GetConfig().GetTransferSpecific().GetTarget();
76+
auto targetPath = TPath::Resolve(target.GetDstPath(), context.SS);
77+
if (!targetPath.IsResolved() || targetPath.IsUnderDeleting() || targetPath->IsUnderMoving() || targetPath.IsDeleted()) {
78+
result.SetError(NKikimrScheme::StatusNotAvailable, TStringBuilder() << "The transfer destination path '" << target.GetDstPath() << "' not found");
79+
return true;
80+
}
81+
if (!targetPath->IsColumnTable() && !targetPath->IsTable()) {
82+
result.SetError(NKikimrScheme::StatusNotAvailable, TStringBuilder() << "The transfer destination path '" << target.GetDstPath() << "' isn`t a table");
83+
return true;
84+
}
85+
7586
if (!AppData()->TransferWriterFactory) {
7687
result.SetError(NKikimrScheme::StatusNotAvailable, "The transfer is only available in the Enterprise version");
7788
return true;
@@ -350,7 +361,7 @@ class TCreateReplication: public TSubOperation {
350361
}
351362
}
352363

353-
if (Strategy->Validate(*result, desc)) {
364+
if (Strategy->Validate(*result, desc, context)) {
354365
return result;
355366
}
356367

@@ -392,10 +403,6 @@ class TCreateReplication: public TSubOperation {
392403
}
393404
}
394405

395-
if (Strategy->Validate(*result.Get(), desc)) {
396-
return result;
397-
}
398-
399406
TString errStr;
400407
if (!context.SS->CheckApplyIf(Transaction, errStr)) {
401408
result->SetError(NKikimrScheme::StatusPreconditionFailed, errStr);

ydb/core/tx/schemeshard/ut_transfer/ut_transfer.cpp

Lines changed: 46 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ Y_UNIT_TEST_SUITE(TTransferTests) {
1717
TransferSpecific {
1818
Target {
1919
SrcPath: "/MyRoot1/Table"
20-
DstPath: "/MyRoot2/Table"
20+
DstPath: "/MyRoot/Table"
2121
}
2222
}
2323
}
@@ -36,13 +36,28 @@ Y_UNIT_TEST_SUITE(TTransferTests) {
3636
return r.GetControllerId();
3737
}
3838

39+
void CreateTable(TTestBasicRuntime& runtime, TTestEnv& env, ui64& txId) {
40+
TestCreateColumnTable(runtime, ++txId, "/MyRoot", R"(
41+
Name: "Table"
42+
ColumnShardCount: 1
43+
Schema {
44+
Columns { Name: "key" Type: "Uint32" NotNull: true }
45+
Columns { Name: "data" Type: "Utf8" }
46+
KeyColumnNames: [ "key" ]
47+
}
48+
)");
49+
env.TestWaitNotification(runtime, txId);
50+
}
51+
3952
Y_UNIT_TEST(Create) {
4053
TTestBasicRuntime runtime;
4154
TTestEnv env(runtime, TTestEnvOptions().InitYdbDriver(true).EnableTopicTransfer(true));
4255
ui64 txId = 100;
4356

4457
SetupLogging(runtime);
4558

59+
CreateTable(runtime, env, txId);
60+
4661
TestCreateTransfer(runtime, ++txId, "/MyRoot", DefaultScheme("Transfer"));
4762
env.TestWaitNotification(runtime, txId);
4863

@@ -59,6 +74,8 @@ Y_UNIT_TEST_SUITE(TTransferTests) {
5974

6075
SetupLogging(runtime);
6176

77+
CreateTable(runtime, env, txId);
78+
6279
TestCreateTransfer(runtime, ++txId, "/MyRoot", DefaultScheme("Transfer"),
6380
{NKikimrScheme::StatusInvalidParameter});
6481
env.TestWaitNotification(runtime, txId);
@@ -76,6 +93,8 @@ Y_UNIT_TEST_SUITE(TTransferTests) {
7693
SetupLogging(runtime);
7794
THashSet<ui64> controllerIds;
7895

96+
CreateTable(runtime, env, txId);
97+
7998
for (int i = 0; i < 2; ++i) {
8099
const auto name = Sprintf("Transfer%d", i);
81100

@@ -102,6 +121,8 @@ Y_UNIT_TEST_SUITE(TTransferTests) {
102121
SetupLogging(runtime);
103122
THashSet<ui64> controllerIds;
104123

124+
CreateTable(runtime, env, txId);
125+
105126
for (int i = 0; i < 2; ++i) {
106127
TVector<TString> names;
107128
TVector<ui64> txIds;
@@ -138,6 +159,8 @@ Y_UNIT_TEST_SUITE(TTransferTests) {
138159
SetupLogging(runtime);
139160
ui64 controllerId = 0;
140161

162+
CreateTable(runtime, env, txId);
163+
141164
TestCreateTransfer(runtime, ++txId, "/MyRoot", DefaultScheme("Transfer"));
142165
env.TestWaitNotification(runtime, txId);
143166
{
@@ -166,13 +189,15 @@ Y_UNIT_TEST_SUITE(TTransferTests) {
166189

167190
SetupLogging(runtime);
168191

192+
CreateTable(runtime, env, txId);
193+
169194
TestCreateTransfer(runtime, ++txId, "/MyRoot", R"(
170195
Name: "Transfer"
171196
Config {
172197
TransferSpecific {
173198
Target {
174199
SrcPath: "/MyRoot1/Table"
175-
DstPath: "/MyRoot2/Table"
200+
DstPath: "/MyRoot/Table"
176201
}
177202
}
178203
}
@@ -191,13 +216,15 @@ Y_UNIT_TEST_SUITE(TTransferTests) {
191216

192217
SetupLogging(runtime);
193218

219+
CreateTable(runtime, env, txId);
220+
194221
TestCreateTransfer(runtime, ++txId, "/MyRoot", R"(
195222
Name: "Transfer"
196223
Config {
197224
Specific {
198225
Targets {
199226
SrcPath: "/MyRoot1/Table"
200-
DstPath: "/MyRoot2/Table"
227+
DstPath: "/MyRoot/Table"
201228
}
202229
}
203230
}
@@ -216,13 +243,15 @@ Y_UNIT_TEST_SUITE(TTransferTests) {
216243

217244
SetupLogging(runtime);
218245

246+
CreateTable(runtime, env, txId);
247+
219248
TestCreateTransfer(runtime, ++txId, "/MyRoot", R"(
220249
Name: "Transfer"
221250
Config {
222251
TransferSpecific {
223252
Target {
224253
SrcPath: "/MyRoot1/Table"
225-
DstPath: "/MyRoot2/Table"
254+
DstPath: "/MyRoot/Table"
226255
}
227256
Batching {
228257
BatchSizeBytes: 1073741825
@@ -244,13 +273,15 @@ Y_UNIT_TEST_SUITE(TTransferTests) {
244273

245274
SetupLogging(runtime);
246275

276+
CreateTable(runtime, env, txId);
277+
247278
TestCreateTransfer(runtime, ++txId, "/MyRoot", R"(
248279
Name: "Transfer"
249280
Config {
250281
TransferSpecific {
251282
Target {
252283
SrcPath: "/MyRoot1/Table"
253-
DstPath: "/MyRoot2/Table"
284+
DstPath: "/MyRoot/Table"
254285
}
255286
Batching {
256287
FlushIntervalMilliSeconds: 1
@@ -272,13 +303,15 @@ Y_UNIT_TEST_SUITE(TTransferTests) {
272303

273304
SetupLogging(runtime);
274305

306+
CreateTable(runtime, env, txId);
307+
275308
TestCreateTransfer(runtime, ++txId, "/MyRoot", R"(
276309
Name: "Transfer"
277310
Config {
278311
TransferSpecific {
279312
Target {
280313
SrcPath: "/MyRoot1/Table"
281-
DstPath: "/MyRoot2/Table"
314+
DstPath: "/MyRoot/Table"
282315
}
283316
Batching {
284317
FlushIntervalMilliSeconds: 86400001
@@ -300,13 +333,15 @@ Y_UNIT_TEST_SUITE(TTransferTests) {
300333

301334
SetupLogging(runtime);
302335

336+
CreateTable(runtime, env, txId);
337+
303338
TestCreateTransfer(runtime, ++txId, "/MyRoot", R"(
304339
Name: "Transfer1"
305340
Config {
306341
TransferSpecific {
307342
Target {
308343
SrcPath: "/MyRoot1/Table"
309-
DstPath: "/MyRoot2/Table"
344+
DstPath: "/MyRoot/Table"
310345
}
311346
}
312347
}
@@ -324,7 +359,7 @@ Y_UNIT_TEST_SUITE(TTransferTests) {
324359
TransferSpecific {
325360
Target {
326361
SrcPath: "/MyRoot1/Table"
327-
DstPath: "/MyRoot2/Table"
362+
DstPath: "/MyRoot/Table"
328363
}
329364
}
330365
ConsistencySettings {
@@ -345,7 +380,7 @@ Y_UNIT_TEST_SUITE(TTransferTests) {
345380
TransferSpecific {
346381
Target {
347382
SrcPath: "/MyRoot1/Table"
348-
DstPath: "/MyRoot2/Table"
383+
DstPath: "/MyRoot/Table"
349384
}
350385
}
351386
ConsistencySettings {
@@ -371,6 +406,8 @@ Y_UNIT_TEST_SUITE(TTransferTests) {
371406

372407
SetupLogging(runtime);
373408

409+
CreateTable(runtime, env, txId);
410+
374411
TestCreateTransfer(runtime, ++txId, "/MyRoot", DefaultScheme("Transfer"));
375412
env.TestWaitNotification(runtime, txId);
376413
{

ydb/tests/functional/replication/transfer.cpp

Lines changed: 34 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,24 +4,50 @@ using namespace NReplicationTest;
44

55
Y_UNIT_TEST_SUITE(Transfer)
66
{
7-
Y_UNIT_TEST(CreateTransfer)
7+
Y_UNIT_TEST(CreateTransfer_EnterpiseVersion)
88
{
99
MainTestCase testCase;
10-
testCase.ExecuteDDL(Sprintf(R"(
10+
testCase.CreateTable(R"(
11+
CREATE TABLE `%s` (
12+
Key Uint64 NOT NULL,
13+
Message Utf8,
14+
PRIMARY KEY (Key)
15+
) WITH (
16+
STORE = COLUMN
17+
);
18+
)");
19+
testCase.CreateTopic();
20+
21+
testCase.CreateTransfer(R"(
22+
$l = ($x) -> {
23+
return [
24+
<|
25+
Key:CAST($x._offset AS Uint64)
26+
|>
27+
];
28+
};
29+
)", MainTestCase::CreateTransferSettings::WithExpectedError("The transfer is only available in the Enterprise version"));
30+
31+
testCase.DropTable();
32+
testCase.DropTopic();
33+
}
34+
35+
Y_UNIT_TEST(CreateTransfer_TargetNotFound)
36+
{
37+
MainTestCase testCase;
38+
testCase.CreateTopic();
39+
40+
testCase.CreateTransfer(R"(
1141
$l = ($x) -> {
1242
return [
1343
<|
1444
Key:CAST($x._offset AS Uint64)
1545
|>
1646
];
1747
};
48+
)", MainTestCase::CreateTransferSettings::WithExpectedError(TStringBuilder() << "The transfer destination path '/local/" << testCase.TableName << "' not found"));
1849

19-
CREATE TRANSFER `%s`
20-
FROM `SourceTopic` TO `TargetTable` USING $l
21-
WITH (
22-
CONNECTION_STRING = 'grpc://localhost'
23-
);
24-
)", testCase.TransferName.data()), true, "The transfer is only available in the Enterprise version");
50+
testCase.DropTopic();
2551
}
2652
}
2753

0 commit comments

Comments
 (0)