Skip to content

Commit abe4d5b

Browse files
authored
Replicate index tables (#6937)
1 parent e30c4cd commit abe4d5b

21 files changed

+418
-137
lines changed

ydb/core/base/path.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,4 +37,12 @@ inline TVector<TString> ChildPath(const TVector<TString>& parentPath, const TStr
3737
return path;
3838
}
3939

40+
inline TVector<TString> ChildPath(const TVector<TString>& parentPath, const TVector<TString>& childPath) {
41+
auto path = parentPath;
42+
for (const auto& childName : childPath) {
43+
path.push_back(childName);
44+
}
45+
return path;
46+
}
47+
4048
}

ydb/core/tx/replication/controller/dst_alterer.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ class TDstAlterer: public TActorBootstrapped<TDstAlterer> {
4141

4242
switch (Kind) {
4343
case TReplication::ETargetKind::Table:
44+
case TReplication::ETargetKind::IndexTable:
4445
tx.SetOperationType(NKikimrSchemeOp::ESchemeOpAlterTable);
4546
PathIdFromPathId(DstPathId, tx.MutableAlterTable()->MutablePathId());
4647
tx.MutableAlterTable()->MutableReplicationConfig()->SetMode(

ydb/core/tx/replication/controller/dst_creator.cpp

Lines changed: 117 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88
#include <ydb/core/cms/console/configs_dispatcher.h>
99
#include <ydb/core/protos/console_config.pb.h>
1010
#include <ydb/core/tx/replication/ydb_proxy/ydb_proxy.h>
11+
#include <ydb/core/tx/scheme_board/events.h>
12+
#include <ydb/core/tx/scheme_board/subscriber.h>
1113
#include <ydb/core/tx/scheme_cache/scheme_cache.h>
1214
#include <ydb/core/tx/schemeshard/schemeshard.h>
1315
#include <ydb/core/tx/tx_proxy/proxy.h>
@@ -116,6 +118,8 @@ class TDstCreator: public TActorBootstrapped<TDstCreator> {
116118
.WithKeyShardBoundary(true)));
117119
}
118120
break;
121+
case TReplication::ETargetKind::IndexTable:
122+
Y_ABORT("unreachable");
119123
}
120124
}
121125

@@ -128,7 +132,7 @@ class TDstCreator: public TActorBootstrapped<TDstCreator> {
128132
}
129133
}
130134

131-
NKikimrScheme::EStatus ConvertStatus(NYdb::EStatus status) {
135+
static NKikimrScheme::EStatus ConvertStatus(NYdb::EStatus status) {
132136
switch (status) {
133137
case NYdb::EStatus::SUCCESS:
134138
return NKikimrScheme::StatusSuccess;
@@ -165,8 +169,20 @@ class TDstCreator: public TActorBootstrapped<TDstCreator> {
165169

166170
Ydb::Table::CreateTableRequest scheme;
167171
result.GetTableDescription().SerializeTo(scheme);
168-
// Disable index support until other replicator code be ready to process index replication
169-
scheme.mutable_indexes()->Clear();
172+
173+
// filter out unsupported index types
174+
auto& indexes = *scheme.mutable_indexes();
175+
for (auto it = indexes.begin(); it != indexes.end();) {
176+
switch (it->type_case()) {
177+
case Ydb::Table::TableIndex::kGlobalIndex:
178+
case Ydb::Table::TableIndex::kGlobalUniqueIndex:
179+
++it;
180+
continue;
181+
default:
182+
it = indexes.erase(it);
183+
break;
184+
}
185+
}
170186

171187
Ydb::StatusIds::StatusCode status;
172188
TString error;
@@ -182,30 +198,37 @@ class TDstCreator: public TActorBootstrapped<TDstCreator> {
182198

183199
TxBody.SetWorkingDir(pathPair.first);
184200

185-
NKikimrSchemeOp::TTableDescription* tableDesc = nullptr;
201+
NKikimrSchemeOp::TTableDescription* desc = nullptr;
186202
if (scheme.indexes_size()) {
203+
NeedToCheck = true;
187204
TxBody.SetOperationType(NKikimrSchemeOp::ESchemeOpCreateIndexedTable);
188-
tableDesc = TxBody.MutableCreateIndexedTable()->MutableTableDescription();
189205
TxBody.SetInternal(true);
206+
desc = TxBody.MutableCreateIndexedTable()->MutableTableDescription();
207+
if (!FillIndexDescription(*TxBody.MutableCreateIndexedTable(), scheme, status, error)) {
208+
return Error(NKikimrScheme::StatusSchemeError, error);
209+
}
190210
} else {
191211
TxBody.SetOperationType(NKikimrSchemeOp::ESchemeOpCreateTable);
192-
tableDesc = TxBody.MutableCreateTable();
212+
desc = TxBody.MutableCreateTable();
193213
}
194214

195-
Ydb::StatusIds::StatusCode dummyCode;
215+
Y_ABORT_UNLESS(desc);
216+
desc->SetName(pathPair.second);
196217

197-
if (!FillIndexDescription(*TxBody.MutableCreateIndexedTable(), scheme, dummyCode, error)) {
198-
return Error(NKikimrScheme::StatusSchemeError, error);
218+
FillReplicationConfig(*desc->MutableReplicationConfig());
219+
if (scheme.indexes_size()) {
220+
for (auto& index : *TxBody.MutableCreateIndexedTable()->MutableIndexDescription()) {
221+
FillReplicationConfig(*index.MutableIndexImplTableDescriptions(0)->MutableReplicationConfig());
222+
}
199223
}
200224

201-
tableDesc->SetName(pathPair.second);
225+
AllocateTxId();
226+
}
202227

228+
static void FillReplicationConfig(NKikimrSchemeOp::TTableReplicationConfig& replicationConfig) {
203229
// TODO: support other modes
204-
auto& replicationConfig = *tableDesc->MutableReplicationConfig();
205230
replicationConfig.SetMode(NKikimrSchemeOp::TTableReplicationConfig::REPLICATION_MODE_READ_ONLY);
206231
replicationConfig.SetConsistency(NKikimrSchemeOp::TTableReplicationConfig::CONSISTENCY_WEAK);
207-
208-
AllocateTxId();
209232
}
210233

211234
void AllocateTxId() {
@@ -257,7 +280,9 @@ class TDstCreator: public TActorBootstrapped<TDstCreator> {
257280

258281
switch (record.GetStatus()) {
259282
case NKikimrScheme::StatusAccepted:
260-
DstPathId = TPathId(SchemeShardId, record.GetPathId());
283+
if (!NeedToCheck) {
284+
DstPathId = TPathId(SchemeShardId, record.GetPathId());
285+
}
261286
Y_DEBUG_ABORT_UNLESS(TxId == record.GetTxId());
262287
return SubscribeTx(record.GetTxId());
263288
case NKikimrScheme::StatusMultipleModifications:
@@ -338,6 +363,8 @@ class TDstCreator: public TActorBootstrapped<TDstCreator> {
338363
switch (Kind) {
339364
case TReplication::ETargetKind::Table:
340365
return CheckTableScheme(desc.GetTable(), error);
366+
case TReplication::ETargetKind::IndexTable:
367+
Y_ABORT("unreachable");
341368
}
342369
}
343370

@@ -366,21 +393,30 @@ class TDstCreator: public TActorBootstrapped<TDstCreator> {
366393
return false;
367394
}
368395

369-
const auto& expected = TxBody.GetCreateTable();
396+
const NKikimrSchemeOp::TIndexedTableCreationConfig* indexedDesc = nullptr;
397+
const NKikimrSchemeOp::TTableDescription* tableDesc = nullptr;
398+
if (TxBody.GetOperationType() == NKikimrSchemeOp::ESchemeOpCreateIndexedTable) {
399+
indexedDesc = &TxBody.GetCreateIndexedTable();
400+
tableDesc = &indexedDesc->GetTableDescription();
401+
} else {
402+
tableDesc = &TxBody.GetCreateTable();
403+
}
404+
405+
Y_ABORT_UNLESS(tableDesc);
370406

371407
// check key
372-
if (expected.KeyColumnNamesSize() != got.KeyColumnNamesSize()) {
408+
if (tableDesc->KeyColumnNamesSize() != got.KeyColumnNamesSize()) {
373409
error = TStringBuilder() << "Key columns size mismatch"
374-
<< ": expected: " << expected.KeyColumnNamesSize()
410+
<< ": expected: " << tableDesc->KeyColumnNamesSize()
375411
<< ", got: " << got.KeyColumnNamesSize();
376412
return false;
377413
}
378414

379-
for (ui32 i = 0; i < expected.KeyColumnNamesSize(); ++i) {
380-
if (expected.GetKeyColumnNames(i) != got.GetKeyColumnNames(i)) {
415+
for (ui32 i = 0; i < tableDesc->KeyColumnNamesSize(); ++i) {
416+
if (tableDesc->GetKeyColumnNames(i) != got.GetKeyColumnNames(i)) {
381417
error = TStringBuilder() << "Key column name mismatch"
382418
<< ": position: " << i
383-
<< ", expected: " << expected.GetKeyColumnNames(i)
419+
<< ", expected: " << tableDesc->GetKeyColumnNames(i)
384420
<< ", got: " << got.GetKeyColumnNames(i);
385421
return false;
386422
}
@@ -392,14 +428,14 @@ class TDstCreator: public TActorBootstrapped<TDstCreator> {
392428
columns.emplace(column.GetName(), column.GetType());
393429
}
394430

395-
if (expected.ColumnsSize() != columns.size()) {
431+
if (tableDesc->ColumnsSize() != columns.size()) {
396432
error = TStringBuilder() << "Columns size mismatch"
397-
<< ": expected: " << expected.ColumnsSize()
433+
<< ": expected: " << tableDesc->ColumnsSize()
398434
<< ", got: " << columns.size();
399435
return false;
400436
}
401437

402-
for (const auto& column : expected.GetColumns()) {
438+
for (const auto& column : tableDesc->GetColumns()) {
403439
auto it = columns.find(column.GetName());
404440
if (it == columns.end()) {
405441
error = TStringBuilder() << "Cannot find column"
@@ -422,14 +458,25 @@ class TDstCreator: public TActorBootstrapped<TDstCreator> {
422458
indexes.emplace(index.GetName(), &index);
423459
}
424460

425-
if (expected.TableIndexesSize() != indexes.size()) {
461+
if (!indexedDesc) {
462+
if (!indexes.empty()) {
463+
error = TStringBuilder() << "Indexes size mismatch"
464+
<< ": expected: " << 0
465+
<< ", got: " << indexes.size();
466+
return false;
467+
}
468+
469+
return true;
470+
}
471+
472+
if (indexedDesc->IndexDescriptionSize() != indexes.size()) {
426473
error = TStringBuilder() << "Indexes size mismatch"
427-
<< ": expected: " << expected.TableIndexesSize()
474+
<< ": expected: " << indexedDesc->IndexDescriptionSize()
428475
<< ", got: " << indexes.size();
429476
return false;
430477
}
431478

432-
for (const auto& index : expected.GetTableIndexes()) {
479+
for (const auto& index : indexedDesc->GetIndexDescription()) {
433480
auto it = indexes.find(index.GetName());
434481
if (it == indexes.end()) {
435482
error = TStringBuilder() << "Cannot find index"
@@ -487,6 +534,36 @@ class TDstCreator: public TActorBootstrapped<TDstCreator> {
487534
return true;
488535
}
489536

537+
void SubscribeDstPath() {
538+
Subscriber = Register(CreateSchemeBoardSubscriber(SelfId(), DstPath));
539+
Become(&TThis::StateSubscribeDstPath);
540+
}
541+
542+
STATEFN(StateSubscribeDstPath) {
543+
switch (ev->GetTypeRewrite()) {
544+
hFunc(TSchemeBoardEvents::TEvNotifyUpdate, Handle);
545+
default:
546+
return StateBase(ev);
547+
}
548+
}
549+
550+
void Handle(TSchemeBoardEvents::TEvNotifyUpdate::TPtr& ev) {
551+
LOG_T("Handle " << ev->Get()->ToString());
552+
553+
const auto& desc = ev->Get()->DescribeSchemeResult;
554+
if (desc.GetStatus() != NKikimrScheme::StatusSuccess) {
555+
return;
556+
}
557+
558+
const auto& entryDesc = desc.GetPathDescription().GetSelf();
559+
if (!entryDesc.HasCreateFinished() || !entryDesc.GetCreateFinished()) {
560+
return;
561+
}
562+
563+
DstPathId = ev->Get()->PathId;
564+
return Success();
565+
}
566+
490567
void Handle(TEvPipeCache::TEvDeliveryProblem::TPtr& ev) {
491568
LOG_T("Handle " << ev->Get()->ToString());
492569

@@ -525,6 +602,12 @@ class TDstCreator: public TActorBootstrapped<TDstCreator> {
525602
Schedule(TDuration::Seconds(10), new TEvents::TEvWakeup);
526603
}
527604

605+
void PassAway() override {
606+
if (const auto& actorId = std::exchange(Subscriber, {})) {
607+
Send(actorId, new TEvents::TEvPoison());
608+
}
609+
}
610+
528611
public:
529612
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
530613
return NKikimrServices::TActivity::REPLICATION_CONTROLLER_DST_CREATOR;
@@ -554,7 +637,13 @@ class TDstCreator: public TActorBootstrapped<TDstCreator> {
554637
}
555638

556639
void Bootstrap() {
557-
Resolve(PathId);
640+
switch (Kind) {
641+
case TReplication::ETargetKind::Table:
642+
return Resolve(PathId);
643+
case TReplication::ETargetKind::IndexTable:
644+
// indexed table will be created along with its indexes
645+
return SubscribeDstPath();
646+
}
558647
}
559648

560649
STATEFN(StateBase) {
@@ -586,6 +675,7 @@ class TDstCreator: public TActorBootstrapped<TDstCreator> {
586675
TActorId PipeCache;
587676
bool NeedToCheck = false;
588677
TPathId DstPathId;
678+
TActorId Subscriber;
589679

590680
}; // TDstCreator
591681

ydb/core/tx/replication/controller/dst_creator_ut.cpp

Lines changed: 42 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,15 @@ Y_UNIT_TEST_SUITE(DstCreator) {
6262
CheckTableReplica(tableDesc, replicatedDesc);
6363
}
6464

65-
void WithSyncIndex(const TString& replicatedPath) {
65+
Y_UNIT_TEST(Basic) {
66+
Basic("/Root/Replicated");
67+
}
68+
69+
Y_UNIT_TEST(WithIntermediateDir) {
70+
Basic("/Root/Dir/Replicated");
71+
}
72+
73+
void WithIndex(const TString& replicatedPath, NKikimrSchemeOp::EIndexType indexType) {
6674
TEnv env;
6775
env.GetRuntime().SetLogPriority(NKikimrServices::REPLICATION_CONTROLLER, NLog::PRI_TRACE);
6876

@@ -79,25 +87,45 @@ Y_UNIT_TEST_SUITE(DstCreator) {
7987
const TString indexName = "index_by_value";
8088

8189
env.CreateTableWithIndex("/Root", *MakeTableDescription(tableDesc),
82-
indexName, TVector<TString>{"value"}, NKikimrSchemeOp::EIndexTypeGlobal,
83-
TVector<TString>{}, TDuration::Seconds(5000));
90+
indexName, TVector<TString>{"value"}, indexType);
8491
env.GetRuntime().Register(CreateDstCreator(
8592
env.GetSender(), env.GetSchemeshardId("/Root/Table"), env.GetYdbProxy(), env.GetPathId("/Root"),
8693
1 /* rid */, 1 /* tid */, TReplication::ETargetKind::Table, "/Root/Table", replicatedPath
8794
));
88-
89-
auto ev = env.GetRuntime().GrabEdgeEvent<TEvPrivate::TEvCreateDstResult>(env.GetSender());
90-
UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Status, NKikimrScheme::StatusSuccess);
95+
{
96+
auto ev = env.GetRuntime().GrabEdgeEvent<TEvPrivate::TEvCreateDstResult>(env.GetSender());
97+
UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Status, NKikimrScheme::StatusSuccess);
98+
}
9199

92100
auto desc = env.GetDescription(replicatedPath);
93101
const auto& replicatedDesc = desc.GetPathDescription().GetTable();
94102

95103
CheckTableReplica(tableDesc, replicatedDesc);
96104

105+
switch (indexType) {
106+
case NKikimrSchemeOp::EIndexTypeGlobal:
107+
case NKikimrSchemeOp::EIndexTypeGlobalUnique:
108+
UNIT_ASSERT_VALUES_EQUAL(replicatedDesc.TableIndexesSize(), 1);
109+
break;
110+
default:
111+
UNIT_ASSERT_VALUES_EQUAL(replicatedDesc.TableIndexesSize(), 0);
112+
return;
113+
}
114+
115+
env.GetRuntime().Register(CreateDstCreator(
116+
env.GetSender(), env.GetSchemeshardId("/Root/Table"), env.GetYdbProxy(), env.GetPathId("/Root"),
117+
1 /* rid */, 2 /* tid */, TReplication::ETargetKind::IndexTable,
118+
"/Root/Table/" + indexName + "/indexImplTable", replicatedPath + "/" + indexName + "/indexImplTable"
119+
));
120+
{
121+
auto ev = env.GetRuntime().GrabEdgeEvent<TEvPrivate::TEvCreateDstResult>(env.GetSender());
122+
UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Status, NKikimrScheme::StatusSuccess);
123+
}
124+
97125
{
98126
auto desc = env.GetDescription(replicatedPath + "/" + indexName);
99127
UNIT_ASSERT_VALUES_EQUAL(desc.GetPathDescription().GetTableIndex().GetName(), indexName);
100-
UNIT_ASSERT_VALUES_EQUAL(desc.GetPathDescription().GetTableIndex().GetType(), NKikimrSchemeOp::EIndexType::EIndexTypeGlobal);
128+
UNIT_ASSERT_VALUES_EQUAL(desc.GetPathDescription().GetTableIndex().GetType(), indexType);
101129
}
102130

103131
{
@@ -106,25 +134,19 @@ Y_UNIT_TEST_SUITE(DstCreator) {
106134
const auto& indexTableDesc = desc.GetPathDescription().GetTable();
107135
UNIT_ASSERT_VALUES_EQUAL(indexTableDesc.KeyColumnNamesSize(), 2);
108136
}
109-
}
110-
111-
112-
Y_UNIT_TEST(Basic) {
113-
Basic("/Root/Replicated");
114137
}
115138

116-
Y_UNIT_TEST(WithIntermediateDir) {
117-
Basic("/Root/Dir/Replicated");
118-
}
119-
/*
120139
Y_UNIT_TEST(WithSyncIndex) {
121-
WithSyncIndex("/Root/Replicated");
140+
WithIndex("/Root/Replicated", NKikimrSchemeOp::EIndexTypeGlobal);
141+
}
142+
143+
Y_UNIT_TEST(WithSyncIndexAndIntermediateDir) {
144+
WithIndex("/Root/Dir/Replicated", NKikimrSchemeOp::EIndexTypeGlobal);
122145
}
123146

124-
Y_UNIT_TEST(WithSyncIndexWithIntermediateDir) {
125-
WithSyncIndex("/Root/Dir/Replicated");
147+
Y_UNIT_TEST(WithAsyncIndex) {
148+
WithIndex("/Root/Replicated", NKikimrSchemeOp::EIndexTypeGlobalAsync);
126149
}
127-
*/
128150

129151
Y_UNIT_TEST(SameOwner) {
130152
TEnv env;

0 commit comments

Comments
 (0)