Skip to content

Commit 91cc642

Browse files
authored
break locks on scheme tx (#11517)
1 parent a431c94 commit 91cc642

9 files changed

+70
-10
lines changed

ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2351,9 +2351,19 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
23512351
Value2 String,
23522352
PRIMARY KEY (Key)
23532353
);
2354+
)", TTxControl::NoTx()).ExtractValueSync();
2355+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
2356+
2357+
result = db.ExecuteQuery(R"(
23542358
UPSERT INTO TestDdlDml2 (Key, Value1, Value2) VALUES (1, "1", "1");
23552359
SELECT * FROM TestDdlDml2;
23562360
ALTER TABLE TestDdlDml2 DROP COLUMN Value2;
2361+
)", TTxControl::NoTx()).ExtractValueSync();
2362+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::ABORTED, result.GetIssues().ToString());
2363+
2364+
result = db.ExecuteQuery(R"(
2365+
UPSERT INTO TestDdlDml2 (Key, Value1) VALUES (1, "1");
2366+
SELECT * FROM TestDdlDml2;
23572367
UPSERT INTO TestDdlDml2 (Key, Value1) VALUES (2, "2");
23582368
SELECT * FROM TestDdlDml2;
23592369
CREATE TABLE TestDdlDml33 (
@@ -2363,7 +2373,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
23632373
)", TTxControl::NoTx()).ExtractValueSync();
23642374
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
23652375
UNIT_ASSERT_VALUES_EQUAL(result.GetResultSets().size(), 2);
2366-
CompareYson(R"([[[1u];["1"];["1"]]])", FormatResultSetYson(result.GetResultSet(0)));
2376+
CompareYson(R"([[[1u];["1"]]])", FormatResultSetYson(result.GetResultSet(0)));
23672377
CompareYson(R"([[[1u];["1"]];[[2u];["2"]]])", FormatResultSetYson(result.GetResultSet(1)));
23682378
UNIT_ASSERT_EQUAL_C(result.GetIssues().Size(), 0, result.GetIssues().ToString());
23692379

ydb/core/tx/datashard/alter_table_unit.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
#include "datashard_impl.h"
2+
#include "datashard_locks_db.h"
23
#include "datashard_pipeline.h"
34
#include "execution_unit_ctors.h"
45

@@ -151,7 +152,8 @@ EExecutionStatus TAlterTableUnit::Execute(TOperation::TPtr op,
151152
}
152153

153154
TUserTable::TPtr info = DataShard.AlterUserTable(ctx, txc, alterTableTx);
154-
DataShard.AddUserTable(tableId, info);
155+
TDataShardLocksDb locksDb(DataShard, txc);
156+
DataShard.AddUserTable(tableId, info, &locksDb);
155157

156158
if (info->NeedSchemaSnapshots()) {
157159
DataShard.AddSchemaSnapshot(tableId, version, op->GetStep(), op->GetTxId(), txc, ctx);

ydb/core/tx/datashard/create_cdc_stream_unit.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
#include "datashard_impl.h"
2+
#include "datashard_locks_db.h"
23
#include "datashard_pipeline.h"
34
#include "execution_unit_ctors.h"
45

@@ -43,7 +44,8 @@ class TCreateCdcStreamUnit : public TExecutionUnit {
4344
Y_ABORT_UNLESS(version);
4445

4546
auto tableInfo = DataShard.AlterTableAddCdcStream(ctx, txc, pathId, version, streamDesc);
46-
DataShard.AddUserTable(pathId, tableInfo);
47+
TDataShardLocksDb locksDb(DataShard, txc);
48+
DataShard.AddUserTable(pathId, tableInfo, &locksDb);
4749

4850
if (tableInfo->NeedSchemaSnapshots()) {
4951
DataShard.AddSchemaSnapshot(pathId, version, op->GetStep(), op->GetTxId(), txc, ctx);

ydb/core/tx/datashard/datashard.cpp

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1890,7 +1890,6 @@ TUserTable::TPtr TDataShard::MoveUserTable(TOperation::TPtr op, const NKikimrTxD
18901890
newTableInfo->StatsNeedUpdate = true;
18911891

18921892
TDataShardLocksDb locksDb(*this, txc);
1893-
18941893
RemoveUserTable(prevId, &locksDb);
18951894
AddUserTable(newId, newTableInfo);
18961895

@@ -1967,8 +1966,8 @@ TUserTable::TPtr TDataShard::MoveUserIndex(TOperation::TPtr op, const NKikimrTxD
19671966
}
19681967

19691968
newTableInfo->SetSchema(schema);
1970-
1971-
AddUserTable(pathId, newTableInfo);
1969+
TDataShardLocksDb locksDb(*this, txc);
1970+
AddUserTable(pathId, newTableInfo, &locksDb);
19721971

19731972
if (newTableInfo->NeedSchemaSnapshots()) {
19741973
AddSchemaSnapshot(pathId, version, op->GetStep(), op->GetTxId(), txc, ctx);

ydb/core/tx/datashard/datashard_impl.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1632,7 +1632,10 @@ class TDataShard
16321632
TableInfos.erase(tableId.LocalPathId);
16331633
}
16341634

1635-
void AddUserTable(const TPathId& tableId, TUserTable::TPtr tableInfo) {
1635+
void AddUserTable(const TPathId& tableId, TUserTable::TPtr tableInfo, ILocksDb* locksDb = nullptr) {
1636+
if (locksDb) {
1637+
SysLocks.RemoveSchema(tableId, locksDb);
1638+
}
16361639
TableInfos[tableId.LocalPathId] = tableInfo;
16371640
SysLocks.UpdateSchema(tableId, tableInfo->KeyColumnTypes);
16381641
Pipeline.GetDepTracker().UpdateSchema(tableId, *tableInfo);

ydb/core/tx/datashard/datashard_ut_change_exchange.cpp

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3868,6 +3868,44 @@ Y_UNIT_TEST_SUITE(Cdc) {
38683868
MustNotLoseSchemaSnapshot(true);
38693869
}
38703870

3871+
Y_UNIT_TEST(ShouldBreakLocksOnConcurrentSchemeTx) {
3872+
TPortManager portManager;
3873+
TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig())
3874+
.SetUseRealThreads(false)
3875+
.SetDomainName("Root")
3876+
);
3877+
3878+
auto& runtime = *server->GetRuntime();
3879+
const auto edgeActor = runtime.AllocateEdgeActor();
3880+
3881+
SetupLogging(runtime);
3882+
InitRoot(server, edgeActor);
3883+
CreateShardedTable(server, edgeActor, "/Root", "Table", SimpleTable());
3884+
3885+
WaitTxNotification(server, edgeActor, AsyncAlterAddStream(server, "/Root", "Table",
3886+
Updates(NKikimrSchemeOp::ECdcStreamFormatJson)));
3887+
3888+
ExecSQL(server, edgeActor, "UPSERT INTO `/Root/Table` (key, value) VALUES (1, 10);");
3889+
3890+
TString sessionId;
3891+
TString txId;
3892+
KqpSimpleBegin(runtime, sessionId, txId, "UPSERT INTO `/Root/Table` (key, value) VALUES (1, 11);");
3893+
3894+
UNIT_ASSERT_VALUES_EQUAL(
3895+
KqpSimpleContinue(runtime, sessionId, txId, "SELECT key, value FROM `/Root/Table`;"),
3896+
"{ items { uint32_value: 1 } items { uint32_value: 11 } }");
3897+
3898+
WaitTxNotification(server, edgeActor, AsyncAlterAddExtraColumn(server, "/Root", "Table"));
3899+
3900+
UNIT_ASSERT_VALUES_EQUAL(
3901+
KqpSimpleCommit(runtime, sessionId, txId, "SELECT 1;"),
3902+
"ERROR: ABORTED");
3903+
3904+
WaitForContent(server, edgeActor, "/Root/Table/Stream", {
3905+
R"({"update":{"value":10},"key":[1]})",
3906+
});
3907+
}
3908+
38713909
Y_UNIT_TEST(ResolvedTimestampsContinueAfterMerge) {
38723910
TPortManager portManager;
38733911
TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig())

ydb/core/tx/datashard/drop_cdc_stream_unit.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
#include "datashard_impl.h"
2+
#include "datashard_locks_db.h"
23
#include "datashard_pipeline.h"
34
#include "execution_unit_ctors.h"
45

@@ -40,7 +41,8 @@ class TDropCdcStreamUnit : public TExecutionUnit {
4041
Y_ABORT_UNLESS(version);
4142

4243
auto tableInfo = DataShard.AlterTableDropCdcStream(ctx, txc, pathId, version, streamPathId);
43-
DataShard.AddUserTable(pathId, tableInfo);
44+
TDataShardLocksDb locksDb(DataShard, txc);
45+
DataShard.AddUserTable(pathId, tableInfo, &locksDb);
4446

4547
if (tableInfo->NeedSchemaSnapshots()) {
4648
DataShard.AddSchemaSnapshot(pathId, version, op->GetStep(), op->GetTxId(), txc, ctx);

ydb/core/tx/datashard/drop_index_notice_unit.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
#include "datashard_impl.h"
2+
#include "datashard_locks_db.h"
23
#include "datashard_pipeline.h"
34
#include "execution_unit_ctors.h"
45

@@ -52,7 +53,8 @@ class TDropIndexNoticeUnit : public TExecutionUnit {
5253
}
5354

5455
Y_ABORT_UNLESS(tableInfo);
55-
DataShard.AddUserTable(pathId, tableInfo);
56+
TDataShardLocksDb locksDb(DataShard, txc);
57+
DataShard.AddUserTable(pathId, tableInfo, &locksDb);
5658

5759
if (tableInfo->NeedSchemaSnapshots()) {
5860
DataShard.AddSchemaSnapshot(pathId, version, op->GetStep(), op->GetTxId(), txc, ctx);

ydb/core/tx/datashard/initiate_build_index_unit.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
#include "datashard_impl.h"
2+
#include "datashard_locks_db.h"
23
#include "datashard_pipeline.h"
34
#include "execution_unit_ctors.h"
45

@@ -53,7 +54,8 @@ class TInitiateBuildIndexUnit : public TExecutionUnit {
5354
}
5455

5556
Y_ABORT_UNLESS(tableInfo);
56-
DataShard.AddUserTable(pathId, tableInfo);
57+
TDataShardLocksDb locksDb(DataShard, txc);
58+
DataShard.AddUserTable(pathId, tableInfo, &locksDb);
5759

5860
if (tableInfo->NeedSchemaSnapshots()) {
5961
DataShard.AddSchemaSnapshot(pathId, version, op->GetStep(), op->GetTxId(), txc, ctx);

0 commit comments

Comments
 (0)