Skip to content

Commit 01a6323

Browse files
committed
Add more tests (#19378)
1 parent 6229b6c commit 01a6323

File tree

3 files changed

+99
-3
lines changed

3 files changed

+99
-3
lines changed

ydb/core/tx/datashard/datashard_ut_change_exchange.cpp

Lines changed: 78 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3894,7 +3894,8 @@ Y_UNIT_TEST_SUITE(Cdc) {
38943894
MustNotLoseSchemaSnapshot(true);
38953895
}
38963896

3897-
Y_UNIT_TEST(ShouldBreakLocksOnConcurrentSchemeTx) {
3897+
template <typename TPrepareFunc, typename TTestFunc>
3898+
void ShouldBreakLocksOnConcurrentSchemeTx(TPrepareFunc prepare, TTestFunc test, Ydb::StatusIds::StatusCode finalCode = Ydb::StatusIds::ABORTED) {
38983899
TPortManager portManager;
38993900
TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig())
39003901
.SetUseRealThreads(false)
@@ -3911,6 +3912,8 @@ Y_UNIT_TEST_SUITE(Cdc) {
39113912
WaitTxNotification(server, edgeActor, AsyncAlterAddStream(server, "/Root", "Table",
39123913
Updates(NKikimrSchemeOp::ECdcStreamFormatJson)));
39133914

3915+
prepare(server, edgeActor);
3916+
39143917
ExecSQL(server, edgeActor, "UPSERT INTO `/Root/Table` (key, value) VALUES (1, 10);");
39153918

39163919
TString sessionId;
@@ -3921,17 +3924,89 @@ Y_UNIT_TEST_SUITE(Cdc) {
39213924
KqpSimpleContinue(runtime, sessionId, txId, "SELECT key, value FROM `/Root/Table`;"),
39223925
"{ items { uint32_value: 1 } items { uint32_value: 11 } }");
39233926

3924-
WaitTxNotification(server, edgeActor, AsyncAlterAddExtraColumn(server, "/Root", "Table"));
3927+
test(server, edgeActor);
39253928

39263929
UNIT_ASSERT_VALUES_EQUAL(
39273930
KqpSimpleCommit(runtime, sessionId, txId, "SELECT 1;"),
3928-
"ERROR: ABORTED");
3931+
Sprintf("ERROR: %s", Ydb::StatusIds::StatusCode_Name(finalCode).c_str()));
39293932

39303933
WaitForContent(server, edgeActor, "/Root/Table/Stream", {
39313934
R"({"update":{"value":10},"key":[1]})",
39323935
});
39333936
}
39343937

3938+
void Nop(TServer::TPtr, const TActorId&) {
3939+
}
3940+
3941+
Y_UNIT_TEST(ShouldBreakLocksOnConcurrentAlterTable) {
3942+
ShouldBreakLocksOnConcurrentSchemeTx(&Nop, [](TServer::TPtr server, const TActorId& edgeActor) {
3943+
WaitTxNotification(server, edgeActor, AsyncAlterAddExtraColumn(server, "/Root", "Table"));
3944+
});
3945+
}
3946+
3947+
Y_UNIT_TEST(ShouldBreakLocksOnConcurrentMoveTable) {
3948+
// nop: "Cannot move table with cdc streams"
3949+
}
3950+
3951+
void AddIndex(TServer::TPtr server, const TActorId& edgeActor) {
3952+
WaitTxNotification(server, edgeActor, AsyncAlterAddIndex(server, "/Root", "/Root/Table",
3953+
TShardedTableOptions::TIndex{"Index", {"value"}}));
3954+
}
3955+
3956+
Y_UNIT_TEST(ShouldBreakLocksOnConcurrentAddIndex) {
3957+
ShouldBreakLocksOnConcurrentSchemeTx(&Nop, &AddIndex);
3958+
}
3959+
3960+
Y_UNIT_TEST(ShouldBreakLocksOnConcurrentMoveIndex) {
3961+
ShouldBreakLocksOnConcurrentSchemeTx(&AddIndex, [](TServer::TPtr server, const TActorId& edgeActor) {
3962+
WaitTxNotification(server, edgeActor, AsyncMoveIndex(server, "/Root/Table", "Index", "MovedIndex"));
3963+
});
3964+
}
3965+
3966+
Y_UNIT_TEST(ShouldBreakLocksOnConcurrentDropIndex) {
3967+
ShouldBreakLocksOnConcurrentSchemeTx(&AddIndex, [](TServer::TPtr server, const TActorId& edgeActor) {
3968+
WaitTxNotification(server, edgeActor, AsyncAlterDropIndex(server, "/Root", "Table", "Index"));
3969+
}, Ydb::StatusIds::UNAVAILABLE);
3970+
}
3971+
3972+
Y_UNIT_TEST(ShouldBreakLocksOnConcurrentCancelBuildIndex) {
3973+
ui64 buildIndexId = 0;
3974+
auto addIndexWithBlock = [&buildIndexId](TServer::TPtr server, const TActorId&) {
3975+
auto& runtime = *server->GetRuntime();
3976+
TBlockEvents<TEvDataShard::TEvBuildIndexProgressResponse> blockProgress(runtime);
3977+
buildIndexId = AsyncAlterAddIndex(server, "/Root", "/Root/Table", TShardedTableOptions::TIndex{"Index", {"value"}});
3978+
runtime.WaitFor("Progress", [&]{ return blockProgress.size(); });
3979+
blockProgress.Stop();
3980+
};
3981+
auto cancelBuildIndex = [&buildIndexId](TServer::TPtr server, const TActorId& edgeActor) {
3982+
UNIT_ASSERT(buildIndexId != 0);
3983+
CancelAddIndex(server, "/Root", buildIndexId);
3984+
WaitTxNotification(server, edgeActor, buildIndexId);
3985+
};
3986+
ShouldBreakLocksOnConcurrentSchemeTx(addIndexWithBlock, cancelBuildIndex, Ydb::StatusIds::UNAVAILABLE);
3987+
}
3988+
3989+
void AddStream(TServer::TPtr server, const TActorId& edgeActor) {
3990+
WaitTxNotification(server, edgeActor, AsyncAlterAddStream(server, "/Root", "Table",
3991+
Updates(NKikimrSchemeOp::ECdcStreamFormatJson, "Stream2")));
3992+
}
3993+
3994+
Y_UNIT_TEST(ShouldBreakLocksOnConcurrentAddStream) {
3995+
ShouldBreakLocksOnConcurrentSchemeTx(&Nop, &AddStream);
3996+
}
3997+
3998+
Y_UNIT_TEST(ShouldBreakLocksOnConcurrentAlterStream) {
3999+
ShouldBreakLocksOnConcurrentSchemeTx(&AddStream, [](TServer::TPtr server, const TActorId& edgeActor) {
4000+
WaitTxNotification(server, edgeActor, AsyncAlterDisableStream(server, "/Root", "Table", "Stream2"));
4001+
});
4002+
}
4003+
4004+
Y_UNIT_TEST(ShouldBreakLocksOnConcurrentDropStream) {
4005+
ShouldBreakLocksOnConcurrentSchemeTx(&AddStream, [](TServer::TPtr server, const TActorId& edgeActor) {
4006+
WaitTxNotification(server, edgeActor, AsyncAlterDropStream(server, "/Root", "Table", "Stream2"));
4007+
});
4008+
}
4009+
39354010
Y_UNIT_TEST(ResolvedTimestampsContinueAfterMerge) {
39364011
TPortManager portManager;
39374012
TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig())

ydb/core/tx/datashard/ut_common/datashard_ut_common.cpp

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1782,6 +1782,21 @@ void CancelAddIndex(Tests::TServer::TPtr server, const TString& dbName, ui64 bui
17821782
UNIT_ASSERT_EQUAL(resp->Get()->Record.GetStatus(), Ydb::StatusIds::SUCCESS);
17831783
}
17841784

1785+
ui64 AsyncMoveIndex(
1786+
Tests::TServer::TPtr server,
1787+
const TString& tablePath,
1788+
const TString& srcIndexName,
1789+
const TString& dstIndexName)
1790+
{
1791+
auto request = SchemeTxTemplate(NKikimrSchemeOp::ESchemeOpMoveIndex);
1792+
auto& desc = *request->Record.MutableTransaction()->MutableModifyScheme()->MutableMoveIndex();
1793+
desc.SetTablePath(tablePath);
1794+
desc.SetSrcPath(srcIndexName);
1795+
desc.SetDstPath(dstIndexName);
1796+
1797+
return RunSchemeTx(*server->GetRuntime(), std::move(request));
1798+
}
1799+
17851800
ui64 AsyncAlterDropIndex(
17861801
Tests::TServer::TPtr server,
17871802
const TString& workingDir,

ydb/core/tx/datashard/ut_common/datashard_ut_common.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -670,6 +670,12 @@ void CancelAddIndex(
670670
const TString& dbName,
671671
ui64 buildIndexId);
672672

673+
ui64 AsyncMoveIndex(
674+
Tests::TServer::TPtr server,
675+
const TString& tablePath,
676+
const TString& srcIndexName,
677+
const TString& dstIndexName);
678+
673679
ui64 AsyncAlterDropIndex(
674680
Tests::TServer::TPtr server,
675681
const TString& workingDir,

0 commit comments

Comments
 (0)