Skip to content

Commit d996c04

Browse files
authored
KIKIMR-22120: Take partitioning description of copied table in s3 export (#13067)
flaky test
1 parent 8193406 commit d996c04

File tree

2 files changed

+347
-0
lines changed

2 files changed

+347
-0
lines changed

ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.cpp

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,18 @@ void FillSetValForSequences(TSchemeShard* ss, NKikimrSchemeOp::TTableDescription
108108
}
109109
}
110110

111+
void FillPartitioning(TSchemeShard* ss, NKikimrSchemeOp::TTableDescription& desc, const TPathId& exportItemPathId) {
112+
NKikimrSchemeOp::TDescribeOptions opts;
113+
opts.SetReturnPartitionConfig(true);
114+
opts.SetReturnBoundaries(true);
115+
116+
auto copiedPath = DescribePath(ss, TlsActivationContext->AsActorContext(), exportItemPathId, opts);
117+
const auto& copiedTable = copiedPath->GetRecord().GetPathDescription().GetTable();
118+
119+
*desc.MutableSplitBoundary() = copiedTable.GetSplitBoundary();
120+
*desc.MutablePartitionConfig()->MutablePartitioningPolicy() = copiedTable.GetPartitionConfig().GetPartitioningPolicy();
121+
}
122+
111123
THolder<TEvSchemeShard::TEvModifySchemeTransaction> BackupPropose(
112124
TSchemeShard* ss,
113125
TTxId txId,
@@ -137,6 +149,7 @@ THolder<TEvSchemeShard::TEvModifySchemeTransaction> BackupPropose(
137149
if (sourceDescription.HasTable()) {
138150
FillSetValForSequences(
139151
ss, *sourceDescription.MutableTable(), exportItemPath.Base()->PathId);
152+
FillPartitioning(ss, *sourceDescription.MutableTable(), exportItemPath.Base()->PathId);
140153
}
141154
task.MutableTable()->CopyFrom(sourceDescription);
142155
}

ydb/core/tx/schemeshard/ut_restore/ut_restore.cpp

Lines changed: 334 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1154,6 +1154,340 @@ value {
11541154
UNIT_ASSERT_C(CheckDefaultFromSequence(table), "Invalid default value");
11551155
}
11561156

1157+
Y_UNIT_TEST(ShouldRestoreTableWithVolatilePartitioningMerge) {
1158+
TPortManager portManager;
1159+
const ui16 port = portManager.GetPort();
1160+
1161+
TS3Mock s3Mock({}, TS3Mock::TSettings(port));
1162+
UNIT_ASSERT(s3Mock.Start());
1163+
1164+
TTestBasicRuntime runtime;
1165+
TTestEnv env(runtime);
1166+
1167+
ui64 txId = 100;
1168+
1169+
runtime.SetLogPriority(NKikimrServices::DATASHARD_BACKUP, NActors::NLog::PRI_TRACE);
1170+
runtime.SetLogPriority(NKikimrServices::DATASHARD_RESTORE, NActors::NLog::PRI_TRACE);
1171+
runtime.SetLogPriority(NKikimrServices::EXPORT, NActors::NLog::PRI_TRACE);
1172+
runtime.SetLogPriority(NKikimrServices::IMPORT, NActors::NLog::PRI_TRACE);
1173+
runtime.SetLogPriority(NKikimrServices::SEQUENCEPROXY, NActors::NLog::PRI_TRACE);
1174+
1175+
// Create table with 2 tablets
1176+
TestCreateTable(runtime, ++txId, "/MyRoot", R"(
1177+
Name: "Original"
1178+
Columns { Name: "key" Type: "Uint32" }
1179+
Columns { Name: "value" Type: "Utf8" }
1180+
KeyColumnNames: ["key"]
1181+
PartitionConfig {
1182+
PartitioningPolicy {
1183+
MinPartitionsCount: 2
1184+
MaxPartitionsCount: 2
1185+
}
1186+
}
1187+
SplitBoundary {
1188+
KeyPrefix {
1189+
Tuple { Optional { Uint32: 2 } }
1190+
}
1191+
}
1192+
)");
1193+
env.TestWaitNotification(runtime, txId);
1194+
1195+
// Upload data
1196+
const auto firstTablet = TTestTxConfig::FakeHiveTablets;
1197+
const auto secondTablet = TTestTxConfig::FakeHiveTablets + 1;
1198+
UpdateRow(runtime, "Original", 1, "valueA", firstTablet);
1199+
UpdateRow(runtime, "Original", 2, "valueB", secondTablet);
1200+
1201+
// Add delay after copying tables
1202+
bool dropNotification = false;
1203+
THolder<IEventHandle> delayed;
1204+
auto prevObserver = runtime.SetObserverFunc([&](TAutoPtr<IEventHandle>& ev) {
1205+
switch (ev->GetTypeRewrite()) {
1206+
case TEvSchemeShard::EvModifySchemeTransaction:
1207+
break;
1208+
case TEvSchemeShard::EvNotifyTxCompletionResult:
1209+
if (dropNotification) {
1210+
delayed.Reset(ev.Release());
1211+
return TTestActorRuntime::EEventAction::DROP;
1212+
}
1213+
return TTestActorRuntime::EEventAction::PROCESS;
1214+
default:
1215+
return TTestActorRuntime::EEventAction::PROCESS;
1216+
}
1217+
1218+
const auto* msg = ev->Get<TEvSchemeShard::TEvModifySchemeTransaction>();
1219+
if (msg->Record.GetTransaction(0).GetOperationType() == NKikimrSchemeOp::ESchemeOpCreateConsistentCopyTables) {
1220+
dropNotification = true;
1221+
}
1222+
1223+
return TTestActorRuntime::EEventAction::PROCESS;
1224+
});
1225+
1226+
// Start exporting table
1227+
TestExport(runtime, ++txId, "/MyRoot", Sprintf(R"(
1228+
ExportToS3Settings {
1229+
endpoint: "localhost:%d"
1230+
scheme: HTTP
1231+
items {
1232+
source_path: "/MyRoot/Original"
1233+
destination_prefix: ""
1234+
}
1235+
}
1236+
)", port));
1237+
const ui64 exportId = txId;
1238+
1239+
// Wait for delay after copying tables
1240+
if (!delayed) {
1241+
TDispatchOptions opts;
1242+
opts.FinalEvents.emplace_back([&delayed](IEventHandle&) -> bool {
1243+
return bool(delayed);
1244+
});
1245+
runtime.DispatchEvents(opts);
1246+
}
1247+
runtime.SetObserverFunc(prevObserver);
1248+
1249+
// Merge 2 tablets in 1 during the delay
1250+
TestAlterTable(runtime, ++txId, "/MyRoot", R"(
1251+
Name: "Original"
1252+
PartitionConfig {
1253+
PartitioningPolicy {
1254+
MinPartitionsCount: 1
1255+
MaxPartitionsCount: 1
1256+
}
1257+
}
1258+
)");
1259+
env.TestWaitNotification(runtime, txId);
1260+
1261+
TestSplitTable(runtime, ++txId, "/MyRoot/Original", Sprintf(R"(
1262+
SourceTabletId: %lu
1263+
SourceTabletId: %lu
1264+
)", firstTablet, secondTablet));
1265+
env.TestWaitNotification(runtime, txId);
1266+
1267+
// Finish the delay and continue exporting
1268+
runtime.Send(delayed.Release(), 0, true);
1269+
env.TestWaitNotification(runtime, exportId);
1270+
1271+
// Check export
1272+
TestGetExport(runtime, exportId, "/MyRoot");
1273+
1274+
// Restore table
1275+
TestImport(runtime, ++txId, "/MyRoot", Sprintf(R"(
1276+
ImportFromS3Settings {
1277+
endpoint: "localhost:%d"
1278+
scheme: HTTP
1279+
items {
1280+
source_prefix: ""
1281+
destination_path: "/MyRoot/Restored"
1282+
}
1283+
}
1284+
)", port));
1285+
const ui64 importId = txId;
1286+
env.TestWaitNotification(runtime, importId);
1287+
1288+
// Check import
1289+
TestGetImport(runtime, importId, "/MyRoot");
1290+
1291+
// Check partitioning in restored table
1292+
TestDescribeResult(DescribePath(runtime, "/MyRoot/Restored", true, true), {
1293+
NLs::MinPartitionsCountEqual(2),
1294+
NLs::MaxPartitionsCountEqual(2),
1295+
NLs::CheckBoundaries
1296+
});
1297+
1298+
// Check data in restored table
1299+
const auto restoredFirstTablet = TTestTxConfig::FakeHiveTablets + 5;
1300+
const auto restoredSecondTablet = TTestTxConfig::FakeHiveTablets + 6;
1301+
{
1302+
auto expectedJson = TStringBuilder() << "[[[["
1303+
<< "["
1304+
<< R"(["1"];)" // key
1305+
<< R"(["valueA"])" // value
1306+
<< "];"
1307+
<< "];\%false]]]";
1308+
auto content = ReadTable(runtime, restoredFirstTablet, "Restored", {"key", "Uint32", "0"});
1309+
NKqp::CompareYson(expectedJson, content);
1310+
}
1311+
{
1312+
auto expectedJson = TStringBuilder() << "[[[["
1313+
<< "["
1314+
<< R"(["2"];)" // key
1315+
<< R"(["valueB"])" // value
1316+
<< "];"
1317+
<< "];\%false]]]";
1318+
auto content = ReadTable(runtime, restoredSecondTablet, "Restored", {"key", "Uint32", "0"});
1319+
NKqp::CompareYson(expectedJson, content);
1320+
}
1321+
}
1322+
1323+
Y_UNIT_TEST(ShouldRestoreTableWithVolatilePartitioningSplit) {
1324+
TPortManager portManager;
1325+
const ui16 port = portManager.GetPort();
1326+
1327+
TS3Mock s3Mock({}, TS3Mock::TSettings(port));
1328+
UNIT_ASSERT(s3Mock.Start());
1329+
1330+
TTestBasicRuntime runtime;
1331+
TTestEnv env(runtime);
1332+
1333+
ui64 txId = 100;
1334+
1335+
runtime.SetLogPriority(NKikimrServices::DATASHARD_BACKUP, NActors::NLog::PRI_TRACE);
1336+
runtime.SetLogPriority(NKikimrServices::DATASHARD_RESTORE, NActors::NLog::PRI_TRACE);
1337+
runtime.SetLogPriority(NKikimrServices::EXPORT, NActors::NLog::PRI_TRACE);
1338+
runtime.SetLogPriority(NKikimrServices::IMPORT, NActors::NLog::PRI_TRACE);
1339+
runtime.SetLogPriority(NKikimrServices::SEQUENCEPROXY, NActors::NLog::PRI_TRACE);
1340+
1341+
// Create table with 2 tablets
1342+
TestCreateTable(runtime, ++txId, "/MyRoot", R"(
1343+
Name: "Original"
1344+
Columns { Name: "key" Type: "Uint32" }
1345+
Columns { Name: "value" Type: "Utf8" }
1346+
KeyColumnNames: ["key"]
1347+
PartitionConfig {
1348+
PartitioningPolicy {
1349+
MinPartitionsCount: 2
1350+
MaxPartitionsCount: 2
1351+
}
1352+
}
1353+
SplitBoundary {
1354+
KeyPrefix {
1355+
Tuple { Optional { Uint32: 3 } }
1356+
}
1357+
}
1358+
)");
1359+
env.TestWaitNotification(runtime, txId);
1360+
1361+
// Upload data
1362+
const auto firstTablet = TTestTxConfig::FakeHiveTablets;
1363+
UpdateRow(runtime, "Original", 1, "valueA", firstTablet);
1364+
UpdateRow(runtime, "Original", 2, "valueB", firstTablet);
1365+
1366+
// Add delay after copying tables
1367+
bool dropNotification = false;
1368+
THolder<IEventHandle> delayed;
1369+
auto prevObserver = runtime.SetObserverFunc([&](TAutoPtr<IEventHandle>& ev) {
1370+
switch (ev->GetTypeRewrite()) {
1371+
case TEvSchemeShard::EvModifySchemeTransaction:
1372+
break;
1373+
case TEvSchemeShard::EvNotifyTxCompletionResult:
1374+
if (dropNotification) {
1375+
delayed.Reset(ev.Release());
1376+
return TTestActorRuntime::EEventAction::DROP;
1377+
}
1378+
return TTestActorRuntime::EEventAction::PROCESS;
1379+
default:
1380+
return TTestActorRuntime::EEventAction::PROCESS;
1381+
}
1382+
1383+
const auto* msg = ev->Get<TEvSchemeShard::TEvModifySchemeTransaction>();
1384+
if (msg->Record.GetTransaction(0).GetOperationType() == NKikimrSchemeOp::ESchemeOpCreateConsistentCopyTables) {
1385+
dropNotification = true;
1386+
}
1387+
1388+
return TTestActorRuntime::EEventAction::PROCESS;
1389+
});
1390+
1391+
// Start exporting table
1392+
TestExport(runtime, ++txId, "/MyRoot", Sprintf(R"(
1393+
ExportToS3Settings {
1394+
endpoint: "localhost:%d"
1395+
scheme: HTTP
1396+
items {
1397+
source_path: "/MyRoot/Original"
1398+
destination_prefix: ""
1399+
}
1400+
}
1401+
)", port));
1402+
const ui64 exportId = txId;
1403+
1404+
// Wait for delay after copying tables
1405+
if (!delayed) {
1406+
TDispatchOptions opts;
1407+
opts.FinalEvents.emplace_back([&delayed](IEventHandle&) -> bool {
1408+
return bool(delayed);
1409+
});
1410+
runtime.DispatchEvents(opts);
1411+
}
1412+
runtime.SetObserverFunc(prevObserver);
1413+
1414+
// Split 2 tablets in 3 during the delay
1415+
TestAlterTable(runtime, ++txId, "/MyRoot", R"(
1416+
Name: "Original"
1417+
PartitionConfig {
1418+
PartitioningPolicy {
1419+
MinPartitionsCount: 3
1420+
MaxPartitionsCount: 3
1421+
}
1422+
}
1423+
)");
1424+
env.TestWaitNotification(runtime, txId);
1425+
1426+
TestSplitTable(runtime, ++txId, "/MyRoot/Original", Sprintf(R"(
1427+
SourceTabletId: %lu
1428+
SplitBoundary {
1429+
KeyPrefix {
1430+
Tuple { Optional { Uint32: 2 } }
1431+
}
1432+
}
1433+
)", firstTablet));
1434+
env.TestWaitNotification(runtime, txId);
1435+
1436+
// Finish the delay and continue exporting
1437+
runtime.Send(delayed.Release(), 0, true);
1438+
env.TestWaitNotification(runtime, exportId);
1439+
1440+
// Check export
1441+
TestGetExport(runtime, exportId, "/MyRoot");
1442+
1443+
// Restore table
1444+
TestImport(runtime, ++txId, "/MyRoot", Sprintf(R"(
1445+
ImportFromS3Settings {
1446+
endpoint: "localhost:%d"
1447+
scheme: HTTP
1448+
items {
1449+
source_prefix: ""
1450+
destination_path: "/MyRoot/Restored"
1451+
}
1452+
}
1453+
)", port));
1454+
const ui64 importId = txId;
1455+
env.TestWaitNotification(runtime, importId);
1456+
1457+
// Check import
1458+
TestGetImport(runtime, importId, "/MyRoot");
1459+
1460+
// Check partitioning in restored table
1461+
TestDescribeResult(DescribePath(runtime, "/MyRoot/Restored", true, true), {
1462+
NLs::MinPartitionsCountEqual(2),
1463+
NLs::MaxPartitionsCountEqual(2),
1464+
NLs::CheckBoundaries
1465+
});
1466+
1467+
// Check data in restored table
1468+
const auto restoredFirstTablet = TTestTxConfig::FakeHiveTablets + 6;
1469+
const auto restoredSecondTablet = TTestTxConfig::FakeHiveTablets + 7;
1470+
{
1471+
auto expectedJson = TStringBuilder() << "[[[["
1472+
<< "["
1473+
<< R"(["1"];)" // key
1474+
<< R"(["valueA"])" // value
1475+
<< "];"
1476+
<< "["
1477+
<< R"(["2"];)" // key
1478+
<< R"(["valueB"])" // value
1479+
<< "];"
1480+
<< "];\%false]]]";
1481+
auto content = ReadTable(runtime, restoredFirstTablet, "Restored", {"key", "Uint32", "0"});
1482+
NKqp::CompareYson(expectedJson, content);
1483+
}
1484+
{
1485+
auto expectedJson = "[[[[];\%false]]]";
1486+
auto content = ReadTable(runtime, restoredSecondTablet, "Restored", {"key", "Uint32", "0"});
1487+
NKqp::CompareYson(expectedJson, content);
1488+
}
1489+
}
1490+
11571491
Y_UNIT_TEST(ExportImportOnSupportedDatatypes) {
11581492
TTestBasicRuntime runtime;
11591493
TTestEnv env(runtime, TTestEnvOptions().EnableParameterizedDecimal(true));

0 commit comments

Comments
 (0)