Skip to content

Commit 9c294d8

Browse files
authored
Support MoveSequence in schemeshard (#9772)
1 parent 7818abc commit 9c294d8

21 files changed

+1367
-7
lines changed

ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -597,6 +597,102 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
597597
TouchIndexAfterMoveTable(true);
598598
}
599599

600+
Y_UNIT_TEST(MoveTableWithSerialTypes) {
601+
TKikimrRunner kikimr;
602+
auto db = kikimr.GetTableClient();
603+
auto session = db.CreateSession().GetValueSync().GetSession();
604+
TString tableName = "/Root/TableWithSerial";
605+
TString newTableName = "/Root/TableWithSerialMoved";
606+
{
607+
auto query = TStringBuilder() << R"(
608+
CREATE TABLE `)" << tableName << R"(` (
609+
Key Serial,
610+
Value Int32,
611+
PRIMARY KEY (Key)
612+
);)";
613+
auto result = session.ExecuteSchemeQuery(query).GetValueSync();
614+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
615+
}
616+
{
617+
auto queryUpsert = TStringBuilder() << R"(
618+
INSERT INTO `)" << tableName << R"(` (Value) VALUES (1), (2), (3);
619+
)";
620+
auto result = session.ExecuteDataQuery(queryUpsert, TTxControl::BeginTx().CommitTx()).ExtractValueSync();
621+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
622+
}
623+
{
624+
auto querySelect = TStringBuilder() << R"(
625+
SELECT * FROM `)" << tableName << R"(`;
626+
)";
627+
auto result = session.ExecuteDataQuery(querySelect, TTxControl::BeginTx().CommitTx()).ExtractValueSync();
628+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
629+
UNIT_ASSERT_C(!result.GetResultSets().empty(), "results are empty");
630+
CompareYson(R"([
631+
[1;[1]];[2;[2]];[3;[3]]
632+
])", FormatResultSetYson(result.GetResultSet(0)));
633+
}
634+
{
635+
TDescribeTableResult describe = session.DescribeTable(tableName, TDescribeTableSettings().WithSetVal(true)).GetValueSync();
636+
UNIT_ASSERT_EQUAL(describe.GetStatus(), EStatus::SUCCESS);
637+
const auto& tableDescription = describe.GetTableDescription();
638+
bool hasSerial = false;
639+
for (const auto& column : tableDescription.GetTableColumns()) {
640+
if (column.Name == "Key") {
641+
UNIT_ASSERT(column.SequenceDescription.has_value());
642+
hasSerial = true;
643+
break;
644+
}
645+
}
646+
UNIT_ASSERT(hasSerial);
647+
}
648+
{
649+
auto query = TStringBuilder() << R"(
650+
ALTER TABLE `)" << tableName << R"(` RENAME TO `)" << newTableName << R"(`;
651+
)";
652+
653+
const auto result = session.ExecuteSchemeQuery(query).GetValueSync();
654+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
655+
656+
auto describeResult = session.DescribeTable(tableName).GetValueSync();
657+
UNIT_ASSERT(!describeResult.IsSuccess());
658+
}
659+
{
660+
TDescribeTableResult describe = session.DescribeTable(newTableName, TDescribeTableSettings().WithSetVal(true)).GetValueSync();
661+
UNIT_ASSERT_EQUAL(describe.GetStatus(), EStatus::SUCCESS);
662+
const auto& tableDescription = describe.GetTableDescription();
663+
bool hasSerial = false;
664+
for (const auto& column : tableDescription.GetTableColumns()) {
665+
if (column.Name == "Key") {
666+
UNIT_ASSERT(column.SequenceDescription.has_value());
667+
UNIT_ASSERT(column.SequenceDescription->SetVal.has_value());
668+
UNIT_ASSERT_VALUES_EQUAL(column.SequenceDescription->SetVal->NextValue, 4);
669+
UNIT_ASSERT_VALUES_EQUAL(column.SequenceDescription->SetVal->NextUsed, false);
670+
hasSerial = true;
671+
break;
672+
}
673+
}
674+
UNIT_ASSERT(hasSerial);
675+
}
676+
{
677+
auto queryUpsert = TStringBuilder() << R"(
678+
INSERT INTO `)" << newTableName << R"(` (Value) VALUES (4), (5), (6);
679+
)";
680+
auto result = session.ExecuteDataQuery(queryUpsert, TTxControl::BeginTx().CommitTx()).ExtractValueSync();
681+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
682+
}
683+
{
684+
auto querySelect = TStringBuilder() << R"(
685+
SELECT * FROM `)" << newTableName << R"(`;
686+
)";
687+
auto result = session.ExecuteDataQuery(querySelect, TTxControl::BeginTx().CommitTx()).ExtractValueSync();
688+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
689+
UNIT_ASSERT_C(!result.GetResultSets().empty(), "results are empty");
690+
CompareYson(R"([
691+
[1;[1]];[2;[2]];[3;[3]];[4;[4]];[5;[5]];[6;[6]]
692+
])", FormatResultSetYson(result.GetResultSet(0)));
693+
}
694+
}
695+
600696
void CheckInvalidationAfterDropCreateTable(bool withCompatSchema) {
601697
TKikimrRunner kikimr;
602698

ydb/core/protos/counters_schemeshard.proto

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -222,10 +222,13 @@ enum ESimpleCounters {
222222
COUNTER_IN_FLIGHT_OPS_TxAlterResourcePool = 176 [(CounterOpts) = {Name: "InFlightOps/AlterResourcePool"}];
223223

224224
COUNTER_IN_FLIGHT_OPS_TxRestoreIncrementalBackupAtTable = 177 [(CounterOpts) = {Name: "InFlightOps/RestoreIncrementalBackupAtTable"}];
225+
225226
COUNTER_BACKUP_COLLECTION_COUNT = 178 [(CounterOpts) = {Name: "BackupCollectionCount"}];
226227
COUNTER_IN_FLIGHT_OPS_TxCreateBackupCollection = 179 [(CounterOpts) = {Name: "InFlightOps/CreateBackupCollection"}];
227228
COUNTER_IN_FLIGHT_OPS_TxAlterBackupCollection = 180 [(CounterOpts) = {Name: "InFlightOps/AlterBackupCollection"}];
228229
COUNTER_IN_FLIGHT_OPS_TxDropBackupCollection = 181 [(CounterOpts) = {Name: "InFlightOps/DropBackupCollection"}];
230+
231+
COUNTER_IN_FLIGHT_OPS_TxMoveSequence = 182 [(CounterOpts) = {Name: "InFlightOps/MoveSequence"}];
229232
}
230233

231234
enum ECumulativeCounters {
@@ -363,6 +366,8 @@ enum ECumulativeCounters {
363366
COUNTER_FINISHED_OPS_TxCreateBackupCollection = 107 [(CounterOpts) = {Name: "FinishedOps/CreateBackupCollection"}];
364367
COUNTER_FINISHED_OPS_TxAlterBackupCollection = 108 [(CounterOpts) = {Name: "FinishedOps/AlterBackupCollection"}];
365368
COUNTER_FINISHED_OPS_TxDropBackupCollection = 109 [(CounterOpts) = {Name: "FinishedOps/DropBackupCollection"}];
369+
370+
COUNTER_FINISHED_OPS_TxMoveSequence = 110 [(CounterOpts) = {Name: "FinishedOps/TxMoveSequence"}];
366371
}
367372

368373
enum EPercentileCounters {

ydb/core/protos/flat_scheme_op.proto

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1617,6 +1617,9 @@ enum EOperationType {
16171617
ESchemeOpCreateBackupCollection = 105;
16181618
ESchemeOpAlterBackupCollection = 106;
16191619
ESchemeOpDropBackupCollection = 107;
1620+
1621+
// Move sequence
1622+
ESchemeOpMoveSequence = 108;
16201623
}
16211624

16221625
message TApplyIf {
@@ -1804,6 +1807,8 @@ message TModifyScheme {
18041807
optional TBackupCollectionDescription CreateBackupCollection = 74;
18051808
optional TBackupCollectionDescription AlterBackupCollection = 75;
18061809
optional TBackupCollectionDescription DropBackupCollection = 76;
1810+
1811+
optional TMove MoveSequence = 77;
18071812
}
18081813

18091814
message TCopySequence {

ydb/core/tx/schemeshard/schemeshard__init.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3463,7 +3463,7 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
34633463
srcPath->DbRefCount++;
34643464
}
34653465

3466-
if (txState.TxType == TTxState::TxMoveTable || txState.TxType == TTxState::TxMoveTableIndex) {
3466+
if (txState.TxType == TTxState::TxMoveTable || txState.TxType == TTxState::TxMoveTableIndex || txState.TxType == TTxState::TxMoveSequence) {
34673467
Y_ABORT_UNLESS(txState.SourcePathId);
34683468
TPathElement::TPtr srcPath = Self->PathsById.at(txState.SourcePathId);
34693469
Y_VERIFY_S(srcPath, "Null path element, pathId: " << txState.SourcePathId);

ydb/core/tx/schemeshard/schemeshard__operation.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1141,6 +1141,8 @@ ISubOperation::TPtr TOperation::RestorePart(TTxState::ETxType txType, TTxState::
11411141
return CreateDropSequence(NextPartId(), txState);
11421142
case TTxState::ETxType::TxCopySequence:
11431143
return CreateCopySequence(NextPartId(), txState);
1144+
case TTxState::ETxType::TxMoveSequence:
1145+
return CreateMoveSequence(NextPartId(), txState);
11441146

11451147
case TTxState::ETxType::TxFillIndex:
11461148
Y_ABORT("deprecated");
@@ -1406,6 +1408,8 @@ TVector<ISubOperation::TPtr> TOperation::ConstructParts(const TTxTransaction& tx
14061408
return {CreateMoveTableIndex(NextPartId(), tx)};
14071409
case NKikimrSchemeOp::EOperationType::ESchemeOpMoveIndex:
14081410
return CreateConsistentMoveIndex(NextPartId(), tx, context);
1411+
case NKikimrSchemeOp::EOperationType::ESchemeOpMoveSequence:
1412+
return {CreateMoveSequence(NextPartId(), tx)};
14091413

14101414
// Replication
14111415
case NKikimrSchemeOp::EOperationType::ESchemeOpCreateReplication:

ydb/core/tx/schemeshard/schemeshard__operation_db_changes.cpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,14 @@ void TStorageChanges::Apply(TSchemeShard* ss, NTabletFlatExecutor::TTransactionC
2626
ss->PersistTableIndexAlterData(db, pId);
2727
}
2828

29+
for (const auto& pId : Sequences) {
30+
ss->PersistSequence(db, pId);
31+
}
32+
33+
for (const auto& pId : AlterSequences) {
34+
ss->PersistSequenceAlter(db, pId);
35+
}
36+
2937
for (const auto& pId : ApplyIndexes) {
3038
ss->PersistTableIndex(db, pId);
3139
}

ydb/core/tx/schemeshard/schemeshard__operation_db_changes.h

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,9 @@ class TStorageChanges: public TSimpleRefCount<TStorageChanges> {
3737

3838
TDeque<TPathId> Views;
3939

40+
TDeque<TPathId> Sequences;
41+
TDeque<TPathId> AlterSequences;
42+
4043
//PQ part
4144
TDeque<std::tuple<TPathId, TShardIdx, TTopicTabletInfo::TTopicPartitionInfo>> PersQueue;
4245
TDeque<std::pair<TPathId, TTopicInfo::TPtr>> PersQueueGroup;
@@ -117,6 +120,14 @@ class TStorageChanges: public TSimpleRefCount<TStorageChanges> {
117120
Views.emplace_back(pathId);
118121
}
119122

123+
void PersistAlterSequence(const TPathId& pathId) {
124+
AlterSequences.push_back(pathId);
125+
}
126+
127+
void PersistSequence(const TPathId& pathId) {
128+
Sequences.push_back(pathId);
129+
}
130+
120131
void Apply(TSchemeShard* ss, NTabletFlatExecutor::TTransactionContext &txc, const TActorContext &ctx);
121132
};
122133

ydb/core/tx/schemeshard/schemeshard__operation_memory_changes.cpp

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,14 @@ void TMemoryChanges::GrabIndex(TSchemeShard* ss, const TPathId& pathId) {
6464
Grab<TTableIndexInfo>(pathId, ss->Indexes, Indexes);
6565
}
6666

67+
void TMemoryChanges::GrabNewSequence(TSchemeShard* ss, const TPathId& pathId) {
68+
GrabNew(pathId, ss->Sequences, Sequences);
69+
}
70+
71+
void TMemoryChanges::GrabSequence(TSchemeShard* ss, const TPathId& pathId) {
72+
Grab<TSequenceInfo>(pathId, ss->Sequences, Sequences);
73+
}
74+
6775
void TMemoryChanges::GrabNewCdcStream(TSchemeShard* ss, const TPathId& pathId) {
6876
GrabNew(pathId, ss->CdcStreams, CdcStreams);
6977
}
@@ -136,6 +144,16 @@ void TMemoryChanges::UnDo(TSchemeShard* ss) {
136144
Indexes.pop();
137145
}
138146

147+
while (Sequences) {
148+
const auto& [id, elem] = Sequences.top();
149+
if (elem) {
150+
ss->Sequences[id] = elem;
151+
} else {
152+
ss->Sequences.erase(id);
153+
}
154+
Sequences.pop();
155+
}
156+
139157
while (CdcStreams) {
140158
const auto& [id, elem] = CdcStreams.top();
141159
if (elem) {

ydb/core/tx/schemeshard/schemeshard__operation_memory_changes.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,9 @@ class TMemoryChanges: public TSimpleRefCount<TMemoryChanges> {
3030
using TTableState = std::pair<TPathId, TTableInfo::TPtr>;
3131
TStack<TTableState> Tables;
3232

33+
using TSequenceState = std::pair<TPathId, TSequenceInfo::TPtr>;
34+
TStack<TSequenceState> Sequences;
35+
3336
using TShardState = std::pair<TShardIdx, THolder<TShardInfo>>;
3437
TStack<TShardState> Shards;
3538

@@ -77,6 +80,9 @@ class TMemoryChanges: public TSimpleRefCount<TMemoryChanges> {
7780
void GrabNewIndex(TSchemeShard* ss, const TPathId& pathId);
7881
void GrabIndex(TSchemeShard* ss, const TPathId& pathId);
7982

83+
void GrabNewSequence(TSchemeShard* ss, const TPathId& pathId);
84+
void GrabSequence(TSchemeShard* ss, const TPathId& pathId);
85+
8086
void GrabNewCdcStream(TSchemeShard* ss, const TPathId& pathId);
8187
void GrabCdcStream(TSchemeShard* ss, const TPathId& pathId);
8288

0 commit comments

Comments
 (0)