Skip to content

Commit 9dda4cb

Browse files
Import changefeed's configuration from s3 (#13943)
Co-authored-by: Ilnaz Nizametdinov <i.nizametdinov@gmail.com>
1 parent bc40145 commit 9dda4cb

File tree

21 files changed

+806
-17
lines changed

21 files changed

+806
-17
lines changed

ydb/core/protos/feature_flags.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -193,4 +193,5 @@ message TFeatureFlags {
193193
// deny non-administrators the privilege of administering local users and groups
194194
optional bool EnableStrictUserManagement = 168 [default = false];
195195
optional bool EnableDatabaseAdmin = 169 [default = false];
196+
optional bool EnableChangefeedsImport = 170 [default = false];
196197
}

ydb/core/protos/flat_scheme_op.proto

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import "ydb/library/mkql_proto/protos/minikql.proto";
2323
import "ydb/public/api/protos/ydb_coordination.proto";
2424
import "ydb/public/api/protos/ydb_export.proto";
2525
import "ydb/public/api/protos/ydb_table.proto";
26+
import "ydb/public/api/protos/ydb_topic.proto";
2627
import "ydb/public/api/protos/ydb_value.proto";
2728

2829
import "google/protobuf/empty.proto";
@@ -2124,3 +2125,11 @@ message TBackupBackupCollection {
21242125
optional NKikimrProto.TPathID PathId = 2;
21252126
optional string TargetDir = 3; // must be set on Rewrite
21262127
}
2128+
2129+
message TImportTableChangefeeds {
2130+
message TImportChangefeedTopic {
2131+
optional Ydb.Table.ChangefeedDescription Changefeed = 1;
2132+
optional Ydb.Topic.DescribeTopicResult Topic = 2;
2133+
}
2134+
repeated TImportChangefeedTopic Changefeeds = 1;
2135+
}

ydb/core/tx/schemeshard/schemeshard__init.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4467,9 +4467,14 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
44674467
item.Metadata = NBackup::TMetadata::Deserialize(rowset.GetValue<Schema::ImportItems::Metadata>());
44684468
}
44694469

4470+
if (rowset.HaveValue<Schema::ImportItems::Changefeeds>()) {
4471+
item.Changefeeds = rowset.GetValue<Schema::ImportItems::Changefeeds>();
4472+
}
4473+
44704474
item.State = static_cast<TImportInfo::EState>(rowset.GetValue<Schema::ImportItems::State>());
44714475
item.WaitTxId = rowset.GetValueOrDefault<Schema::ImportItems::WaitTxId>(InvalidTxId);
44724476
item.NextIndexIdx = rowset.GetValueOrDefault<Schema::ImportItems::NextIndexIdx>(0);
4477+
item.NextChangefeedIdx = rowset.GetValueOrDefault<Schema::ImportItems::NextChangefeedIdx>(0);
44734478
item.Issue = rowset.GetValueOrDefault<Schema::ImportItems::Issue>(TString());
44744479

44754480
if (item.WaitTxId != InvalidTxId) {

ydb/core/tx/schemeshard/schemeshard_import.cpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,9 @@ void TSchemeShard::FromXxportInfo(NKikimrImport::TImport& import, const TImportI
7373
case TImportInfo::EState::BuildIndexes:
7474
import.SetProgress(Ydb::Import::ImportProgress::PROGRESS_BUILD_INDEXES);
7575
break;
76+
case TImportInfo::EState::CreateChangefeed:
77+
import.SetProgress(Ydb::Import::ImportProgress::PROGRESS_CREATE_CHANGEFEEDS);
78+
break;
7679
case TImportInfo::EState::Done:
7780
import.SetProgress(Ydb::Import::ImportProgress::PROGRESS_DONE);
7881
break;
@@ -163,6 +166,7 @@ void TSchemeShard::PersistImportItemState(NIceDb::TNiceDb& db, const TImportInfo
163166
NIceDb::TUpdate<Schema::ImportItems::State>(static_cast<ui8>(item.State)),
164167
NIceDb::TUpdate<Schema::ImportItems::WaitTxId>(item.WaitTxId),
165168
NIceDb::TUpdate<Schema::ImportItems::NextIndexIdx>(item.NextIndexIdx),
169+
NIceDb::TUpdate<Schema::ImportItems::NextChangefeedIdx>(item.NextChangefeedIdx),
166170
NIceDb::TUpdate<Schema::ImportItems::Issue>(item.Issue)
167171
);
168172
}
@@ -189,6 +193,10 @@ void TSchemeShard::PersistImportItemScheme(NIceDb::TNiceDb& db, const TImportInf
189193
db.Table<Schema::ImportItems>().Key(importInfo->Id, itemIdx).Update(
190194
NIceDb::TUpdate<Schema::ImportItems::Metadata>(item.Metadata.Serialize())
191195
);
196+
197+
db.Table<Schema::ImportItems>().Key(importInfo->Id, itemIdx).Update(
198+
NIceDb::TUpdate<Schema::ImportItems::Changefeeds>(item.Changefeeds)
199+
);
192200
}
193201

194202
void TSchemeShard::PersistImportItemPreparedCreationQuery(NIceDb::TNiceDb& db, const TImportInfo::TPtr importInfo, ui32 itemIdx) {

ydb/core/tx/schemeshard/schemeshard_import__create.cpp

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -526,6 +526,21 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase
526526
return true;
527527
}
528528

529+
void CreateChangefeed(TImportInfo::TPtr importInfo, ui32 itemIdx, TTxId txId) {
530+
Y_ABORT_UNLESS(itemIdx < importInfo->Items.size());
531+
auto& item = importInfo->Items.at(itemIdx);
532+
item.SubState = ESubState::Proposed;
533+
534+
LOG_I("TImport::TTxProgress: CreateChangefeed propose"
535+
<< ": info# " << importInfo->ToString()
536+
<< ", item# " << item.ToString(itemIdx)
537+
<< ", txId# " << txId);
538+
539+
Y_ABORT_UNLESS(item.WaitTxId == InvalidTxId);
540+
541+
Send(Self->SelfId(), CreateChangefeedPropose(Self, txId, item));
542+
}
543+
529544
void AllocateTxId(TImportInfo::TPtr importInfo, ui32 itemIdx) {
530545
Y_ABORT_UNLESS(itemIdx < importInfo->Items.size());
531546
auto& item = importInfo->Items.at(itemIdx);
@@ -588,6 +603,25 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase
588603
return TTxId(ui64((*infoPtr)->Id));
589604
}
590605

606+
TTxId GetActiveCreateChangefeedTxId(TImportInfo::TPtr importInfo, ui32 itemIdx) {
607+
Y_ABORT_UNLESS(itemIdx < importInfo->Items.size());
608+
const auto& item = importInfo->Items.at(itemIdx);
609+
610+
Y_ABORT_UNLESS(item.State == EState::CreateChangefeed);
611+
Y_ABORT_UNLESS(item.DstPathId);
612+
613+
if (!Self->PathsById.contains(item.DstPathId)) {
614+
return InvalidTxId;
615+
}
616+
617+
auto path = Self->PathsById.at(item.DstPathId);
618+
if (path->PathState != NKikimrSchemeOp::EPathStateAlter) {
619+
return InvalidTxId;
620+
}
621+
622+
return path->LastTxId;
623+
}
624+
591625
static TString MakeIndexBuildUid(TImportInfo::TPtr importInfo, ui32 itemIdx) {
592626
Y_ABORT_UNLESS(itemIdx < importInfo->Items.size());
593627
const auto& item = importInfo->Items.at(itemIdx);
@@ -756,6 +790,7 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase
756790
case EState::CreateSchemeObject:
757791
case EState::Transferring:
758792
case EState::BuildIndexes:
793+
case EState::CreateChangefeed:
759794
if (item.WaitTxId == InvalidTxId) {
760795
if (!IsCreatedByQuery(item) || item.PreparedCreationQuery) {
761796
AllocateTxId(importInfo, itemIdx);
@@ -781,6 +816,10 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase
781816
TTxId txId = InvalidTxId;
782817

783818
switch (item.State) {
819+
case EState::CreateChangefeed:
820+
txId = GetActiveCreateChangefeedTxId(importInfo, itemIdx);
821+
break;
822+
784823
case EState::Transferring:
785824
if (!CancelTransferring(importInfo, itemIdx)) {
786825
txId = GetActiveRestoreTxId(importInfo, itemIdx);
@@ -1004,6 +1043,11 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase
10041043
BuildIndex(importInfo, i, txId);
10051044
itemIdx = i;
10061045
break;
1046+
1047+
case EState::CreateChangefeed:
1048+
CreateChangefeed(importInfo, i, txId);
1049+
itemIdx = i;
1050+
break;
10071051

10081052
default:
10091053
break;
@@ -1064,6 +1108,8 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase
10641108
txId = TTxId(record.GetPathCreateTxId());
10651109
} else if (item.State == EState::Transferring) {
10661110
txId = GetActiveRestoreTxId(importInfo, itemIdx);
1111+
} else if (item.State == EState::CreateChangefeed) {
1112+
txId = GetActiveCreateChangefeedTxId(importInfo, itemIdx);
10671113
}
10681114
}
10691115

@@ -1216,6 +1262,10 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase
12161262
if (item.NextIndexIdx < item.Scheme.indexes_size()) {
12171263
item.State = EState::BuildIndexes;
12181264
AllocateTxId(importInfo, itemIdx);
1265+
} else if (item.NextChangefeedIdx < item.Changefeeds.changefeeds_size() &&
1266+
AppData()->FeatureFlags.GetEnableChangefeedsImport()) {
1267+
item.State = EState::CreateChangefeed;
1268+
AllocateTxId(importInfo, itemIdx);
12191269
} else {
12201270
item.State = EState::Done;
12211271
}
@@ -1229,11 +1279,23 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase
12291279
} else {
12301280
if (++item.NextIndexIdx < item.Scheme.indexes_size()) {
12311281
AllocateTxId(importInfo, itemIdx);
1282+
} else if (item.NextChangefeedIdx < item.Changefeeds.changefeeds_size() &&
1283+
AppData()->FeatureFlags.GetEnableChangefeedsImport()) {
1284+
item.State = EState::CreateChangefeed;
1285+
AllocateTxId(importInfo, itemIdx);
12321286
} else {
12331287
item.State = EState::Done;
12341288
}
12351289
}
12361290
break;
1291+
1292+
case EState::CreateChangefeed:
1293+
if (++item.NextChangefeedIdx < item.Changefeeds.GetChangefeeds().size()) {
1294+
AllocateTxId(importInfo, itemIdx);
1295+
} else {
1296+
item.State = EState::Done;
1297+
}
1298+
break;
12371299

12381300
default:
12391301
return SendNotificationsIfFinished(importInfo);

ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.cpp

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -231,5 +231,65 @@ THolder<TEvIndexBuilder::TEvCancelRequest> CancelIndexBuildPropose(
231231
return MakeHolder<TEvIndexBuilder::TEvCancelRequest>(ui64(indexBuildId), domainPath.PathString(), ui64(indexBuildId));
232232
}
233233

234+
THolder<TEvSchemeShard::TEvModifySchemeTransaction> CreateChangefeedPropose(
235+
TSchemeShard* ss,
236+
TTxId txId,
237+
const TImportInfo::TItem& item
238+
) {
239+
Y_ABORT_UNLESS(item.NextChangefeedIdx < item.Changefeeds.GetChangefeeds().size());
240+
241+
const auto& importChangefeedTopic = item.Changefeeds.GetChangefeeds()[item.NextChangefeedIdx];
242+
const auto& changefeed = importChangefeedTopic.GetChangefeed();
243+
const auto& topic = importChangefeedTopic.GetTopic();
244+
245+
auto propose = MakeHolder<TEvSchemeShard::TEvModifySchemeTransaction>(ui64(txId), ss->TabletID());
246+
auto& record = propose->Record;
247+
auto& modifyScheme = *record.AddTransaction();
248+
modifyScheme.SetOperationType(NKikimrSchemeOp::EOperationType::ESchemeOpCreateCdcStream);
249+
auto& cdcStream = *modifyScheme.MutableCreateCdcStream();
250+
251+
const TPath dstPath = TPath::Init(item.DstPathId, ss);
252+
modifyScheme.SetWorkingDir(dstPath.Parent().PathString());
253+
cdcStream.SetTableName(dstPath.LeafName());
254+
255+
TString error;
256+
Ydb::StatusIds::StatusCode status;
257+
258+
auto& cdcStreamDescription = *cdcStream.MutableStreamDescription();
259+
if (!FillChangefeedDescription(cdcStreamDescription, changefeed, status, error)) {
260+
return nullptr;
261+
}
262+
263+
if (topic.has_retention_period()) {
264+
cdcStream.SetRetentionPeriodSeconds(topic.retention_period().seconds());
265+
}
266+
267+
if (topic.has_partitioning_settings()) {
268+
i64 minActivePartitions =
269+
topic.partitioning_settings().min_active_partitions();
270+
if (minActivePartitions < 0) {
271+
return nullptr;
272+
} else if (minActivePartitions == 0) {
273+
minActivePartitions = 1;
274+
}
275+
cdcStream.SetTopicPartitions(minActivePartitions);
276+
277+
if (topic.partitioning_settings().has_auto_partitioning_settings()) {
278+
auto& partitioningSettings = topic.partitioning_settings().auto_partitioning_settings();
279+
cdcStream.SetTopicAutoPartitioning(partitioningSettings.strategy() != ::Ydb::Topic::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_DISABLED);
280+
281+
i64 maxActivePartitions =
282+
topic.partitioning_settings().max_active_partitions();
283+
if (maxActivePartitions < 0) {
284+
return nullptr;
285+
} else if (maxActivePartitions == 0) {
286+
maxActivePartitions = 50;
287+
}
288+
cdcStream.SetMaxPartitionCount(maxActivePartitions);
289+
}
290+
}
291+
return propose;
292+
}
293+
234294
} // NSchemeShard
235295
} // NKikimr

ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,5 +46,11 @@ THolder<TEvIndexBuilder::TEvCancelRequest> CancelIndexBuildPropose(
4646
TTxId indexBuildId
4747
);
4848

49+
THolder<TEvSchemeShard::TEvModifySchemeTransaction> CreateChangefeedPropose(
50+
TSchemeShard* ss,
51+
TTxId txId,
52+
const TImportInfo::TItem& item
53+
);
54+
4955
} // NSchemeShard
5056
} // NKikimr

0 commit comments

Comments
 (0)