diff --git a/ydb/core/protos/feature_flags.proto b/ydb/core/protos/feature_flags.proto index 6739bea08e17..b944b5eec598 100644 --- a/ydb/core/protos/feature_flags.proto +++ b/ydb/core/protos/feature_flags.proto @@ -193,4 +193,5 @@ message TFeatureFlags { // deny non-administrators the privilege of administering local users and groups optional bool EnableStrictUserManagement = 168 [default = false]; optional bool EnableDatabaseAdmin = 169 [default = false]; + optional bool EnableChangefeedsImport = 170 [default = false]; } diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto index 1b41719b37be..5b6d3e55fbc9 100644 --- a/ydb/core/protos/flat_scheme_op.proto +++ b/ydb/core/protos/flat_scheme_op.proto @@ -23,6 +23,7 @@ import "ydb/library/mkql_proto/protos/minikql.proto"; import "ydb/public/api/protos/ydb_coordination.proto"; import "ydb/public/api/protos/ydb_export.proto"; import "ydb/public/api/protos/ydb_table.proto"; +import "ydb/public/api/protos/ydb_topic.proto"; import "ydb/public/api/protos/ydb_value.proto"; import "google/protobuf/empty.proto"; @@ -2124,3 +2125,11 @@ message TBackupBackupCollection { optional NKikimrProto.TPathID PathId = 2; optional string TargetDir = 3; // must be set on Rewrite } + +message TImportTableChangefeeds { + message TImportChangefeedTopic { + optional Ydb.Table.ChangefeedDescription Changefeed = 1; + optional Ydb.Topic.DescribeTopicResult Topic = 2; + } + repeated TImportChangefeedTopic Changefeeds = 1; +} diff --git a/ydb/core/tx/schemeshard/schemeshard__init.cpp b/ydb/core/tx/schemeshard/schemeshard__init.cpp index a6f26ad50798..aaff7cd5bbc4 100644 --- a/ydb/core/tx/schemeshard/schemeshard__init.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__init.cpp @@ -4464,9 +4464,14 @@ struct TSchemeShard::TTxInit : public TTransactionBase { item.Metadata = NBackup::TMetadata::Deserialize(rowset.GetValue()); } + if (rowset.HaveValue()) { + item.Changefeeds = rowset.GetValue(); + } + item.State = static_cast(rowset.GetValue()); item.WaitTxId = rowset.GetValueOrDefault(InvalidTxId); item.NextIndexIdx = rowset.GetValueOrDefault(0); + item.NextChangefeedIdx = rowset.GetValueOrDefault(0); item.Issue = rowset.GetValueOrDefault(TString()); if (item.WaitTxId != InvalidTxId) { diff --git a/ydb/core/tx/schemeshard/schemeshard_import.cpp b/ydb/core/tx/schemeshard/schemeshard_import.cpp index 04f51430691a..05a002eee590 100644 --- a/ydb/core/tx/schemeshard/schemeshard_import.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_import.cpp @@ -73,6 +73,9 @@ void TSchemeShard::FromXxportInfo(NKikimrImport::TImport& import, const TImportI case TImportInfo::EState::BuildIndexes: import.SetProgress(Ydb::Import::ImportProgress::PROGRESS_BUILD_INDEXES); break; + case TImportInfo::EState::CreateChangefeed: + import.SetProgress(Ydb::Import::ImportProgress::PROGRESS_CREATE_CHANGEFEEDS); + break; case TImportInfo::EState::Done: import.SetProgress(Ydb::Import::ImportProgress::PROGRESS_DONE); break; @@ -163,6 +166,7 @@ void TSchemeShard::PersistImportItemState(NIceDb::TNiceDb& db, const TImportInfo NIceDb::TUpdate(static_cast(item.State)), NIceDb::TUpdate(item.WaitTxId), NIceDb::TUpdate(item.NextIndexIdx), + NIceDb::TUpdate(item.NextChangefeedIdx), NIceDb::TUpdate(item.Issue) ); } @@ -189,6 +193,10 @@ void TSchemeShard::PersistImportItemScheme(NIceDb::TNiceDb& db, const TImportInf db.Table().Key(importInfo->Id, itemIdx).Update( NIceDb::TUpdate(item.Metadata.Serialize()) ); + + db.Table().Key(importInfo->Id, itemIdx).Update( + NIceDb::TUpdate(item.Changefeeds) + ); } void TSchemeShard::PersistImportItemPreparedCreationQuery(NIceDb::TNiceDb& db, const TImportInfo::TPtr importInfo, ui32 itemIdx) { diff --git a/ydb/core/tx/schemeshard/schemeshard_import__create.cpp b/ydb/core/tx/schemeshard/schemeshard_import__create.cpp index a90bd50a0503..e21149edb9f8 100644 --- a/ydb/core/tx/schemeshard/schemeshard_import__create.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_import__create.cpp @@ -526,6 +526,21 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase return true; } + void CreateChangefeed(TImportInfo::TPtr importInfo, ui32 itemIdx, TTxId txId) { + Y_ABORT_UNLESS(itemIdx < importInfo->Items.size()); + auto& item = importInfo->Items.at(itemIdx); + item.SubState = ESubState::Proposed; + + LOG_I("TImport::TTxProgress: CreateChangefeed propose" + << ": info# " << importInfo->ToString() + << ", item# " << item.ToString(itemIdx) + << ", txId# " << txId); + + Y_ABORT_UNLESS(item.WaitTxId == InvalidTxId); + + Send(Self->SelfId(), CreateChangefeedPropose(Self, txId, item)); + } + void AllocateTxId(TImportInfo::TPtr importInfo, ui32 itemIdx) { Y_ABORT_UNLESS(itemIdx < importInfo->Items.size()); auto& item = importInfo->Items.at(itemIdx); @@ -588,6 +603,25 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase return TTxId(ui64((*infoPtr)->Id)); } + TTxId GetActiveCreateChangefeedTxId(TImportInfo::TPtr importInfo, ui32 itemIdx) { + Y_ABORT_UNLESS(itemIdx < importInfo->Items.size()); + const auto& item = importInfo->Items.at(itemIdx); + + Y_ABORT_UNLESS(item.State == EState::CreateChangefeed); + Y_ABORT_UNLESS(item.DstPathId); + + if (!Self->PathsById.contains(item.DstPathId)) { + return InvalidTxId; + } + + auto path = Self->PathsById.at(item.DstPathId); + if (path->PathState != NKikimrSchemeOp::EPathStateAlter) { + return InvalidTxId; + } + + return path->LastTxId; + } + static TString MakeIndexBuildUid(TImportInfo::TPtr importInfo, ui32 itemIdx) { Y_ABORT_UNLESS(itemIdx < importInfo->Items.size()); const auto& item = importInfo->Items.at(itemIdx); @@ -756,6 +790,7 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase case EState::CreateSchemeObject: case EState::Transferring: case EState::BuildIndexes: + case EState::CreateChangefeed: if (item.WaitTxId == InvalidTxId) { if (!IsCreatedByQuery(item) || item.PreparedCreationQuery) { AllocateTxId(importInfo, itemIdx); @@ -781,6 +816,10 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase TTxId txId = InvalidTxId; switch (item.State) { + case EState::CreateChangefeed: + txId = GetActiveCreateChangefeedTxId(importInfo, itemIdx); + break; + case EState::Transferring: if (!CancelTransferring(importInfo, itemIdx)) { txId = GetActiveRestoreTxId(importInfo, itemIdx); @@ -1004,6 +1043,11 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase BuildIndex(importInfo, i, txId); itemIdx = i; break; + + case EState::CreateChangefeed: + CreateChangefeed(importInfo, i, txId); + itemIdx = i; + break; default: break; @@ -1064,6 +1108,8 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase txId = TTxId(record.GetPathCreateTxId()); } else if (item.State == EState::Transferring) { txId = GetActiveRestoreTxId(importInfo, itemIdx); + } else if (item.State == EState::CreateChangefeed) { + txId = GetActiveCreateChangefeedTxId(importInfo, itemIdx); } } @@ -1216,6 +1262,10 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase if (item.NextIndexIdx < item.Scheme.indexes_size()) { item.State = EState::BuildIndexes; AllocateTxId(importInfo, itemIdx); + } else if (item.NextChangefeedIdx < item.Changefeeds.changefeeds_size() && + AppData()->FeatureFlags.GetEnableChangefeedsImport()) { + item.State = EState::CreateChangefeed; + AllocateTxId(importInfo, itemIdx); } else { item.State = EState::Done; } @@ -1229,11 +1279,23 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase } else { if (++item.NextIndexIdx < item.Scheme.indexes_size()) { AllocateTxId(importInfo, itemIdx); + } else if (item.NextChangefeedIdx < item.Changefeeds.changefeeds_size() && + AppData()->FeatureFlags.GetEnableChangefeedsImport()) { + item.State = EState::CreateChangefeed; + AllocateTxId(importInfo, itemIdx); } else { item.State = EState::Done; } } break; + + case EState::CreateChangefeed: + if (++item.NextChangefeedIdx < item.Changefeeds.GetChangefeeds().size()) { + AllocateTxId(importInfo, itemIdx); + } else { + item.State = EState::Done; + } + break; default: return SendNotificationsIfFinished(importInfo); diff --git a/ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.cpp b/ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.cpp index b8fd66be70c6..30b63428d47f 100644 --- a/ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.cpp @@ -231,5 +231,65 @@ THolder CancelIndexBuildPropose( return MakeHolder(ui64(indexBuildId), domainPath.PathString(), ui64(indexBuildId)); } +THolder CreateChangefeedPropose( + TSchemeShard* ss, + TTxId txId, + const TImportInfo::TItem& item +) { + Y_ABORT_UNLESS(item.NextChangefeedIdx < item.Changefeeds.GetChangefeeds().size()); + + const auto& importChangefeedTopic = item.Changefeeds.GetChangefeeds()[item.NextChangefeedIdx]; + const auto& changefeed = importChangefeedTopic.GetChangefeed(); + const auto& topic = importChangefeedTopic.GetTopic(); + + auto propose = MakeHolder(ui64(txId), ss->TabletID()); + auto& record = propose->Record; + auto& modifyScheme = *record.AddTransaction(); + modifyScheme.SetOperationType(NKikimrSchemeOp::EOperationType::ESchemeOpCreateCdcStream); + auto& cdcStream = *modifyScheme.MutableCreateCdcStream(); + + const TPath dstPath = TPath::Init(item.DstPathId, ss); + modifyScheme.SetWorkingDir(dstPath.Parent().PathString()); + cdcStream.SetTableName(dstPath.LeafName()); + + TString error; + Ydb::StatusIds::StatusCode status; + + auto& cdcStreamDescription = *cdcStream.MutableStreamDescription(); + if (!FillChangefeedDescription(cdcStreamDescription, changefeed, status, error)) { + return nullptr; + } + + if (topic.has_retention_period()) { + cdcStream.SetRetentionPeriodSeconds(topic.retention_period().seconds()); + } + + if (topic.has_partitioning_settings()) { + i64 minActivePartitions = + topic.partitioning_settings().min_active_partitions(); + if (minActivePartitions < 0) { + return nullptr; + } else if (minActivePartitions == 0) { + minActivePartitions = 1; + } + cdcStream.SetTopicPartitions(minActivePartitions); + + if (topic.partitioning_settings().has_auto_partitioning_settings()) { + auto& partitioningSettings = topic.partitioning_settings().auto_partitioning_settings(); + cdcStream.SetTopicAutoPartitioning(partitioningSettings.strategy() != ::Ydb::Topic::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_DISABLED); + + i64 maxActivePartitions = + topic.partitioning_settings().max_active_partitions(); + if (maxActivePartitions < 0) { + return nullptr; + } else if (maxActivePartitions == 0) { + maxActivePartitions = 50; + } + cdcStream.SetMaxPartitionCount(maxActivePartitions); + } + } + return propose; +} + } // NSchemeShard } // NKikimr diff --git a/ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.h b/ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.h index a99221db35b4..26ac6e2825cd 100644 --- a/ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.h +++ b/ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.h @@ -46,5 +46,11 @@ THolder CancelIndexBuildPropose( TTxId indexBuildId ); +THolder CreateChangefeedPropose( + TSchemeShard* ss, + TTxId txId, + const TImportInfo::TItem& item +); + } // NSchemeShard } // NKikimr diff --git a/ydb/core/tx/schemeshard/schemeshard_import_scheme_getter.cpp b/ydb/core/tx/schemeshard/schemeshard_import_scheme_getter.cpp index 84b740ad9947..1a311fb5262c 100644 --- a/ydb/core/tx/schemeshard/schemeshard_import_scheme_getter.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_import_scheme_getter.cpp @@ -51,6 +51,14 @@ class TSchemeGetter: public TActorBootstrapped { return errorType == S3Errors::RESOURCE_NOT_FOUND || errorType == S3Errors::NO_SUCH_KEY; } + static TString ChangefeedDescriptionKey(const TString& changefeedPrefix) { + return TStringBuilder() << changefeedPrefix << "/changefeed_description.pb"; + } + + static TString TopicDescriptionKey(const TString& changefeedPrefix) { + return TStringBuilder() << changefeedPrefix << "/topic_description.pb"; + } + void HeadObject(const TString& key) { auto request = Model::HeadObjectRequest() .WithKey(key); @@ -103,7 +111,7 @@ class TSchemeGetter: public TActorBootstrapped { << ", result# " << result); if (NoObjectFound(result.GetError().GetErrorType())) { - Reply(); // permissions are optional + StartDownloadingChangefeeds(); // permissions are optional return; } else if (!CheckResult(result, "HeadObject")) { return; @@ -128,6 +136,38 @@ class TSchemeGetter: public TActorBootstrapped { GetObject(ChecksumKey, std::make_pair(0, contentLength - 1)); } + void HandleChangefeed(TEvExternalStorage::TEvHeadObjectResponse::TPtr& ev) { + const auto& result = ev->Get()->Result; + + LOG_D("HandleChangefeed TEvExternalStorage::TEvHeadObjectResponse" + << ": self# " << SelfId() + << ", result# " << result); + + if (!CheckResult(result, "HeadObject")) { + return; + } + + const auto contentLength = result.GetResult().GetContentLength(); + Y_ABORT_UNLESS(IndexDownloadedChangefeed < ChangefeedsKeys.size()); + GetObject(ChangefeedDescriptionKey(ChangefeedsKeys[IndexDownloadedChangefeed]), std::make_pair(0, contentLength - 1)); + } + + void HandleTopic(TEvExternalStorage::TEvHeadObjectResponse::TPtr& ev) { + const auto& result = ev->Get()->Result; + + LOG_D("HandleTopic TEvExternalStorage::TEvHeadObjectResponse" + << ": self# " << SelfId() + << ", result# " << result); + + if (!CheckResult(result, "HeadObject")) { + return; + } + + const auto contentLength = result.GetResult().GetContentLength(); + Y_ABORT_UNLESS(IndexDownloadedChangefeed < ChangefeedsKeys.size()); + GetObject(TopicDescriptionKey(ChangefeedsKeys[IndexDownloadedChangefeed]), std::make_pair(0, contentLength - 1)); + } + void GetObject(const TString& key, const std::pair& range) { auto request = Model::GetObjectRequest() .WithKey(key) @@ -205,7 +245,7 @@ class TSchemeGetter: public TActorBootstrapped { if (NeedDownloadPermissions) { StartDownloadingPermissions(); } else { - Reply(); + StartDownloadingChangefeeds(); } }; @@ -242,7 +282,7 @@ class TSchemeGetter: public TActorBootstrapped { item.Permissions = std::move(permissions); auto nextStep = [this]() { - Reply(); + StartDownloadingChangefeeds(); }; if (NeedValidateChecksums) { @@ -274,6 +314,130 @@ class TSchemeGetter: public TActorBootstrapped { ChecksumValidatedCallback(); } + void HandleChangefeed(TEvExternalStorage::TEvGetObjectResponse::TPtr& ev) { + const auto& msg = *ev->Get(); + const auto& result = msg.Result; + + LOG_D("HandleChangefeed TEvExternalStorage::TEvGetObjectResponse" + << ": self# " << SelfId() + << ", result# " << result); + + if (!CheckResult(result, "GetObject")) { + return; + } + + Y_ABORT_UNLESS(ItemIdx < ImportInfo->Items.size()); + auto& item = ImportInfo->Items.at(ItemIdx); + + LOG_T("Trying to parse changefeed" + << ": self# " << SelfId() + << ", body# " << SubstGlobalCopy(msg.Body, "\n", "\\n")); + + Ydb::Table::ChangefeedDescription changefeed; + if (!google::protobuf::TextFormat::ParseFromString(msg.Body, &changefeed)) { + return Reply(false, "Cannot parse сhangefeed"); + } + + *item.Changefeeds.MutableChangefeeds(IndexDownloadedChangefeed)->MutableChangefeed() = std::move(changefeed); + + auto nextStep = [this]() { + Become(&TThis::StateDownloadTopics); + HeadObject(TopicDescriptionKey(ChangefeedsKeys[IndexDownloadedChangefeed])); + }; + + if (NeedValidateChecksums) { + StartValidatingChecksum(ChangefeedDescriptionKey(ChangefeedsKeys[IndexDownloadedChangefeed]), msg.Body, nextStep); + } else { + nextStep(); + } + } + + void HandleTopic(TEvExternalStorage::TEvGetObjectResponse::TPtr& ev) { + const auto& msg = *ev->Get(); + const auto& result = msg.Result; + + LOG_D("HandleTopic TEvExternalStorage::TEvGetObjectResponse" + << ": self# " << SelfId() + << ", result# " << result); + + if (!CheckResult(result, "GetObject")) { + return; + } + + Y_ABORT_UNLESS(ItemIdx < ImportInfo->Items.size()); + auto& item = ImportInfo->Items.at(ItemIdx); + + LOG_T("Trying to parse topic" + << ": self# " << SelfId() + << ", body# " << SubstGlobalCopy(msg.Body, "\n", "\\n")); + + Ydb::Topic::DescribeTopicResult topic; + if (!google::protobuf::TextFormat::ParseFromString(msg.Body, &topic)) { + return Reply(false, "Cannot parse topic"); + } + *item.Changefeeds.MutableChangefeeds(IndexDownloadedChangefeed)->MutableTopic() = std::move(topic); + + auto nextStep = [this]() { + if (++IndexDownloadedChangefeed >= ChangefeedsKeys.size()) { + Reply(); + } else { + Become(&TThis::StateDownloadChangefeeds); + HeadObject(ChangefeedDescriptionKey(ChangefeedsKeys[IndexDownloadedChangefeed])); + } + }; + + if (NeedValidateChecksums) { + StartValidatingChecksum(TopicDescriptionKey(ChangefeedsKeys[IndexDownloadedChangefeed]), msg.Body, nextStep); + } else { + nextStep(); + } + } + + void ListObjects(const TString& prefix) { + auto request = Model::ListObjectsRequest() + .WithPrefix(prefix); + + Send(Client, new TEvExternalStorage::TEvListObjectsRequest(request)); + } + + template + static void Resize(::google::protobuf::RepeatedPtrField* repeatedField, ui64 size) { + while (size--) repeatedField->Add(); + } + + void HandleChangefeeds(TEvExternalStorage::TEvListObjectsResponse::TPtr& ev) { + const auto& result = ev.Get()->Get()->Result; + LOG_D("HandleChangefeeds TEvExternalStorage::TEvListObjectResponse" + << ": self# " << SelfId() + << ", result# " << result); + + if (!CheckResult(result, "ListObjects")) { + return; + } + + const auto& objects = result.GetResult().GetContents(); + ChangefeedsKeys.clear(); + ChangefeedsKeys.reserve(objects.size()); + + for (const auto& obj : objects) { + const TFsPath& path = obj.GetKey(); + if (path.GetName() == "changefeed_description.pb") { + ChangefeedsKeys.push_back(path.Dirname()); + } + } + + if (!ChangefeedsKeys.empty()) { + auto& item = ImportInfo->Items.at(ItemIdx); + Resize(item.Changefeeds.MutableChangefeeds(), ChangefeedsKeys.size()); + + Y_ABORT_UNLESS(IndexDownloadedChangefeed < ChangefeedsKeys.size()); + HeadObject(ChangefeedDescriptionKey(ChangefeedsKeys[IndexDownloadedChangefeed])); + } else { + Reply(); + } + + } + template bool CheckResult(const TResult& result, const TStringBuf marker) { if (result.IsSuccess()) { @@ -312,12 +476,20 @@ class TSchemeGetter: public TActorBootstrapped { TActor::PassAway(); } - void Download(const TString& key) { + void CreateClient() { if (Client) { Send(Client, new TEvents::TEvPoisonPill()); } Client = RegisterWithSameMailbox(CreateS3Wrapper(ExternalStorageConfig->ConstructStorageOperator())); + } + void ListChangefeeds() { + CreateClient(); + ListObjects(ImportInfo->Settings.items(ItemIdx).source_prefix()); + } + + void Download(const TString& key) { + CreateClient(); HeadObject(key); } @@ -337,6 +509,11 @@ class TSchemeGetter: public TActorBootstrapped { Download(ChecksumKey); } + void DownloadChangefeeds() { + Become(&TThis::StateDownloadChangefeeds); + ListChangefeeds(); + } + void ResetRetries() { Attempt = 0; } @@ -353,6 +530,11 @@ class TSchemeGetter: public TActorBootstrapped { Become(&TThis::StateDownloadPermissions); } + void StartDownloadingChangefeeds() { + ResetRetries(); + DownloadChangefeeds(); + } + void StartValidatingChecksum(const TString& key, const TString& object, std::function checksumValidatedCallback) { ChecksumKey = NBackup::ChecksumKey(key); Checksum = NBackup::ComputeChecksum(object); @@ -413,6 +595,27 @@ class TSchemeGetter: public TActorBootstrapped { } } + STATEFN(StateDownloadChangefeeds) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvExternalStorage::TEvListObjectsResponse, HandleChangefeeds); + hFunc(TEvExternalStorage::TEvHeadObjectResponse, HandleChangefeed); + hFunc(TEvExternalStorage::TEvGetObjectResponse, HandleChangefeed); + + sFunc(TEvents::TEvWakeup, DownloadChangefeeds); + sFunc(TEvents::TEvPoisonPill, PassAway); + } + } + + STATEFN(StateDownloadTopics) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvExternalStorage::TEvHeadObjectResponse, HandleTopic); + hFunc(TEvExternalStorage::TEvGetObjectResponse, HandleTopic); + + sFunc(TEvents::TEvWakeup, DownloadChangefeeds); + sFunc(TEvents::TEvPoisonPill, PassAway); + } + } + STATEFN(StateDownloadChecksum) { switch (ev->GetTypeRewrite()) { hFunc(TEvExternalStorage::TEvHeadObjectResponse, HandleChecksum); @@ -432,6 +635,8 @@ class TSchemeGetter: public TActorBootstrapped { const TString MetadataKey; TString SchemeKey; const TString PermissionsKey; + TVector ChangefeedsKeys; + ui64 IndexDownloadedChangefeed = 0; const ui32 Retries; ui32 Attempt = 0; diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.h b/ydb/core/tx/schemeshard/schemeshard_info_types.h index a364b30160f1..32f68f1f74a7 100644 --- a/ydb/core/tx/schemeshard/schemeshard_info_types.h +++ b/ydb/core/tx/schemeshard/schemeshard_info_types.h @@ -2828,6 +2828,7 @@ struct TImportInfo: public TSimpleRefCount { CreateSchemeObject, Transferring, BuildIndexes, + CreateChangefeed, Done = 240, Cancellation = 250, Cancelled = 251, @@ -2851,6 +2852,7 @@ struct TImportInfo: public TSimpleRefCount { TMaybe PreparedCreationQuery; TMaybeFail Permissions; NBackup::TMetadata Metadata; + NKikimrSchemeOp::TImportTableChangefeeds Changefeeds; EState State = EState::GetScheme; ESubState SubState = ESubState::AllocateTxId; @@ -2858,6 +2860,7 @@ struct TImportInfo: public TSimpleRefCount { TActorId SchemeGetter; TActorId SchemeQueryExecutor; int NextIndexIdx = 0; + int NextChangefeedIdx = 0; TString Issue; int ViewCreationRetries = 0; diff --git a/ydb/core/tx/schemeshard/schemeshard_schema.h b/ydb/core/tx/schemeshard/schemeshard_schema.h index ce4c4e26cb33..86d035f21505 100644 --- a/ydb/core/tx/schemeshard/schemeshard_schema.h +++ b/ydb/core/tx/schemeshard/schemeshard_schema.h @@ -1561,11 +1561,13 @@ struct Schema : NIceDb::Schema { // NKikimrSchemeOp::TModifyScheme serialized as string struct PreparedCreationQuery : Column<14, NScheme::NTypeIds::String> {}; struct Permissions : Column<11, NScheme::NTypeIds::String> {}; - struct Metadata : Column<12, NScheme::NTypeIds::String> {}; + struct Metadata : Column<12, NScheme::NTypeIds::String> {}; + struct Changefeeds : Column<15, NScheme::NTypeIds::String> { using Type = NKikimrSchemeOp::TImportTableChangefeeds; }; struct State : Column<7, NScheme::NTypeIds::Byte> {}; struct WaitTxId : Column<8, NScheme::NTypeIds::Uint64> { using Type = TTxId; }; struct NextIndexIdx : Column<9, NScheme::NTypeIds::Uint32> {}; + struct NextChangefeedIdx : Column<16, NScheme::NTypeIds::Uint32> {}; struct Issue : Column<10, NScheme::NTypeIds::Utf8> {}; using TKey = TableKey; @@ -1580,9 +1582,11 @@ struct Schema : NIceDb::Schema { PreparedCreationQuery, Permissions, Metadata, + Changefeeds, State, WaitTxId, NextIndexIdx, + NextChangefeedIdx, Issue >; }; diff --git a/ydb/core/tx/schemeshard/ut_helpers/export_reboots_common.cpp b/ydb/core/tx/schemeshard/ut_helpers/export_reboots_common.cpp index 899620a2d2e0..a918ed04d6f3 100644 --- a/ydb/core/tx/schemeshard/ut_helpers/export_reboots_common.cpp +++ b/ydb/core/tx/schemeshard/ut_helpers/export_reboots_common.cpp @@ -13,7 +13,7 @@ namespace NExportReboots { void CreateSchemeObjects(TTestWithReboots& t, TTestActorRuntime& runtime, const TVector& schemeObjects) { TSet toWait; - for (const auto& [type, scheme] : schemeObjects) { + for (const auto& [type, scheme, _] : schemeObjects) { switch (type) { case EPathTypeTable: TestCreateTable(runtime, ++t.TxId, "/MyRoot", scheme); diff --git a/ydb/core/tx/schemeshard/ut_helpers/ut_backup_restore_common.h b/ydb/core/tx/schemeshard/ut_helpers/ut_backup_restore_common.h index eec5d324497c..07ba2c33aaf1 100644 --- a/ydb/core/tx/schemeshard/ut_helpers/ut_backup_restore_common.h +++ b/ydb/core/tx/schemeshard/ut_helpers/ut_backup_restore_common.h @@ -1,6 +1,8 @@ #include #include +#include + using EDataFormat = NKikimr::NDataShard::NBackupRestoreTraits::EDataFormat; using ECompressionCodec = NKikimr::NDataShard::NBackupRestoreTraits::ECompressionCodec; @@ -16,9 +18,25 @@ using ECompressionCodec = NKikimr::NDataShard::NBackupRestoreTraits::ECompressio template \ void N(NUnitTest::TTestContext&) +namespace NAttr { + +enum class EKeys { + TOPIC_DESCRIPTION, +}; + +class TAttributes : public THashMap { +public: + const TString& GetTopicDescription() const { + return this->at(EKeys::TOPIC_DESCRIPTION); + } + +}; +} // NAttr + struct TTypedScheme { NKikimrSchemeOp::EPathType Type; TString Scheme; + NAttr::TAttributes Attributes; TTypedScheme(const char* scheme) : Type(NKikimrSchemeOp::EPathTypeTable) @@ -34,4 +52,10 @@ struct TTypedScheme { : Type(type) , Scheme(std::move(scheme)) {} + + TTypedScheme(NKikimrSchemeOp::EPathType type, TString scheme, NAttr::TAttributes&& attributes) + : Type(type) + , Scheme(std::move(scheme)) + , Attributes(std::move(attributes)) + {} }; diff --git a/ydb/core/tx/schemeshard/ut_restore/ut_restore.cpp b/ydb/core/tx/schemeshard/ut_restore/ut_restore.cpp index 4c701b30baab..ec6cb468cb45 100644 --- a/ydb/core/tx/schemeshard/ut_restore/ut_restore.cpp +++ b/ydb/core/tx/schemeshard/ut_restore/ut_restore.cpp @@ -139,12 +139,18 @@ namespace { } }; + struct TImportChangefeed { + TString Changefeed; + TString Topic; + }; + struct TTestDataWithScheme { TString Metadata; EPathType Type = EPathTypeTable; TString Scheme; TString CreationQuery; TString Permissions; + TImportChangefeed Changefeed; TVector Data; TTestDataWithScheme() = default; @@ -268,6 +274,9 @@ namespace { case EPathTypeView: result.CreationQuery = typedScheme.Scheme; break; + case EPathTypeCdcStream: + result.Changefeed = {typedScheme.Scheme, typedScheme.Attributes.GetTopicDescription()}; + break; default: UNIT_FAIL("cannot create sample test data for the scheme object type: " << typedScheme.Type); return {}; @@ -287,6 +296,10 @@ namespace { case EPathTypeView: result.emplace(prefix + "/create_view.sql", item.CreationQuery); break; + case EPathTypeCdcStream: + result.emplace(prefix + "/changefeed_description.pb", item.Changefeed.Changefeed); + result.emplace(prefix + "/topic_description.pb", item.Changefeed.Topic); + break; default: UNIT_FAIL("cannot determine key for the scheme object type: " << item.Type); return {}; @@ -301,6 +314,7 @@ namespace { if (item.Permissions) { result.emplace(prefix + "/permissions.pb", item.Permissions); } + for (ui32 i = 0; i < item.Data.size(); ++i) { const auto& data = item.Data.at(i); result.emplace(Sprintf("%s/data_%02d%s", prefix.data(), i, data.Ext().c_str()), data.Data); @@ -4810,6 +4824,213 @@ Y_UNIT_TEST_SUITE(TImportTests) { NLs::IsView }); } + + struct TGeneratedChangefeed { + std::pair Changefeed; + std::function Checker; + }; + + TGeneratedChangefeed GenChangefeed(ui64 num = 1) { + const TString changefeedName = TStringBuilder() << "updates_feed" << num; + const auto changefeedPath = TStringBuilder() << "/" << changefeedName; + + const auto changefeedDesc = Sprintf(R"( + name: "%s" + mode: MODE_UPDATES + format: FORMAT_JSON + state: STATE_ENABLED + )", changefeedName.c_str()); + + const auto topicDesc = R"( + partitioning_settings { + min_active_partitions: 1 + max_active_partitions: 1 + auto_partitioning_settings { + strategy: AUTO_PARTITIONING_STRATEGY_DISABLED + partition_write_speed { + stabilization_window { + seconds: 300 + } + up_utilization_percent: 80 + down_utilization_percent: 20 + } + } + } + partitions { + active: true + } + retention_period { + seconds: 86400 + } + partition_write_speed_bytes_per_second: 1048576 + partition_write_burst_bytes: 1048576 + attributes { + key: "__max_partition_message_groups_seqno_stored" + value: "6000000" + } + attributes { + key: "_allow_unauthenticated_read" + value: "true" + } + attributes { + key: "_allow_unauthenticated_write" + value: "true" + } + attributes { + key: "_message_group_seqno_retention_period_ms" + value: "1382400000" + } + consumers { + name: "my_consumer" + read_from { + } + attributes { + key: "_service_type" + value: "data-streams" + } + } + )"; + + NAttr::TAttributes attr; + attr.emplace(NAttr::EKeys::TOPIC_DESCRIPTION, topicDesc); + return { + {changefeedPath, GenerateTestData({EPathTypeCdcStream, changefeedDesc, std::move(attr)})}, + [changefeedPath = TString(changefeedPath)](TTestBasicRuntime& runtime) { + TestDescribeResult(DescribePath(runtime, "/MyRoot/Table" + changefeedPath, false, false, true), { + NLs::PathExist + }); + } + }; + } + + TVector> GenChangefeeds(THashMap& bucketContent, ui64 count = 1) { + TVector> checkers; + checkers.reserve(count); + for (ui64 i = 1; i <= count; ++i) { + auto genChangefeed = GenChangefeed(i); + bucketContent.emplace(genChangefeed.Changefeed); + checkers.push_back(genChangefeed.Checker); + } + return checkers; + } + + std::function AddedSchemeCommon(THashMap& bucketContent, const TString& permissions) { + const auto data = GenerateTestData(R"( + columns { + name: "key" + type { optional_type { item { type_id: UTF8 } } } + } + columns { + name: "value" + type { optional_type { item { type_id: UTF8 } } } + } + primary_key: "key" + )", {{"a", 1}}, permissions); + + bucketContent.emplace("", data); + return [](TTestBasicRuntime& runtime) { + TestDescribeResult(DescribePath(runtime, "/MyRoot/Table"), { + NLs::PathExist + }); + }; + } + + std::function AddedScheme(THashMap& bucketContent) { + return AddedSchemeCommon(bucketContent, ""); + } + + std::function AddedSchemeWithPermissions(THashMap& bucketContent) { + const auto permissions = R"( + actions { + change_owner: "eve" + } + actions { + grant { + subject: "alice" + permission_names: "ydb.generic.read" + } + } + actions { + grant { + subject: "alice" + permission_names: "ydb.generic.write" + } + } + actions { + grant { + subject: "bob" + permission_names: "ydb.generic.read" + } + } + )"; + return AddedSchemeCommon(bucketContent, permissions); + } + + using SchemeFunction = std::function(THashMap&)>; + + void TestImportChangefeeds(ui64 countChangefeed, SchemeFunction addedScheme) { + TTestBasicRuntime runtime; + TTestEnv env(runtime); + ui64 txId = 100; + runtime.GetAppData().FeatureFlags.SetEnableChangefeedsImport(true); + runtime.SetLogPriority(NKikimrServices::IMPORT, NActors::NLog::PRI_TRACE); + + const auto data = GenerateTestData(R"( + columns { + name: "key" + type { optional_type { item { type_id: UTF8 } } } + } + columns { + name: "value" + type { optional_type { item { type_id: UTF8 } } } + } + primary_key: "key" + )"); + + THashMap bucketContent(countChangefeed + 1); + + auto checkerTable = addedScheme(bucketContent); + auto checkersChangefeeds = GenChangefeeds(bucketContent, countChangefeed); + + TPortManager portManager; + const ui16 port = portManager.GetPort(); + + TS3Mock s3Mock(ConvertTestData(bucketContent), TS3Mock::TSettings(port)); + UNIT_ASSERT(s3Mock.Start()); + + TestImport(runtime, ++txId, "/MyRoot", Sprintf(R"( + ImportFromS3Settings { + endpoint: "localhost:%d" + scheme: HTTP + items { + source_prefix: "" + destination_path: "/MyRoot/Table" + } + } + )", port)); + env.TestWaitNotification(runtime, txId); + + checkerTable(runtime); + for (const auto& checker : checkersChangefeeds) { + checker(runtime); + } + } + + Y_UNIT_TEST(Changefeed) { + TestImportChangefeeds(1, AddedScheme); + } + + Y_UNIT_TEST(Changefeeds) { + TestImportChangefeeds(3, AddedScheme); + } + + Y_UNIT_TEST(ChangefeedWithTablePermissions) { + TestImportChangefeeds(1, AddedSchemeWithPermissions); + } + + Y_UNIT_TEST(ChangefeedsWithTablePermissions) { + TestImportChangefeeds(3, AddedSchemeWithPermissions); + } } Y_UNIT_TEST_SUITE(TImportWithRebootsTests) { @@ -4850,6 +5071,7 @@ Y_UNIT_TEST_SUITE(TImportWithRebootsTests) { runtime.SetLogPriority(NKikimrServices::DATASHARD_RESTORE, NActors::NLog::PRI_TRACE); runtime.SetLogPriority(NKikimrServices::IMPORT, NActors::NLog::PRI_TRACE); + runtime.GetAppData().FeatureFlags.SetEnableChangefeedsImport(true); if (createsViews) { runtime.GetAppData().FeatureFlags.SetEnableViews(true); } @@ -5152,6 +5374,101 @@ Y_UNIT_TEST_SUITE(TImportWithRebootsTests) { ); } + THashMap GetSchemeWithChangefeed() { + THashMap schemes; + + const auto changefeedName = "update_changefeed"; + + schemes.emplace("", R"( + columns { + name: "key" + type { optional_type { item { type_id: UTF8 } } } + } + columns { + name: "value" + type { optional_type { item { type_id: UTF8 } } } + } + primary_key: "key" + )"); + + const auto changefeedDesc = Sprintf(R"( + name: "%s" + mode: MODE_UPDATES + format: FORMAT_JSON + state: STATE_ENABLED + )", changefeedName); + + const auto topicDesc = R"( + partitioning_settings { + min_active_partitions: 1 + max_active_partitions: 1 + auto_partitioning_settings { + strategy: AUTO_PARTITIONING_STRATEGY_DISABLED + partition_write_speed { + stabilization_window { + seconds: 300 + } + up_utilization_percent: 80 + down_utilization_percent: 20 + } + } + } + partitions { + active: true + } + retention_period { + seconds: 86400 + } + partition_write_speed_bytes_per_second: 1048576 + partition_write_burst_bytes: 1048576 + attributes { + key: "__max_partition_message_groups_seqno_stored" + value: "6000000" + } + attributes { + key: "_allow_unauthenticated_read" + value: "true" + } + attributes { + key: "_allow_unauthenticated_write" + value: "true" + } + attributes { + key: "_message_group_seqno_retention_period_ms" + value: "1382400000" + } + consumers { + name: "my_consumer" + read_from { + } + attributes { + key: "_service_type" + value: "data-streams" + } + } + )"; + + NAttr::TAttributes attr; + attr.emplace(NAttr::EKeys::TOPIC_DESCRIPTION, topicDesc); + + schemes.emplace("/update_feed", + TTypedScheme { + EPathTypeCdcStream, + changefeedDesc, + std::move(attr) + } + ); + return schemes; + } + + Y_UNIT_TEST(ShouldSucceedOnSingleChangefeed) { + ShouldSucceed(GetSchemeWithChangefeed()); + } + + Y_UNIT_TEST(CancelShouldSucceedOnSingleChangefeed) { + CancelShouldSucceed(GetSchemeWithChangefeed()); + } + Y_UNIT_TEST(CancelShouldSucceedOnDependentView) { CancelShouldSucceed( { diff --git a/ydb/core/wrappers/ut_helpers/s3_mock.cpp b/ydb/core/wrappers/ut_helpers/s3_mock.cpp index f9e30df5e0ba..d131967e4ea1 100644 --- a/ydb/core/wrappers/ut_helpers/s3_mock.cpp +++ b/ydb/core/wrappers/ut_helpers/s3_mock.cpp @@ -5,6 +5,7 @@ #include #include +#include #include #include #include @@ -152,6 +153,59 @@ bool TS3Mock::TRequest::HttpServeRead(const TReplyParams& params, EMethod method return true; } +TString BuildContentXML(const TString& path) { + return Sprintf(R"( + + %s + + )", path.c_str()); +} + +TString BuildContentListXML(const TVector& paths) { + TString result; + for (const auto& path : paths) { + result += BuildContentXML(path); + } + return result; +} + +TString BuildListObjectsXML(const TVector& paths, const TStringBuf bucketName) { + return Sprintf(R"( + + + %s + false + %s + + )", bucketName.data(), BuildContentListXML(paths).c_str()); +} + +bool TS3Mock::TRequest::HttpServeList(const TReplyParams& params, TStringBuf bucketName, const TString& prefix) { + Cerr << "S3_MOCK::HttpServeList: " << prefix << Endl; + params.Output << "HTTP/1.1 200 Ok\r\n"; + THttpHeaders headers; + + TVector paths; + for (const auto& [key, value] : Parent->Data) { + TFsPath path = key; + if (path.IsSubpathOf(TStringBuilder() << bucketName << "/" << prefix)) { + paths.push_back(path); + } + } + + TString xml = BuildListObjectsXML(paths, bucketName); + + headers.AddHeader("Content-Type", "application/xml"); + headers.AddHeader("Content-Length", xml.length()); + headers.OutTo(¶ms.Output); + + params.Output << xml; + params.Output << "\r\n"; + params.Output.Flush(); + + return true; +} + bool TS3Mock::TRequest::HttpServeWrite(const TReplyParams& params, TStringBuf path, const TCgiParameters& queryParams) { TString content; ui64 length; @@ -366,7 +420,9 @@ bool TS3Mock::TRequest::DoReply(const TReplyParams& params) { case EMethod::Head: case EMethod::Get: - if (Parent->Data.contains(pathStr)) { + if (queryParams.Has("prefix")) { + return HttpServeList(params, pathStr, queryParams.Get("prefix")); + } else if (Parent->Data.contains(pathStr)) { return HttpServeRead(params, method, pathStr); } else { return HttpNotFound(params, "NoSuchKey"); diff --git a/ydb/core/wrappers/ut_helpers/s3_mock.h b/ydb/core/wrappers/ut_helpers/s3_mock.h index c537fd09ef7c..8f7fa0e7275c 100644 --- a/ydb/core/wrappers/ut_helpers/s3_mock.h +++ b/ydb/core/wrappers/ut_helpers/s3_mock.h @@ -47,6 +47,7 @@ class TS3Mock: public THttpServer::ICallBack { bool HttpNotImplemented(const TReplyParams& params); void MaybeContinue(const TReplyParams& params); bool HttpServeRead(const TReplyParams& params, EMethod method, const TStringBuf path); + bool HttpServeList(const TReplyParams& params, TStringBuf bucketName, const TString& prefix); bool HttpServeWrite(const TReplyParams& params, TStringBuf path, const TCgiParameters& queryParams); bool HttpServeAction(const TReplyParams& params, EMethod method, TStringBuf path, const TCgiParameters& queryParams); diff --git a/ydb/core/ydb_convert/table_description.cpp b/ydb/core/ydb_convert/table_description.cpp index 1e22f596b0c1..afe68309c8a2 100644 --- a/ydb/core/ydb_convert/table_description.cpp +++ b/ydb/core/ydb_convert/table_description.cpp @@ -1204,8 +1204,9 @@ void FillChangefeedDescription(Ydb::Table::DescribeTableResult& out, } } -bool FillChangefeedDescription(NKikimrSchemeOp::TCdcStreamDescription& out, - const Ydb::Table::Changefeed& in, Ydb::StatusIds::StatusCode& status, TString& error) { +template +bool FillChangefeedDescriptionCommon(NKikimrSchemeOp::TCdcStreamDescription& out, + const T& in, Ydb::StatusIds::StatusCode& status, TString& error) { out.SetName(in.name()); out.SetVirtualTimestamps(in.virtual_timestamps()); @@ -1245,6 +1246,17 @@ bool FillChangefeedDescription(NKikimrSchemeOp::TCdcStreamDescription& out, return false; } + for (const auto& [key, value] : in.attributes()) { + auto& attr = *out.AddUserAttributes(); + attr.SetKey(key); + attr.SetValue(value); + } + + return true; +} + +bool FillChangefeedDescription(NKikimrSchemeOp::TCdcStreamDescription& out, + const Ydb::Table::Changefeed& in, Ydb::StatusIds::StatusCode& status, TString& error) { if (in.initial_scan()) { if (!AppData()->FeatureFlags.GetEnableChangefeedInitialScan()) { status = Ydb::StatusIds::UNSUPPORTED; @@ -1253,14 +1265,12 @@ bool FillChangefeedDescription(NKikimrSchemeOp::TCdcStreamDescription& out, } out.SetState(NKikimrSchemeOp::ECdcStreamState::ECdcStreamStateScan); } + return FillChangefeedDescriptionCommon(out, in, status, error); +} - for (const auto& [key, value] : in.attributes()) { - auto& attr = *out.AddUserAttributes(); - attr.SetKey(key); - attr.SetValue(value); - } - - return true; +bool FillChangefeedDescription(NKikimrSchemeOp::TCdcStreamDescription& out, + const Ydb::Table::ChangefeedDescription& in, Ydb::StatusIds::StatusCode& status, TString& error) { + return FillChangefeedDescriptionCommon(out, in, status, error); } void FillTableStats(Ydb::Table::DescribeTableResult& out, diff --git a/ydb/core/ydb_convert/table_description.h b/ydb/core/ydb_convert/table_description.h index 0e897f758fda..2b73db84abcc 100644 --- a/ydb/core/ydb_convert/table_description.h +++ b/ydb/core/ydb_convert/table_description.h @@ -90,6 +90,8 @@ void FillChangefeedDescription(Ydb::Table::DescribeTableResult& out, // in bool FillChangefeedDescription(NKikimrSchemeOp::TCdcStreamDescription& out, const Ydb::Table::Changefeed& in, Ydb::StatusIds::StatusCode& status, TString& error); +bool FillChangefeedDescription(NKikimrSchemeOp::TCdcStreamDescription& out, + const Ydb::Table::ChangefeedDescription& in, Ydb::StatusIds::StatusCode& status, TString& error); // out void FillTableStats(Ydb::Table::DescribeTableResult& out, diff --git a/ydb/public/api/protos/ydb_import.proto b/ydb/public/api/protos/ydb_import.proto index 2a575a1f438a..7c9ec1cfbc7d 100644 --- a/ydb/public/api/protos/ydb_import.proto +++ b/ydb/public/api/protos/ydb_import.proto @@ -19,6 +19,7 @@ message ImportProgress { PROGRESS_DONE = 4; PROGRESS_CANCELLATION = 5; PROGRESS_CANCELLED = 6; + PROGRESS_CREATE_CHANGEFEEDS = 7; } } diff --git a/ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/import/import.h b/ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/import/import.h index 3e6b191cbf2c..d4975e75620e 100644 --- a/ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/import/import.h +++ b/ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/import/import.h @@ -17,6 +17,7 @@ enum class EImportProgress { Done = 4, Cancellation = 5, Cancelled = 6, + CreateChangefeeds = 7, Unknown = std::numeric_limits::max(), }; diff --git a/ydb/public/sdk/cpp/src/client/proto/accessor.cpp b/ydb/public/sdk/cpp/src/client/proto/accessor.cpp index 9adbba5c0e6d..fa85db31ae1f 100644 --- a/ydb/public/sdk/cpp/src/client/proto/accessor.cpp +++ b/ydb/public/sdk/cpp/src/client/proto/accessor.cpp @@ -124,6 +124,8 @@ NImport::EImportProgress TProtoAccessor::FromProto(Ydb::Import::ImportProgress:: return NImport::EImportProgress::TransferData; case Ydb::Import::ImportProgress::PROGRESS_BUILD_INDEXES: return NImport::EImportProgress::BuildIndexes; + case Ydb::Import::ImportProgress::PROGRESS_CREATE_CHANGEFEEDS: + return NImport::EImportProgress::CreateChangefeeds; case Ydb::Import::ImportProgress::PROGRESS_DONE: return NImport::EImportProgress::Done; case Ydb::Import::ImportProgress::PROGRESS_CANCELLATION: diff --git a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema index 8759c64389d1..8cdfa535f662 100644 --- a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema +++ b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema @@ -6201,6 +6201,16 @@ "ColumnId": 14, "ColumnName": "PreparedCreationQuery", "ColumnType": "String" + }, + { + "ColumnId": 15, + "ColumnName": "Changefeeds", + "ColumnType": "String" + }, + { + "ColumnId": 16, + "ColumnName": "NextChangefeedIdx", + "ColumnType": "Uint32" } ], "ColumnsDropped": [], @@ -6220,7 +6230,9 @@ 11, 12, 13, - 14 + 14, + 15, + 16 ], "RoomID": 0, "Codec": 0,