diff --git a/ydb/core/tx/schemeshard/schemeshard_import__create.cpp b/ydb/core/tx/schemeshard/schemeshard_import__create.cpp index e21149edb9f8..aae0c56a0853 100644 --- a/ydb/core/tx/schemeshard/schemeshard_import__create.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_import__create.cpp @@ -541,6 +541,21 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase Send(Self->SelfId(), CreateChangefeedPropose(Self, txId, item)); } + void CreateConsumers(TImportInfo::TPtr importInfo, ui32 itemIdx, TTxId txId) { + Y_ABORT_UNLESS(itemIdx < importInfo->Items.size()); + auto& item = importInfo->Items.at(itemIdx); + item.SubState = ESubState::Proposed; + + LOG_I("TImport::TTxProgress: CreateConsumers propose" + << ": info# " << importInfo->ToString() + << ", item# " << item.ToString(itemIdx) + << ", txId# " << txId); + + Y_ABORT_UNLESS(item.WaitTxId == InvalidTxId); + + Send(Self->SelfId(), CreateConsumersPropose(Self, txId, item)); + } + void AllocateTxId(TImportInfo::TPtr importInfo, ui32 itemIdx) { Y_ABORT_UNLESS(itemIdx < importInfo->Items.size()); auto& item = importInfo->Items.at(itemIdx); @@ -622,6 +637,26 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase return path->LastTxId; } + TTxId GetActiveCreateConsumerTxId(TImportInfo::TPtr importInfo, ui32 itemIdx) { + Y_ABORT_UNLESS(itemIdx < importInfo->Items.size()); + const auto& item = importInfo->Items.at(itemIdx); + + Y_ABORT_UNLESS(item.State == EState::CreateChangefeed); + Y_ABORT_UNLESS(item.ChangefeedState == TImportInfo::TItem::EChangefeedState::CreateConsumers); + Y_ABORT_UNLESS(item.StreamImplPathId); + + if (!Self->PathsById.contains(item.StreamImplPathId)) { + return InvalidTxId; + } + + auto path = Self->PathsById.at(item.StreamImplPathId); + if (path->PathState != NKikimrSchemeOp::EPathStateAlter) { + return InvalidTxId; + } + + return path->LastTxId; + } + static TString MakeIndexBuildUid(TImportInfo::TPtr importInfo, ui32 itemIdx) { Y_ABORT_UNLESS(itemIdx < importInfo->Items.size()); const auto& item = importInfo->Items.at(itemIdx); @@ -816,10 +851,6 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase TTxId txId = InvalidTxId; switch (item.State) { - case EState::CreateChangefeed: - txId = GetActiveCreateChangefeedTxId(importInfo, itemIdx); - break; - case EState::Transferring: if (!CancelTransferring(importInfo, itemIdx)) { txId = GetActiveRestoreTxId(importInfo, itemIdx); @@ -1045,7 +1076,11 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase break; case EState::CreateChangefeed: - CreateChangefeed(importInfo, i, txId); + if (item.ChangefeedState == TImportInfo::TItem::EChangefeedState::CreateChangefeed) { + CreateChangefeed(importInfo, i, txId); + } else { + CreateConsumers(importInfo, i, txId); + } itemIdx = i; break; @@ -1109,11 +1144,30 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase } else if (item.State == EState::Transferring) { txId = GetActiveRestoreTxId(importInfo, itemIdx); } else if (item.State == EState::CreateChangefeed) { - txId = GetActiveCreateChangefeedTxId(importInfo, itemIdx); + if (item.ChangefeedState == TImportInfo::TItem::EChangefeedState::CreateChangefeed) { + txId = GetActiveCreateChangefeedTxId(importInfo, itemIdx); + } else { + txId = GetActiveCreateConsumerTxId(importInfo, itemIdx); + } + } } if (txId == InvalidTxId) { + + if (record.GetStatus() == NKikimrScheme::StatusAlreadyExists && item.State == EState::CreateChangefeed) { + if (item.ChangefeedState == TImportInfo::TItem::EChangefeedState::CreateChangefeed) { + item.ChangefeedState = TImportInfo::TItem::EChangefeedState::CreateConsumers; + AllocateTxId(importInfo, itemIdx); + } else if (++item.NextChangefeedIdx < item.Changefeeds.GetChangefeeds().size()) { + item.ChangefeedState = TImportInfo::TItem::EChangefeedState::CreateChangefeed; + AllocateTxId(importInfo, itemIdx); + } else { + item.State = EState::Done; + } + return; + } + return CancelAndPersist(db, importInfo, itemIdx, record.GetReason(), "unhappy propose"); } @@ -1290,7 +1344,11 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase break; case EState::CreateChangefeed: - if (++item.NextChangefeedIdx < item.Changefeeds.GetChangefeeds().size()) { + if (item.ChangefeedState == TImportInfo::TItem::EChangefeedState::CreateChangefeed) { + item.ChangefeedState = TImportInfo::TItem::EChangefeedState::CreateConsumers; + AllocateTxId(importInfo, itemIdx); + } else if (++item.NextChangefeedIdx < item.Changefeeds.GetChangefeeds().size()) { + item.ChangefeedState = TImportInfo::TItem::EChangefeedState::CreateChangefeed; AllocateTxId(importInfo, itemIdx); } else { item.State = EState::Done; diff --git a/ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.cpp b/ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.cpp index f8cbdc55361a..ca3fbab64d71 100644 --- a/ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.cpp @@ -291,5 +291,58 @@ THolder CreateChangefeedPropose( return propose; } +THolder CreateConsumersPropose( + TSchemeShard* ss, + TTxId txId, + TImportInfo::TItem& item +) { + Y_ABORT_UNLESS(item.NextChangefeedIdx < item.Changefeeds.GetChangefeeds().size()); + + const auto& importChangefeedTopic = item.Changefeeds.GetChangefeeds()[item.NextChangefeedIdx]; + const auto& topic = importChangefeedTopic.GetTopic(); + + auto propose = MakeHolder(ui64(txId), ss->TabletID()); + auto& record = propose->Record; + auto& modifyScheme = *record.AddTransaction(); + modifyScheme.SetOperationType(NKikimrSchemeOp::EOperationType::ESchemeOpAlterPersQueueGroup); + auto& pqGroup = *modifyScheme.MutableAlterPersQueueGroup(); + + const TPath dstPath = TPath::Init(item.DstPathId, ss); + const TString changefeedPath = dstPath.PathString() + "/" + importChangefeedTopic.GetChangefeed().name(); + modifyScheme.SetWorkingDir(changefeedPath); + modifyScheme.SetInternal(true); + + pqGroup.SetName("streamImpl"); + + NKikimrSchemeOp::TDescribeOptions opts; + opts.SetReturnPartitioningInfo(false); + opts.SetReturnPartitionConfig(true); + opts.SetReturnBoundaries(true); + opts.SetReturnIndexTableBoundaries(true); + opts.SetShowPrivateTable(true); + auto describeSchemeResult = DescribePath(ss, TlsActivationContext->AsActorContext(),changefeedPath + "/streamImpl", opts); + + const auto& response = describeSchemeResult->GetRecord().GetPathDescription(); + item.StreamImplPathId = {response.GetSelf().GetSchemeshardId(), response.GetSelf().GetPathId()}; + pqGroup.CopyFrom(response.GetPersQueueGroup()); + + pqGroup.ClearTotalGroupCount(); + pqGroup.MutablePQTabletConfig()->ClearPartitionKeySchema(); + + auto* tabletConfig = pqGroup.MutablePQTabletConfig(); + const auto& pqConfig = AppData()->PQConfig; + + for (const auto& consumer : topic.consumers()) { + auto& addedConsumer = *tabletConfig->AddConsumers(); + auto consumerName = NPersQueue::ConvertNewConsumerName(consumer.name(), pqConfig); + addedConsumer.SetName(consumerName); + if (consumer.important()) { + addedConsumer.SetImportant(true); + } + } + + return propose; +} + } // NSchemeShard } // NKikimr diff --git a/ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.h b/ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.h index 26ac6e2825cd..107db1b34c47 100644 --- a/ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.h +++ b/ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.h @@ -52,5 +52,11 @@ THolder CreateChangefeedPropose( const TImportInfo::TItem& item ); +THolder CreateConsumersPropose( + TSchemeShard* ss, + TTxId txId, + TImportInfo::TItem& item +); + } // NSchemeShard } // NKikimr diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.h b/ydb/core/tx/schemeshard/schemeshard_info_types.h index 84fd17c77185..9f4a8e34abf6 100644 --- a/ydb/core/tx/schemeshard/schemeshard_info_types.h +++ b/ydb/core/tx/schemeshard/schemeshard_info_types.h @@ -2846,6 +2846,11 @@ struct TImportInfo: public TSimpleRefCount { Subscribed, }; + enum class EChangefeedState: ui8 { + CreateChangefeed = 0, + CreateConsumers, + }; + TString DstPathName; TPathId DstPathId; Ydb::Table::CreateTableRequest Scheme; @@ -2857,6 +2862,7 @@ struct TImportInfo: public TSimpleRefCount { EState State = EState::GetScheme; ESubState SubState = ESubState::AllocateTxId; + EChangefeedState ChangefeedState = EChangefeedState::CreateChangefeed; TTxId WaitTxId = InvalidTxId; TActorId SchemeGetter; TActorId SchemeQueryExecutor; @@ -2864,6 +2870,7 @@ struct TImportInfo: public TSimpleRefCount { int NextChangefeedIdx = 0; TString Issue; int ViewCreationRetries = 0; + TPathId StreamImplPathId; TItem() = default; diff --git a/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp b/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp index 6d5516ff4e2f..9a266bd00694 100644 --- a/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp @@ -1308,6 +1308,29 @@ THolder DescribePath( return DescribePath(self, ctx, pathId, options); } +THolder DescribePath( + TSchemeShard* self, + const TActorContext& ctx, + const TString& path, + const NKikimrSchemeOp::TDescribeOptions& opts +) { + NKikimrSchemeOp::TDescribePath params; + params.SetPath(path); + params.MutableOptions()->CopyFrom(opts); + + return TPathDescriber(self, std::move(params)).Describe(ctx); +} + +THolder DescribePath( + TSchemeShard* self, + const TActorContext& ctx, + const TString& path +) { + NKikimrSchemeOp::TDescribeOptions options; + options.SetShowPrivateTable(true); + return DescribePath(self, ctx, path, options); +} + void TSchemeShard::DescribeTable( const TTableInfo& tableInfo, const NScheme::TTypeRegistry* typeRegistry, diff --git a/ydb/core/tx/schemeshard/schemeshard_path_describer.h b/ydb/core/tx/schemeshard/schemeshard_path_describer.h index 317fbef59150..e4770ff53edb 100644 --- a/ydb/core/tx/schemeshard/schemeshard_path_describer.h +++ b/ydb/core/tx/schemeshard/schemeshard_path_describer.h @@ -83,5 +83,18 @@ THolder DescribePath( TPathId pathId ); +THolder DescribePath( + TSchemeShard* self, + const TActorContext& ctx, + const TString& path, + const NKikimrSchemeOp::TDescribeOptions& opts +); + +THolder DescribePath( + TSchemeShard* self, + const TActorContext& ctx, + const TString& path +); + } // NSchemeShard } // NKikimr diff --git a/ydb/core/tx/schemeshard/ut_helpers/ls_checks.cpp b/ydb/core/tx/schemeshard/ut_helpers/ls_checks.cpp index 42cfda9f4f48..79e7a1df3024 100644 --- a/ydb/core/tx/schemeshard/ut_helpers/ls_checks.cpp +++ b/ydb/core/tx/schemeshard/ut_helpers/ls_checks.cpp @@ -968,6 +968,19 @@ TCheckFunc RetentionPeriod(const TDuration& value) { }; } +TCheckFunc ConsumerExist(const TString& name) { + return [=] (const NKikimrScheme::TEvDescribeSchemeResult& record) { + bool isExist = false; + for (const auto& consumer : record.GetPathDescription().GetPersQueueGroup().GetPQTabletConfig().GetConsumers()) { + if (consumer.GetName() == name) { + isExist = true; + break; + } + } + UNIT_ASSERT(isExist); + }; +} + void NoChildren(const NKikimrScheme::TEvDescribeSchemeResult& record) { ChildrenCount(0)(record); } diff --git a/ydb/core/tx/schemeshard/ut_helpers/ls_checks.h b/ydb/core/tx/schemeshard/ut_helpers/ls_checks.h index 1813edca62b5..eb42a9ef8f60 100644 --- a/ydb/core/tx/schemeshard/ut_helpers/ls_checks.h +++ b/ydb/core/tx/schemeshard/ut_helpers/ls_checks.h @@ -169,6 +169,7 @@ namespace NLs { TCheckFunc StreamAwsRegion(const TString& value); TCheckFunc StreamInitialScanProgress(ui32 total, ui32 completed); TCheckFunc RetentionPeriod(const TDuration& value); + TCheckFunc ConsumerExist(const TString& name); TCheckFunc HasBackupInFly(ui64 txId); void NoBackupInFly(const NKikimrScheme::TEvDescribeSchemeResult& record); diff --git a/ydb/core/tx/schemeshard/ut_restore/ut_restore.cpp b/ydb/core/tx/schemeshard/ut_restore/ut_restore.cpp index 1035a65e7893..9814d8992d13 100644 --- a/ydb/core/tx/schemeshard/ut_restore/ut_restore.cpp +++ b/ydb/core/tx/schemeshard/ut_restore/ut_restore.cpp @@ -5067,7 +5067,10 @@ Y_UNIT_TEST_SUITE(TImportTests) { {changefeedPath, GenerateTestData({EPathTypeCdcStream, changefeedDesc, std::move(attr)})}, [changefeedPath = TString(changefeedPath)](TTestBasicRuntime& runtime) { TestDescribeResult(DescribePath(runtime, "/MyRoot/Table" + changefeedPath, false, false, true), { - NLs::PathExist + NLs::PathExist, + }); + TestDescribeResult(DescribePath(runtime, "/MyRoot/Table" + changefeedPath + "/streamImpl", false, false, true), { + NLs::ConsumerExist("my_consumer") }); } };