diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp index f183d5794972..76f38b568f64 100644 --- a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp +++ b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp @@ -145,6 +145,7 @@ #include #include #include +#include #include #include #include @@ -1114,6 +1115,19 @@ void TSharedCacheInitializer::InitializeServices( TActorSetupCmd(actor, TMailboxType::ReadAsFilled, appData->UserPoolId)); } +// TSharedMetadaCacheInitializer +TSharedMetadaCacheInitializer::TSharedMetadaCacheInitializer(const TKikimrRunConfig& runConfig) + : IKikimrServicesInitializer(runConfig) +{} + +void TSharedMetadaCacheInitializer::InitializeServices( NActors::TActorSystemSetup *setup, const NKikimr::TAppData *appData) { + if (appData->FeatureFlags.GetEnableSharedMetadataCache()) { + auto* actor = NKikimr::NOlap::NDataAccessorControl::TNodeActor::CreateActor(); + setup->LocalServices.emplace_back(NKikimr::NOlap::NDataAccessorControl::TNodeActor::MakeActorId(NodeId), + TActorSetupCmd(actor, TMailboxType::HTSwap, appData->UserPoolId)); + } +} + // TBlobCacheInitializer TBlobCacheInitializer::TBlobCacheInitializer(const TKikimrRunConfig& runConfig) diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.h b/ydb/core/driver_lib/run/kikimr_services_initializers.h index cadb077a875e..9eb12c177899 100644 --- a/ydb/core/driver_lib/run/kikimr_services_initializers.h +++ b/ydb/core/driver_lib/run/kikimr_services_initializers.h @@ -95,6 +95,13 @@ class TSharedCacheInitializer : public IKikimrServicesInitializer { void InitializeServices(NActors::TActorSystemSetup *setup, const NKikimr::TAppData *appData) override; }; +class TSharedMetadaCacheInitializer : public IKikimrServicesInitializer { +public: +TSharedMetadaCacheInitializer(const TKikimrRunConfig& runConfig); + + void InitializeServices(NActors::TActorSystemSetup *setup, const NKikimr::TAppData *appData) override; +}; + class TBlobCacheInitializer : public IKikimrServicesInitializer { public: TBlobCacheInitializer(const TKikimrRunConfig& runConfig); diff --git a/ydb/core/driver_lib/run/run.cpp b/ydb/core/driver_lib/run/run.cpp index 77090167a6cb..c80b01f2ab7d 100644 --- a/ydb/core/driver_lib/run/run.cpp +++ b/ydb/core/driver_lib/run/run.cpp @@ -1663,6 +1663,10 @@ TIntrusivePtr TKikimrRunner::CreateServiceInitializers sil->AddServiceInitializer(new TMemProfMonitorInitializer(runConfig, ProcessMemoryInfoProvider)); + if (serviceMask.EnableSharedMetadaCache) { + sil->AddServiceInitializer(new TSharedMetadaCacheInitializer(runConfig)); + } + #if defined(ENABLE_MEMORY_TRACKING) if (serviceMask.EnableMemoryTracker) { sil->AddServiceInitializer(new TMemoryTrackerInitializer(runConfig)); diff --git a/ydb/core/driver_lib/run/service_mask.h b/ydb/core/driver_lib/run/service_mask.h index 9bb31d2df8b8..175dfc0b8c34 100644 --- a/ydb/core/driver_lib/run/service_mask.h +++ b/ydb/core/driver_lib/run/service_mask.h @@ -81,6 +81,7 @@ union TBasicKikimrServicesMask { bool EnableGroupedMemoryLimiter:1; bool EnableAwsService:1; bool EnableCompPriorities : 1; + bool EnableSharedMetadaCache : 1; }; struct { diff --git a/ydb/core/protos/feature_flags.proto b/ydb/core/protos/feature_flags.proto index e07868b4638c..55666ce0a053 100644 --- a/ydb/core/protos/feature_flags.proto +++ b/ydb/core/protos/feature_flags.proto @@ -206,4 +206,5 @@ message TFeatureFlags { optional bool SwitchToConfigV1 = 180 [default = false]; optional bool EnableEncryptedExport = 181 [default = false]; optional bool EnableAlterDatabase = 182 [default = false]; + optional bool EnableSharedMetadataCache = 183 [default = true]; } diff --git a/ydb/core/testlib/basics/feature_flags.h b/ydb/core/testlib/basics/feature_flags.h index 9992dde56355..22f1f11b4f24 100644 --- a/ydb/core/testlib/basics/feature_flags.h +++ b/ydb/core/testlib/basics/feature_flags.h @@ -77,6 +77,7 @@ class TTestFeatureFlagsHolder { FEATURE_FLAG_SETTER(EnableDatabaseAdmin) FEATURE_FLAG_SETTER(EnablePermissionsExport) FEATURE_FLAG_SETTER(EnableShowCreate) + FEATURE_FLAG_SETTER(EnableSharedMetadataCache) #undef FEATURE_FLAG_SETTER }; diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp index bcdda46fa9c1..b72ceeead0b7 100644 --- a/ydb/core/testlib/test_client.cpp +++ b/ydb/core/testlib/test_client.cpp @@ -78,6 +78,7 @@ #include #include #include +#include #include #include #include @@ -1180,6 +1181,16 @@ namespace Tests { const auto aid = Runtime->Register(actor, nodeIdx, appData.UserPoolId, TMailboxType::Revolving, 0); Runtime->RegisterService(NConveyor::TInsertServiceOperator::MakeServiceId(Runtime->GetNodeId(nodeIdx)), aid, nodeIdx); } + { + if (Settings->FeatureFlags.GetEnableSharedMetadataCache()) { + auto* actor = NKikimr::NOlap::NDataAccessorControl::TNodeActor::CreateActor(); + + const auto aid = Runtime->Register(actor, nodeIdx, appData.UserPoolId, TMailboxType::HTSwap, 0); + const auto serviceId = NKikimr::NOlap::NDataAccessorControl::TNodeActor::MakeActorId(Runtime->GetNodeId(nodeIdx)); + Runtime->RegisterService(serviceId, aid, nodeIdx); + } + } + Runtime->Register(CreateLabelsMaintainer({}), nodeIdx, appData.SystemPoolId, TMailboxType::Revolving, 0); auto sysViewService = NSysView::CreateSysViewServiceForTests(); diff --git a/ydb/core/tx/columnshard/columnshard.cpp b/ydb/core/tx/columnshard/columnshard.cpp index f3b3a37908a4..92308e4cfb74 100644 --- a/ydb/core/tx/columnshard/columnshard.cpp +++ b/ydb/core/tx/columnshard/columnshard.cpp @@ -4,6 +4,7 @@ #include "blobs_reader/actor.h" #include "counters/aggregation/table_stats.h" #include "data_accessor/actor.h" +#include "data_accessor/node_actor.h" #include "data_accessor/manager.h" #include "engines/column_engine_logs.h" #include "engines/writer/buffer/actor.h" @@ -124,7 +125,13 @@ void TColumnShard::OnActivateExecutor(const TActorContext& ctx) { ResourceSubscribeActor = ctx.Register(new NOlap::NResourceBroker::NSubscribe::TActor(TabletID(), SelfId())); BufferizationInsertionWriteActorId = ctx.Register(new NColumnShard::NWriting::TActor(TabletID(), SelfId())); BufferizationPortionsWriteActorId = ctx.Register(new NOlap::NWritingPortions::TActor(TabletID(), SelfId())); - DataAccessorsControlActorId = ctx.Register(new NOlap::NDataAccessorControl::TActor(TabletID(), SelfId())); + // Change actor here + if (AppData(ctx)->FeatureFlags.GetEnableSharedMetadataCache()){ + DataAccessorsControlActorId = NOlap::NDataAccessorControl::TNodeActor::MakeActorId(ctx.SelfID.NodeId()); + } else { + DataAccessorsControlActorId = ctx.Register(new NOlap::NDataAccessorControl::TActor(TabletID(), SelfId())); + } + DataAccessorsManager = std::make_shared(DataAccessorsControlActorId, SelfId()), PrioritizationClientId = NPrioritiesQueue::TCompServiceOperator::RegisterClient(); diff --git a/ydb/core/tx/columnshard/columnshard__statistics.cpp b/ydb/core/tx/columnshard/columnshard__statistics.cpp index 61111ee13abf..c7ef61bfcf72 100644 --- a/ydb/core/tx/columnshard/columnshard__statistics.cpp +++ b/ydb/core/tx/columnshard/columnshard__statistics.cpp @@ -113,18 +113,22 @@ class TColumnPortionsAccumulator { std::shared_ptr DataAccessors; std::shared_ptr Result; const std::shared_ptr VersionedIndex; + const NOlap::TTabletId TabletId; public: TColumnPortionsAccumulator(const std::shared_ptr& storagesManager, const std::shared_ptr& result, const ui32 portionsCountLimit, const std::set& originalColumnTags, const std::shared_ptr& vIndex, - const std::shared_ptr& dataAccessorsManager) + const std::shared_ptr& dataAccessorsManager, + const NOlap::TTabletId tabletId) : StoragesManager(storagesManager) , ColumnTagsRequested(originalColumnTags) , PortionsCountLimit(portionsCountLimit) , DataAccessors(dataAccessorsManager) , Result(result) - , VersionedIndex(vIndex) { + , VersionedIndex(vIndex) + , TabletId(tabletId) + { } class TIndexReadTask: public NOlap::NBlobOperations::NRead::ITask { @@ -259,7 +263,7 @@ class TColumnPortionsAccumulator { } request->RegisterSubscriber(std::make_shared(StoragesManager, Result, VersionedIndex, ColumnTagsRequested)); Portions.clear(); - DataAccessors->AskData(request); + DataAccessors->AskData(TabletId, request); } void AddTask(const NOlap::TPortionInfo::TConstPtr& portion) { @@ -306,7 +310,7 @@ void TColumnShard::Handle(NStat::TEvStatistics::TEvStatisticsRequest::TPtr& ev, std::make_shared(columnTagsRequested, ev->Sender, ev->Cookie, std::move(response)); auto versionedIndex = std::make_shared(index.GetVersionedIndex()); TColumnPortionsAccumulator portionsPack( - StoragesManager, resultAccumulator, 1000, columnTagsRequested, versionedIndex, DataAccessorsManager.GetObjectPtrVerified()); + StoragesManager, resultAccumulator, 1000, columnTagsRequested, versionedIndex, DataAccessorsManager.GetObjectPtrVerified(), (NOlap::TTabletId)TabletID()); for (const auto& [_, portionInfo] : spg->GetPortions()) { if (!portionInfo->IsVisible(GetMaxReadVersion())) { diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp index c53c4eb3085a..666c5668b22f 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.cpp +++ b/ydb/core/tx/columnshard/columnshard_impl.cpp @@ -852,21 +852,24 @@ class TAccessorsMemorySubscriber: public NOlap::NResourceBroker::NSubscribe::ITa std::shared_ptr Request; std::shared_ptr Subscriber; std::shared_ptr DataAccessorsManager; + const NOlap::TTabletId TabletId; virtual void DoOnAllocationSuccess(const std::shared_ptr& guard) override { Subscriber->SetResourcesGuard(guard); Request->RegisterSubscriber(Subscriber); - DataAccessorsManager->AskData(Request); + DataAccessorsManager->AskData(TabletId, Request); } public: TAccessorsMemorySubscriber(const ui64 memory, const TString& externalTaskId, const NOlap::NResourceBroker::NSubscribe::TTaskContext& context, std::shared_ptr&& request, const std::shared_ptr& subscriber, - const std::shared_ptr& dataAccessorsManager) + const std::shared_ptr& dataAccessorsManager, + NOlap::TTabletId tabletId) : TBase(0, memory, externalTaskId, context) , Request(std::move(request)) , Subscriber(subscriber) - , DataAccessorsManager(dataAccessorsManager) { + , DataAccessorsManager(dataAccessorsManager) + , TabletId(tabletId){ } }; @@ -912,7 +915,7 @@ void TColumnShard::StartCompaction(const std::shared_ptr(accessorsMemory, indexChanges->GetTaskIdentifier(), - CompactTaskSubscription, std::move(request), subscriber, DataAccessorsManager.GetObjectPtrVerified())); + CompactTaskSubscription, std::move(request), subscriber, DataAccessorsManager.GetObjectPtrVerified(), (NOlap::TTabletId)TabletID())); } class TWriteEvictPortionsDataAccessorsSubscriber: public TDataAccessorsSubscriberWithRead { @@ -981,7 +984,7 @@ void TColumnShard::SetupMetadata() { NOlap::NResourceBroker::NSubscribe::ITask::StartResourceSubscription(ResourceSubscribeActor, std::make_shared(accessorsMemory, i.GetRequest()->GetTaskId(), TTLTaskSubscription, std::shared_ptr(i.GetRequest()), - std::make_shared(SelfId(), i.GetProcessor(), Generation()), DataAccessorsManager.GetObjectPtrVerified())); + std::make_shared(SelfId(), i.GetProcessor(), Generation()), DataAccessorsManager.GetObjectPtrVerified(), (NOlap::TTabletId)TabletID())); } } @@ -1020,7 +1023,7 @@ bool TColumnShard::SetupTtl() { request->PredictAccessorsMemory(TablesManager.GetPrimaryIndex()->GetVersionedIndex().GetLastSchema()) + memoryUsage; NOlap::NResourceBroker::NSubscribe::ITask::StartResourceSubscription( ResourceSubscribeActor, std::make_shared(accessorsMemory, i->GetTaskIdentifier(), TTLTaskSubscription, - std::move(request), subscriber, DataAccessorsManager.GetObjectPtrVerified())); + std::move(request), subscriber, DataAccessorsManager.GetObjectPtrVerified(), (NOlap::TTabletId)TabletID())); } return true; } @@ -1069,7 +1072,7 @@ void TColumnShard::SetupCleanupPortions() { NOlap::NResourceBroker::NSubscribe::ITask::StartResourceSubscription( ResourceSubscribeActor, std::make_shared(accessorsMemory, changes->GetTaskIdentifier(), TTLTaskSubscription, - std::move(request), subscriber, DataAccessorsManager.GetObjectPtrVerified())); + std::move(request), subscriber, DataAccessorsManager.GetObjectPtrVerified(), (NOlap::TTabletId)TabletID())); } void TColumnShard::SetupCleanupTables() { @@ -1392,6 +1395,7 @@ class TAccessorsParsingTask: public NConveyor::ITask { private: std::shared_ptr FetchCallback; std::vector Portions; + const NOlap::TTabletId TabletId; virtual void DoExecute(const std::shared_ptr& /*taskPtr*/) override { std::vector accessors; @@ -1399,7 +1403,7 @@ class TAccessorsParsingTask: public NConveyor::ITask { for (auto&& i : Portions) { accessors.emplace_back(i.BuildAccessor()); } - FetchCallback->OnAccessorsFetched(std::move(accessors)); + FetchCallback->OnAccessorsFetched(TabletId, std::move(accessors)); } virtual void DoOnCannotExecute(const TString& reason) override { AFL_VERIFY(false)("cannot parse metadata", reason); @@ -1411,9 +1415,10 @@ class TAccessorsParsingTask: public NConveyor::ITask { } TAccessorsParsingTask( - const std::shared_ptr& callback, std::vector&& portions) + const std::shared_ptr& callback, std::vector&& portions, const NOlap::TTabletId tabletId) : FetchCallback(callback) , Portions(std::move(portions)) + , TabletId(tabletId) { } @@ -1499,7 +1504,7 @@ class TTxAskPortionChunks: public TTransactionBase { } AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("stage", "finished"); - NConveyor::TInsertServiceOperator::AsyncTaskToExecute(std::make_shared(FetchCallback, std::move(FetchedAccessors))); + NConveyor::TInsertServiceOperator::AsyncTaskToExecute(std::make_shared(FetchCallback, std::move(FetchedAccessors), (NOlap::TTabletId)txc.Tablet)); return true; } void Complete(const TActorContext& /*ctx*/) override { diff --git a/ydb/core/tx/columnshard/data_accessor/abstract/collector.cpp b/ydb/core/tx/columnshard/data_accessor/abstract/collector.cpp index 18b138b607d7..6257aeac3d7e 100644 --- a/ydb/core/tx/columnshard/data_accessor/abstract/collector.cpp +++ b/ydb/core/tx/columnshard/data_accessor/abstract/collector.cpp @@ -16,8 +16,8 @@ TDataCategorized IGranuleDataAccessor::AnalyzeData( return DoAnalyzeData(portions, consumer); } -void TActorAccessorsCallback::OnAccessorsFetched(std::vector&& accessors) { - NActors::TActivationContext::Send(ActorId, std::make_unique(std::move(accessors))); +void TActorAccessorsCallback::OnAccessorsFetched(TTabletId tabletId, std::vector&& accessors) { + NActors::TActivationContext::Send(ActorId, std::make_unique(tabletId, std::move(accessors))); } } // namespace NKikimr::NOlap::NDataAccessorControl diff --git a/ydb/core/tx/columnshard/data_accessor/abstract/collector.h b/ydb/core/tx/columnshard/data_accessor/abstract/collector.h index 68cb91ef680b..86e2f721197b 100644 --- a/ydb/core/tx/columnshard/data_accessor/abstract/collector.h +++ b/ydb/core/tx/columnshard/data_accessor/abstract/collector.h @@ -5,7 +5,7 @@ namespace NKikimr::NOlap::NDataAccessorControl { class IAccessorCallback { public: - virtual void OnAccessorsFetched(std::vector&& accessors) = 0; + virtual void OnAccessorsFetched(TTabletId TabletId, std::vector&& accessors) = 0; virtual ~IAccessorCallback() = default; }; @@ -14,7 +14,7 @@ class TActorAccessorsCallback: public IAccessorCallback { const NActors::TActorId ActorId; public: - virtual void OnAccessorsFetched(std::vector&& accessors) override; + virtual void OnAccessorsFetched(TTabletId tabletId, std::vector&& accessors) override; TActorAccessorsCallback(const NActors::TActorId& actorId) : ActorId(actorId) { } @@ -37,6 +37,7 @@ class TDataCategorized { class IGranuleDataAccessor { private: const TInternalPathId PathId; + const TTabletId TabletId; virtual void DoAskData( const std::vector& portions, const std::shared_ptr& callback, const TString& consumer) = 0; @@ -49,9 +50,13 @@ class IGranuleDataAccessor { TInternalPathId GetPathId() const { return PathId; } + TTabletId GetTabletId() const { + return TabletId; + } - IGranuleDataAccessor(const TInternalPathId pathId) - : PathId(pathId) { + IGranuleDataAccessor(const TTabletId tabletId, const TInternalPathId pathId) + : PathId(pathId) + , TabletId(tabletId) { } void AskData( diff --git a/ydb/core/tx/columnshard/data_accessor/abstract/manager.h b/ydb/core/tx/columnshard/data_accessor/abstract/manager.h index ec1516c1cdbf..e6cf3d9147a7 100644 --- a/ydb/core/tx/columnshard/data_accessor/abstract/manager.h +++ b/ydb/core/tx/columnshard/data_accessor/abstract/manager.h @@ -12,7 +12,7 @@ class TGranuleMeta; namespace NKikimr::NOlap::NDataAccessorControl { class IMetadataMemoryManager { private: - virtual std::unique_ptr DoBuildCollector(const TInternalPathId pathId) = 0; + virtual std::unique_ptr DoBuildCollector(const TTabletId tabletId, const TInternalPathId pathId) = 0; virtual std::shared_ptr DoBuildLoader( const TVersionedIndex& versionedIndex, TGranuleMeta* granule, const std::shared_ptr& dsGroupSelector) = 0; @@ -22,8 +22,8 @@ class IMetadataMemoryManager { return false; } - std::unique_ptr BuildCollector(const TInternalPathId pathId) { - return DoBuildCollector(pathId); + std::unique_ptr BuildCollector(const TTabletId tabletId, const TInternalPathId pathId) { + return DoBuildCollector(tabletId, pathId); } std::shared_ptr BuildLoader( diff --git a/ydb/core/tx/columnshard/data_accessor/actor.cpp b/ydb/core/tx/columnshard/data_accessor/actor.cpp index 65680779d18e..8bb5663b3ba5 100644 --- a/ydb/core/tx/columnshard/data_accessor/actor.cpp +++ b/ydb/core/tx/columnshard/data_accessor/actor.cpp @@ -3,7 +3,7 @@ namespace NKikimr::NOlap::NDataAccessorControl { void TActor::Handle(TEvAskServiceDataAccessors::TPtr& ev) { - Manager->AskData(ev->Get()->GetRequest()); + Manager->AskData(ev->Get()->GetTabletId(), ev->Get()->GetRequest()); } void TActor::Bootstrap() { diff --git a/ydb/core/tx/columnshard/data_accessor/actor.h b/ydb/core/tx/columnshard/data_accessor/actor.h index e21b7af85205..0bc90c0d8390 100644 --- a/ydb/core/tx/columnshard/data_accessor/actor.h +++ b/ydb/core/tx/columnshard/data_accessor/actor.h @@ -22,18 +22,18 @@ class TActor: public TActorBootstrapped { } void Handle(TEvRegisterController::TPtr& ev) { - Manager->RegisterController(ev->Get()->ExtractController(), ev->Get()->IsUpdate()); + Manager->RegisterController(ev->Get()->ExtractController(), ev->Get()->GetTabletId(), ev->Get()->IsUpdate()); } void Handle(TEvUnregisterController::TPtr& ev) { - Manager->UnregisterController(ev->Get()->GetPathId()); + Manager->UnregisterController(ev->Get()->GetTabletId(), ev->Get()->GetPathId()); } void Handle(TEvAddPortion::TPtr& ev) { for (auto&& a : ev->Get()->ExtractAccessors()) { - Manager->AddPortion(std::move(a)); + Manager->AddPortion(ev->Get()->GetTabletId(), std::move(a)); } } void Handle(TEvRemovePortion::TPtr& ev) { - Manager->RemovePortion(ev->Get()->GetPortion()); + Manager->RemovePortion(ev->Get()->GetTabletId(), ev->Get()->GetPortion()); } void Handle(TEvAskServiceDataAccessors::TPtr& ev); diff --git a/ydb/core/tx/columnshard/data_accessor/events.h b/ydb/core/tx/columnshard/data_accessor/events.h index b9ffe399c789..bfc30f2a4a73 100644 --- a/ydb/core/tx/columnshard/data_accessor/events.h +++ b/ydb/core/tx/columnshard/data_accessor/events.h @@ -19,17 +19,20 @@ namespace NKikimr::NOlap::NDataAccessorControl { class TEvAddPortion: public NActors::TEventLocal { private: std::vector Accessors; + YDB_READONLY_DEF(TTabletId, TabletId); public: std::vector ExtractAccessors() { return std::move(Accessors); } - explicit TEvAddPortion(const TPortionDataAccessor& accessor) { + TEvAddPortion(const TTabletId tabletId, const TPortionDataAccessor& accessor) + : TabletId(tabletId) { Accessors.emplace_back(accessor); } - explicit TEvAddPortion(const std::vector& accessors) { + TEvAddPortion(const TTabletId tabletId, const std::vector& accessors) + : TabletId(tabletId) { Accessors = accessors; } }; @@ -37,10 +40,12 @@ class TEvAddPortion: public NActors::TEventLocal { private: YDB_READONLY_DEF(TPortionInfo::TConstPtr, Portion); + YDB_READONLY_DEF(TTabletId, TabletId); public: - explicit TEvRemovePortion(const TPortionInfo::TConstPtr& portion) - : Portion(portion) { + TEvRemovePortion(const TTabletId tabletId, const TPortionInfo::TConstPtr& portion) + : Portion(portion) + , TabletId(tabletId) { } }; @@ -48,6 +53,7 @@ class TEvRegisterController: public NActors::TEventLocal Controller; bool IsUpdateFlag = false; + TTabletId TabletId; public: bool IsUpdate() const { @@ -58,9 +64,12 @@ class TEvRegisterController: public NActors::TEventLocal&& accessor, const bool isUpdate) + TTabletId GetTabletId() const { return TabletId;} + + TEvRegisterController(std::unique_ptr&& accessor, const TTabletId tabletId, const bool isUpdate) : Controller(std::move(accessor)) , IsUpdateFlag(isUpdate) + , TabletId(tabletId) { } }; @@ -69,10 +78,12 @@ class TEvUnregisterController : public NActors::TEventLocal { private: YDB_READONLY_DEF(TInternalPathId, PathId); + YDB_READONLY_DEF(TTabletId, TabletId); public: - explicit TEvUnregisterController(const TInternalPathId pathId) - : PathId(pathId) { + TEvUnregisterController(const TTabletId tabletId, const TInternalPathId pathId) + : PathId(pathId) + , TabletId(tabletId){ } }; @@ -81,13 +92,15 @@ class TEvAskTabletDataAccessors: public NActors::TEventLocal, Portions); YDB_READONLY_DEF(std::shared_ptr, Callback); YDB_READONLY_DEF(TString, Consumer); + YDB_READONLY_DEF(TTabletId, TabletId); public: explicit TEvAskTabletDataAccessors(const std::vector& portions, - const std::shared_ptr& callback, const TString& consumer) + const std::shared_ptr& callback, const TString& consumer, const TTabletId tabletId) : Portions(portions) , Callback(callback) - , Consumer(consumer) { + , Consumer(consumer) + , TabletId(tabletId) { } }; @@ -95,10 +108,12 @@ class TEvAskServiceDataAccessors : public NActors::TEventLocal { private: YDB_READONLY_DEF(std::shared_ptr, Request); + YDB_READONLY_DEF(TTabletId, TabletId); public: - explicit TEvAskServiceDataAccessors(const std::shared_ptr& request) - : Request(request) { + explicit TEvAskServiceDataAccessors(const TTabletId tabletId, const std::shared_ptr& request) + : Request(request) + , TabletId(tabletId) { } }; diff --git a/ydb/core/tx/columnshard/data_accessor/in_mem/collector.h b/ydb/core/tx/columnshard/data_accessor/in_mem/collector.h index 407d4af2a95f..1c0d1d403ec4 100644 --- a/ydb/core/tx/columnshard/data_accessor/in_mem/collector.h +++ b/ydb/core/tx/columnshard/data_accessor/in_mem/collector.h @@ -14,8 +14,8 @@ class TCollector: public IGranuleDataAccessor { const std::vector& remove) override; public: - TCollector(const TInternalPathId pathId) - : TBase(pathId) { + TCollector(const TTabletId tabletId, const TInternalPathId pathId) + : TBase(tabletId, pathId) { } }; diff --git a/ydb/core/tx/columnshard/data_accessor/in_mem/manager.cpp b/ydb/core/tx/columnshard/data_accessor/in_mem/manager.cpp index c136734d172b..083be019d8f6 100644 --- a/ydb/core/tx/columnshard/data_accessor/in_mem/manager.cpp +++ b/ydb/core/tx/columnshard/data_accessor/in_mem/manager.cpp @@ -19,8 +19,8 @@ std::shared_ptr TManager::DoBuildLoader( return result; } -std::unique_ptr TManager::DoBuildCollector(const TInternalPathId pathId) { - return std::make_unique(pathId); +std::unique_ptr TManager::DoBuildCollector(const TTabletId tabletId, const TInternalPathId pathId) { + return std::make_unique(tabletId, pathId); } } // namespace NKikimr::NOlap::NDataAccessorControl::NInMem diff --git a/ydb/core/tx/columnshard/data_accessor/in_mem/manager.h b/ydb/core/tx/columnshard/data_accessor/in_mem/manager.h index 442c90f056a4..879d6310c63c 100644 --- a/ydb/core/tx/columnshard/data_accessor/in_mem/manager.h +++ b/ydb/core/tx/columnshard/data_accessor/in_mem/manager.h @@ -6,7 +6,7 @@ namespace NKikimr::NOlap::NDataAccessorControl::NInMem { class TManager: public IMetadataMemoryManager { private: - virtual std::unique_ptr DoBuildCollector(const TInternalPathId pathId) override; + virtual std::unique_ptr DoBuildCollector(const TTabletId tabletId, const TInternalPathId pathId) override; virtual std::shared_ptr DoBuildLoader( const TVersionedIndex& versionedIndex, TGranuleMeta* granule, const std::shared_ptr& dsGroupSelector) override; diff --git a/ydb/core/tx/columnshard/data_accessor/local_db/collector.cpp b/ydb/core/tx/columnshard/data_accessor/local_db/collector.cpp index 1a1d952b7f86..fec767f00ecc 100644 --- a/ydb/core/tx/columnshard/data_accessor/local_db/collector.cpp +++ b/ydb/core/tx/columnshard/data_accessor/local_db/collector.cpp @@ -7,7 +7,7 @@ void TCollector::DoAskData( const std::vector& portions, const std::shared_ptr& callback, const TString& consumer) { if (portions.size()) { NActors::TActivationContext::Send( - TabletActorId, std::make_unique(portions, callback, consumer)); + TabletActorId, std::make_unique(portions, callback, consumer, GetTabletId())); } } diff --git a/ydb/core/tx/columnshard/data_accessor/local_db/collector.h b/ydb/core/tx/columnshard/data_accessor/local_db/collector.h index 0cb754014b1d..448b9b61a536 100644 --- a/ydb/core/tx/columnshard/data_accessor/local_db/collector.h +++ b/ydb/core/tx/columnshard/data_accessor/local_db/collector.h @@ -24,8 +24,8 @@ class TCollector: public IGranuleDataAccessor { virtual void DoModifyPortions(const std::vector& add, const std::vector& remove) override; public: - TCollector(const TInternalPathId pathId, const ui64 maxSize, const NActors::TActorId& actorId) - : TBase(pathId) + TCollector(const TTabletId tabletId, const TInternalPathId pathId, const ui64 maxSize, const NActors::TActorId& actorId) + : TBase(tabletId, pathId) , TabletActorId(actorId) , AccessorsCache(maxSize) { } diff --git a/ydb/core/tx/columnshard/data_accessor/local_db/manager.cpp b/ydb/core/tx/columnshard/data_accessor/local_db/manager.cpp index 7c80ca0fcb1a..205f545d056b 100644 --- a/ydb/core/tx/columnshard/data_accessor/local_db/manager.cpp +++ b/ydb/core/tx/columnshard/data_accessor/local_db/manager.cpp @@ -10,8 +10,8 @@ std::shared_ptr TManager::DoBuildLoader( return nullptr; } -std::unique_ptr TManager::DoBuildCollector(const TInternalPathId pathId) { - return std::make_unique(pathId, MemoryCacheSize, TabletActorId); +std::unique_ptr TManager::DoBuildCollector(const TTabletId tabletId, const TInternalPathId pathId) { + return std::make_unique(tabletId, pathId, MemoryCacheSize, TabletActorId); } } // namespace NKikimr::NOlap::NDataAccessorControl::NLocalDB diff --git a/ydb/core/tx/columnshard/data_accessor/local_db/manager.h b/ydb/core/tx/columnshard/data_accessor/local_db/manager.h index ed8ad94f3a29..5aead386ebf6 100644 --- a/ydb/core/tx/columnshard/data_accessor/local_db/manager.h +++ b/ydb/core/tx/columnshard/data_accessor/local_db/manager.h @@ -8,7 +8,7 @@ class TManager: public IMetadataMemoryManager { const NActors::TActorId TabletActorId; const ui64 MemoryCacheSize; const bool FetchOnStart = true; - virtual std::unique_ptr DoBuildCollector(const TInternalPathId pathId) override; + virtual std::unique_ptr DoBuildCollector(const TTabletId tabletId, const TInternalPathId pathId) override; virtual std::shared_ptr DoBuildLoader( const TVersionedIndex& versionedIndex, TGranuleMeta* granule, const std::shared_ptr& dsGroupSelector) override; diff --git a/ydb/core/tx/columnshard/data_accessor/manager.cpp b/ydb/core/tx/columnshard/data_accessor/manager.cpp index 47a21a772ab9..30c45dbd5de8 100644 --- a/ydb/core/tx/columnshard/data_accessor/manager.cpp +++ b/ydb/core/tx/columnshard/data_accessor/manager.cpp @@ -6,7 +6,7 @@ namespace NKikimr::NOlap::NDataAccessorControl { -void TLocalManager::DrainQueue() { +void TLocalManager::DrainQueue(const TTabletId tabletId) { std::optional lastPathId; IGranuleDataAccessor* lastDataAccessor = nullptr; TPositiveControlInteger countToFlight; @@ -18,7 +18,7 @@ void TLocalManager::DrainQueue() { PortionsAsk.pop_front(); if (!lastPathId || *lastPathId != p->GetPathId()) { lastPathId = p->GetPathId(); - auto it = Managers.find(p->GetPathId()); + auto it = Managers.find(makeManagerKey(tabletId, p->GetPathId())); if (it == Managers.end()) { lastDataAccessor = nullptr; } else { @@ -52,7 +52,7 @@ void TLocalManager::DrainQueue() { } } for (auto&& i : portionsToAsk) { - auto it = Managers.find(i.first); + auto it = Managers.find(makeManagerKey(tabletId, i.first)); AFL_VERIFY(it != Managers.end()); auto dataAnalyzed = it->second->AnalyzeData(i.second, "ANALYZE"); for (auto&& accessor : dataAnalyzed.GetCachedAccessors()) { @@ -78,7 +78,7 @@ void TLocalManager::DrainQueue() { Counters.QueueSize->Set(PortionsAsk.size()); } -void TLocalManager::DoAskData(const std::shared_ptr& request) { +void TLocalManager::DoAskData(const TTabletId tabletId, const std::shared_ptr& request) { AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "ask_data")("request", request->DebugString()); for (auto&& pathId : request->GetPathIds()) { auto portions = request->StartFetching(pathId); @@ -94,23 +94,25 @@ void TLocalManager::DoAskData(const std::shared_ptr& requ } } } - DrainQueue(); + DrainQueue(tabletId); } -void TLocalManager::DoRegisterController(std::unique_ptr&& controller, const bool update) { +void TLocalManager::DoRegisterController(std::unique_ptr&& controller, const TTabletId tabletId, const bool update) { + const auto it = Managers.find(makeManagerKey(tabletId, controller->GetPathId())); if (update) { - auto it = Managers.find(controller->GetPathId()); if (it != Managers.end()) { it->second = std::move(controller); } } else { - AFL_VERIFY(Managers.emplace(controller->GetPathId(), std::move(controller)).second); + if (it == Managers.end()) { + AFL_VERIFY(Managers.emplace(makeManagerKey(tabletId, controller->GetPathId()), std::move(controller)).second); + } } } -void TLocalManager::DoAddPortion(const TPortionDataAccessor& accessor) { +void TLocalManager::DoAddPortion(const TTabletId tabletId, const TPortionDataAccessor& accessor) { { - auto it = Managers.find(accessor.GetPortionInfo().GetPathId()); + auto it = Managers.find(makeManagerKey(tabletId, accessor.GetPortionInfo().GetPathId())); AFL_VERIFY(it != Managers.end()); it->second->ModifyPortions({ accessor }, {}); } @@ -124,7 +126,7 @@ void TLocalManager::DoAddPortion(const TPortionDataAccessor& accessor) { } RequestsByPortion.erase(it); } - DrainQueue(); + DrainQueue(tabletId); } } // namespace NKikimr::NOlap::NDataAccessorControl diff --git a/ydb/core/tx/columnshard/data_accessor/manager.h b/ydb/core/tx/columnshard/data_accessor/manager.h index b50aba19e783..be11b0833323 100644 --- a/ydb/core/tx/columnshard/data_accessor/manager.h +++ b/ydb/core/tx/columnshard/data_accessor/manager.h @@ -34,11 +34,11 @@ class TAccessorSignals: public NColumnShard::TCommonCountersOwner { class IDataAccessorsManager { private: - virtual void DoAskData(const std::shared_ptr& request) = 0; - virtual void DoRegisterController(std::unique_ptr&& controller, const bool update) = 0; - virtual void DoUnregisterController(const TInternalPathId pathId) = 0; - virtual void DoAddPortion(const TPortionDataAccessor& accessor) = 0; - virtual void DoRemovePortion(const TPortionInfo::TConstPtr& portion) = 0; + virtual void DoAskData(const TTabletId tabletId, const std::shared_ptr& request) = 0; + virtual void DoRegisterController(std::unique_ptr&& controller, const TTabletId tabletId, const bool update) = 0; + virtual void DoUnregisterController(const TTabletId tabletId, const TInternalPathId pathId) = 0; + virtual void DoAddPortion(const TTabletId tabletId, const TPortionDataAccessor& accessor) = 0; + virtual void DoRemovePortion(const TTabletId tabletId, const TPortionInfo::TConstPtr& portion) = 0; const NActors::TActorId TabletActorId; public: @@ -52,23 +52,23 @@ class IDataAccessorsManager { virtual ~IDataAccessorsManager() = default; - void AddPortion(const TPortionDataAccessor& accessor) { - DoAddPortion(accessor); + void AddPortion(const TTabletId tabletId, const TPortionDataAccessor& accessor) { + DoAddPortion(tabletId, accessor); } - void RemovePortion(const TPortionInfo::TConstPtr& portion) { - DoRemovePortion(portion); + void RemovePortion(const TTabletId tabletId, const TPortionInfo::TConstPtr& portion) { + DoRemovePortion(tabletId, portion); } - void AskData(const std::shared_ptr& request) { + void AskData(const TTabletId tabletId, const std::shared_ptr& request) { AFL_VERIFY(request); AFL_VERIFY(request->HasSubscriber()); - return DoAskData(request); + return DoAskData(tabletId, request); } - void RegisterController(std::unique_ptr&& controller, const bool update) { + void RegisterController(std::unique_ptr&& controller, const TTabletId tabletId, const bool update) { AFL_VERIFY(controller); - return DoRegisterController(std::move(controller), update); + return DoRegisterController(std::move(controller), tabletId, update); } - void UnregisterController(const TInternalPathId pathId) { - return DoUnregisterController(pathId); + void UnregisterController(const TTabletId tabletId, const TInternalPathId pathId) { + return DoUnregisterController(tabletId, pathId); } }; @@ -85,20 +85,20 @@ class TActorAccessorsManager: public IDataAccessorsManager { using TBase = IDataAccessorsManager; const NActors::TActorId ActorId; std::shared_ptr AccessorsCallback; - virtual void DoAskData(const std::shared_ptr& request) override { - NActors::TActivationContext::Send(ActorId, std::make_unique(request)); + virtual void DoAskData(const TTabletId tabletId, const std::shared_ptr& request) override { + NActors::TActivationContext::Send(ActorId, std::make_unique(tabletId, request)); } - virtual void DoRegisterController(std::unique_ptr&& controller, const bool update) override { - NActors::TActivationContext::Send(ActorId, std::make_unique(std::move(controller), update)); + virtual void DoRegisterController(std::unique_ptr&& controller, const TTabletId tabletId, const bool update) override { + NActors::TActivationContext::Send(ActorId, std::make_unique(std::move(controller), tabletId, update)); } - virtual void DoUnregisterController(const TInternalPathId pathId) override { - NActors::TActivationContext::Send(ActorId, std::make_unique(pathId)); + virtual void DoUnregisterController(TTabletId tabletId, const TInternalPathId pathId) override { + NActors::TActivationContext::Send(ActorId, std::make_unique(tabletId, pathId)); } - virtual void DoAddPortion(const TPortionDataAccessor& accessor) override { - NActors::TActivationContext::Send(ActorId, std::make_unique(accessor)); + virtual void DoAddPortion(const TTabletId tabletId, const TPortionDataAccessor& accessor) override { + NActors::TActivationContext::Send(ActorId, std::make_unique(tabletId, accessor)); } - virtual void DoRemovePortion(const TPortionInfo::TConstPtr& portion) override { - NActors::TActivationContext::Send(ActorId, std::make_unique(portion)); + virtual void DoRemovePortion(const TTabletId tabletId, const TPortionInfo::TConstPtr& portion) override { + NActors::TActivationContext::Send(ActorId, std::make_unique(tabletId, portion)); } public: @@ -113,7 +113,12 @@ class TActorAccessorsManager: public IDataAccessorsManager { class TLocalManager: public IDataAccessorsManager { private: using TBase = IDataAccessorsManager; - THashMap> Managers; + using TManagerKey = std::pair; + + static TManagerKey makeManagerKey(TTabletId tabletId, const TInternalPathId pathId) { + return std::make_pair(tabletId, pathId); + } + THashMap> Managers; THashMap>> RequestsByPortion; TAccessorSignals Counters; const std::shared_ptr AccessorCallback; @@ -137,16 +142,16 @@ class TLocalManager: public IDataAccessorsManager { std::deque PortionsAsk; TPositiveControlInteger PortionsAskInFlight; - void DrainQueue(); + void DrainQueue(const TTabletId tabletId); - virtual void DoAskData(const std::shared_ptr& request) override; - virtual void DoRegisterController(std::unique_ptr&& controller, const bool update) override; - virtual void DoUnregisterController(const TInternalPathId pathId) override { - AFL_VERIFY(Managers.erase(pathId)); + virtual void DoAskData(TTabletId tabletId, const std::shared_ptr& request) override; + virtual void DoRegisterController(std::unique_ptr&& controller, TTabletId tabletId, const bool update) override; + virtual void DoUnregisterController(TTabletId tabletId, const TInternalPathId pathId) override { + AFL_VERIFY(Managers.erase(makeManagerKey(tabletId, pathId))); } - virtual void DoAddPortion(const TPortionDataAccessor& accessor) override; - virtual void DoRemovePortion(const TPortionInfo::TConstPtr& portionInfo) override { - auto it = Managers.find(portionInfo->GetPathId()); + virtual void DoAddPortion(const TTabletId tabletId, const TPortionDataAccessor& accessor) override; + virtual void DoRemovePortion(TTabletId tabletId, const TPortionInfo::TConstPtr& portionInfo) override { + auto it = Managers.find(makeManagerKey(tabletId, portionInfo->GetPathId())); AFL_VERIFY(it != Managers.end()); it->second->ModifyPortions({}, { portionInfo->GetPortionId() }); } @@ -155,17 +160,18 @@ class TLocalManager: public IDataAccessorsManager { class TTestingCallback: public IAccessorCallback { private: std::weak_ptr Manager; - virtual void OnAccessorsFetched(std::vector&& accessors) override { + virtual void OnAccessorsFetched(TTabletId tabletId, std::vector&& accessors) override { auto mImpl = Manager.lock(); if (!mImpl) { return; } for (auto&& i : accessors) { - mImpl->AddPortion(i); + mImpl->AddPortion(tabletId, i); } } public: + explicit TTestingCallback() {} void InitManager(const std::weak_ptr& manager) { Manager = manager; } @@ -178,7 +184,7 @@ class TLocalManager: public IDataAccessorsManager { return result; } - TLocalManager(const std::shared_ptr& callback) + explicit TLocalManager(const std::shared_ptr& callback) : TBase(NActors::TActorId()) , AccessorCallback(callback) { } diff --git a/ydb/core/tx/columnshard/data_accessor/node_actor.cpp b/ydb/core/tx/columnshard/data_accessor/node_actor.cpp new file mode 100644 index 000000000000..08c0947ed03c --- /dev/null +++ b/ydb/core/tx/columnshard/data_accessor/node_actor.cpp @@ -0,0 +1,19 @@ +#include "node_actor.h" + +namespace NKikimr::NOlap::NDataAccessorControl { + +NActors::IActor* TNodeActor::CreateActor() { + return new TNodeActor(); +} + +void TNodeActor::Handle(TEvAskServiceDataAccessors::TPtr& ev) { + Manager->AskData(ev->Get()->GetTabletId(), ev->Get()->GetRequest()); +} + +void TNodeActor::Bootstrap() { + AccessorsCallback = std::make_shared(SelfId()); + Manager = std::make_shared(AccessorsCallback); + Become(&TThis::StateWait); +} + +} diff --git a/ydb/core/tx/columnshard/data_accessor/node_actor.h b/ydb/core/tx/columnshard/data_accessor/node_actor.h new file mode 100644 index 000000000000..95acd47878b7 --- /dev/null +++ b/ydb/core/tx/columnshard/data_accessor/node_actor.h @@ -0,0 +1,68 @@ +#pragma once +#include "events.h" +#include "manager.h" + +#include "abstract/collector.h" + +#include +#include + +namespace NKikimr::NOlap::NDataAccessorControl { + +class TNodeActor: public TActorBootstrapped { +private: + std::shared_ptr Manager; + + std::shared_ptr AccessorsCallback; + + void StartStopping() { + PassAway(); + } + + void Handle(TEvRegisterController::TPtr& ev) { + Manager->RegisterController(ev->Get()->ExtractController(), ev->Get()->GetTabletId(), ev->Get()->IsUpdate()); + } + void Handle(TEvUnregisterController::TPtr& ev) { + Manager->UnregisterController(ev->Get()->GetTabletId(), ev->Get()->GetPathId()); + } + void Handle(TEvAddPortion::TPtr& ev) { + for (auto&& a : ev->Get()->ExtractAccessors()) { + Manager->AddPortion(ev->Get()->GetTabletId(), std::move(a)); + } + } + void Handle(TEvRemovePortion::TPtr& ev) { + Manager->RemovePortion(ev->Get()->GetTabletId(), ev->Get()->GetPortion()); + } + void Handle(TEvAskServiceDataAccessors::TPtr& ev); + +public: + + static inline TActorId MakeActorId(ui32 nodeId) { + char x[12] = {'s', 'h', 'a', 'r', 'e', + 'd', 'm', 'e', 't', 'a', 'd', 't'}; + return TActorId(nodeId, TStringBuf(x, 12)); + } + + static NActors::IActor* CreateActor(); + + TNodeActor() = default; + ~TNodeActor() = default; + + void Bootstrap(); + + STFUNC(StateWait) { + const NActors::TLogContextGuard lGuard = NActors::TLogContextBuilder::Build()("self_id", SelfId()); + switch (ev->GetTypeRewrite()) { + cFunc(NActors::TEvents::TEvPoison::EventType, StartStopping); + hFunc(TEvRegisterController, Handle); + hFunc(TEvUnregisterController, Handle); + hFunc(TEvAskServiceDataAccessors, Handle); + hFunc(TEvRemovePortion, Handle); + hFunc(TEvAddPortion, Handle); + default: + AFL_VERIFY(false); + } + } +}; + +} // namespace NKikimr::NOlap::NDataAccessorControl diff --git a/ydb/core/tx/columnshard/data_accessor/ya.make b/ydb/core/tx/columnshard/data_accessor/ya.make index f3212e91e74e..0355e0672a36 100644 --- a/ydb/core/tx/columnshard/data_accessor/ya.make +++ b/ydb/core/tx/columnshard/data_accessor/ya.make @@ -2,6 +2,7 @@ LIBRARY() SRCS( actor.cpp + node_actor.cpp events.cpp request.cpp manager.cpp diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp index 83520f7cc4ed..add8ee07f80c 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp @@ -30,7 +30,7 @@ namespace NKikimr::NOlap { TColumnEngineForLogs::TColumnEngineForLogs(const ui64 tabletId, const std::shared_ptr& schemaCache, const std::shared_ptr& dataAccessorsManager, const std::shared_ptr& storagesManager, const TSnapshot& snapshot, const ui64 presetId, const TSchemaInitializationData& schema, const std::shared_ptr& counters) - : GranulesStorage(std::make_shared(SignalCounters, dataAccessorsManager, storagesManager)) + : GranulesStorage(std::make_shared(SignalCounters, dataAccessorsManager, storagesManager, (TTabletId)tabletId)) , DataAccessorsManager(dataAccessorsManager) , StoragesManager(storagesManager) , SchemaObjectsCache(schemaCache) @@ -45,7 +45,7 @@ TColumnEngineForLogs::TColumnEngineForLogs(const ui64 tabletId, const std::share TColumnEngineForLogs::TColumnEngineForLogs(const ui64 tabletId, const std::shared_ptr& schemaCache, const std::shared_ptr& dataAccessorsManager, const std::shared_ptr& storagesManager, const TSnapshot& snapshot, const ui64 presetId, TIndexInfo&& schema, const std::shared_ptr& counters) - : GranulesStorage(std::make_shared(SignalCounters, dataAccessorsManager, storagesManager)) + : GranulesStorage(std::make_shared(SignalCounters, dataAccessorsManager, storagesManager, (TTabletId)tabletId)) , DataAccessorsManager(dataAccessorsManager) , StoragesManager(storagesManager) , SchemaObjectsCache(schemaCache) diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.cpp index 9f74bc79c656..51e180925871 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.cpp @@ -190,7 +190,7 @@ bool TPortionDataSource::DoStartFetchingAccessor(const std::shared_ptr request = std::make_shared("PLAIN::" + step.GetName()); request->AddPortion(Portion); request->RegisterSubscriber(std::make_shared(step, sourcePtr)); - GetContext()->GetCommonContext()->GetDataAccessorsManager()->AskData(request); + GetContext()->GetCommonContext()->GetDataAccessorsManager()->AskData((NOlap::TTabletId)GetContext()->GetReadMetadata()->GetTabletId(), request); return true; } diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.h index 69cce115dfd2..9ab171841c81 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.h +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.h @@ -191,6 +191,7 @@ class TPortionDataSource: public IDataSource { using TBase = IDataSource; const TPortionInfo::TConstPtr Portion; std::shared_ptr Schema; + TTabletId TabletId; void NeedFetchColumns(const std::set& columnIds, TBlobsAction& blobsAction, THashMap& nullBlocks, const std::shared_ptr& filter); @@ -299,7 +300,8 @@ class TPortionDataSource: public IDataSource { portion->RecordSnapshotMin(TSnapshot::Zero()), portion->RecordSnapshotMax(TSnapshot::Zero()), portion->GetRecordsCount(), portion->GetShardingVersionOptional(), portion->GetMeta().GetDeletionsCount()) , Portion(portion) - , Schema(GetContext()->GetReadMetadata()->GetLoadSchemaVerified(*portion)) { + , Schema(GetContext()->GetReadMetadata()->GetLoadSchemaVerified(*portion)) + , TabletId((NOlap::TTabletId)GetContext()->GetReadMetadata()->GetTabletId()){ } }; diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.cpp b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.cpp index e075573a6ea8..d51efb74058a 100644 --- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.cpp +++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.cpp @@ -416,7 +416,7 @@ bool TPortionDataSource::DoStartFetchingAccessor(const std::shared_ptrAddPortion(Portion); request->SetColumnIds(GetContext()->GetAllUsageColumns()->GetColumnIds()); request->RegisterSubscriber(std::make_shared(step, sourcePtr)); - GetContext()->GetCommonContext()->GetDataAccessorsManager()->AskData(request); + GetContext()->GetCommonContext()->GetDataAccessorsManager()->AskData((NOlap::TTabletId)GetContext()->GetReadMetadata()->GetTabletId(), request); return true; } diff --git a/ydb/core/tx/columnshard/engines/reader/sys_view/chunks/chunks.h b/ydb/core/tx/columnshard/engines/reader/sys_view/chunks/chunks.h index c09a4f6d448b..802f14725150 100644 --- a/ydb/core/tx/columnshard/engines/reader/sys_view/chunks/chunks.h +++ b/ydb/core/tx/columnshard/engines/reader/sys_view/chunks/chunks.h @@ -5,6 +5,8 @@ #include #include +#include "ydb/core/tx/columnshard/engines/reader/abstract/read_context.h" + namespace NKikimr::NOlap::NReader::NSysView::NChunks { class TConstructor: public TStatScannerConstructor { @@ -111,7 +113,7 @@ class TStatsIterator: public NAbstract::TStatsIterator&& guard, const std::shared_ptr& /*selfPtr*/) override { Guard = std::move(guard); - AccessorsManager->AskData(std::move(Request)); + AccessorsManager->AskData((TTabletId)Context->GetReadMetadata()->GetTabletId(), std::move(Request)); return true; } virtual void DoOnAllocationImpossible(const TString& errorMessage) override; diff --git a/ydb/core/tx/columnshard/engines/storage/granule/granule.cpp b/ydb/core/tx/columnshard/engines/storage/granule/granule.cpp index 651b83e8c7ff..bbeb21b82c4d 100644 --- a/ydb/core/tx/columnshard/engines/storage/granule/granule.cpp +++ b/ydb/core/tx/columnshard/engines/storage/granule/granule.cpp @@ -31,7 +31,7 @@ void TGranuleMeta::AppendPortion(const std::shared_ptr& info) { void TGranuleMeta::AppendPortion(const TPortionDataAccessor& info) { AppendPortion(info.MutablePortionInfoPtr()); - DataAccessorsManager->AddPortion(info); + DataAccessorsManager->AddPortion(TabletId, info); } bool TGranuleMeta::ErasePortion(const ui64 portion) { @@ -42,7 +42,7 @@ bool TGranuleMeta::ErasePortion(const ui64 portion) { } else { AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "portion_erased")("portion_info", it->second->DebugString())("pathId", PathId); } - DataAccessorsManager->RemovePortion(it->second); + DataAccessorsManager->RemovePortion(TabletId, it->second); OnBeforeChangePortion(it->second); Portions.erase(it); OnAfterChangePortion(nullptr, nullptr); @@ -136,6 +136,7 @@ const NKikimr::NOlap::TGranuleAdditiveSummary& TGranuleMeta::GetAdditiveSummary( TGranuleMeta::TGranuleMeta( const TInternalPathId pathId, const TGranulesStorage& owner, const NColumnShard::TGranuleDataCounters& counters, const TVersionedIndex& versionedIndex) : PathId(pathId) + , TabletId(owner.GetTabletId()) , DataAccessorsManager(owner.GetDataAccessorsManager()) , Counters(counters) , PortionInfoGuard(owner.GetCounters().BuildPortionBlobsGuard()) @@ -175,7 +176,7 @@ void TGranuleMeta::BuildActualizationTasks(NActualizer::TTieringProcessContext& void TGranuleMeta::ResetAccessorsManager(const std::shared_ptr& constructor, const NDataAccessorControl::TManagerConstructionContext& context) { MetadataMemoryManager = constructor->Build(context).DetachResult(); - DataAccessorsManager->RegisterController(MetadataMemoryManager->BuildCollector(PathId), context.IsUpdate()); + DataAccessorsManager->RegisterController(MetadataMemoryManager->BuildCollector(TabletId, PathId), TabletId, context.IsUpdate()); } void TGranuleMeta::ResetOptimizer(const std::shared_ptr& constructor, @@ -265,7 +266,7 @@ bool TGranuleMeta::TestingLoad(IDbWrapper& db, const TVersionedIndex& versionedI } for (auto&& [portionId, constructor] : constructors) { auto accessor = constructor.Build(false); - DataAccessorsManager->AddPortion(accessor); + DataAccessorsManager->AddPortion(TabletId, accessor); UpsertPortionOnLoad(accessor.MutablePortionInfoPtr()); } return true; @@ -274,7 +275,7 @@ bool TGranuleMeta::TestingLoad(IDbWrapper& db, const TVersionedIndex& versionedI void TGranuleMeta::InsertPortionOnComplete(const TPortionDataAccessor& portion, IColumnEngine& /*engine*/) { AFL_VERIFY(InsertedPortions.emplace(portion.GetPortionInfo().GetInsertWriteIdVerified(), portion.MutablePortionInfoPtr()).second); AFL_VERIFY(InsertedAccessors.emplace(portion.GetPortionInfo().GetInsertWriteIdVerified(), portion).second); - DataAccessorsManager->AddPortion(portion); + DataAccessorsManager->AddPortion(TabletId, portion); } void TGranuleMeta::InsertPortionOnExecute(NTabletFlatExecutor::TTransactionContext& txc, const TPortionDataAccessor& portion) const { diff --git a/ydb/core/tx/columnshard/engines/storage/granule/granule.h b/ydb/core/tx/columnshard/engines/storage/granule/granule.h index 0f2a02c135a3..6bc1ecba986a 100644 --- a/ydb/core/tx/columnshard/engines/storage/granule/granule.h +++ b/ydb/core/tx/columnshard/engines/storage/granule/granule.h @@ -127,6 +127,7 @@ class TGranuleMeta: TNonCopyable { mutable bool AllowInsertionFlag = false; const TInternalPathId PathId; + const TTabletId TabletId; std::shared_ptr DataAccessorsManager; const NColumnShard::TGranuleDataCounters Counters; NColumnShard::TEngineLogsCounters::TPortionsInfoGuard PortionInfoGuard; @@ -179,7 +180,7 @@ class TGranuleMeta: TNonCopyable { std::unique_ptr BuildDataAccessor() { AFL_VERIFY(!DataAccessorConstructed); DataAccessorConstructed = true; - return MetadataMemoryManager->BuildCollector(PathId); + return MetadataMemoryManager->BuildCollector(TabletId, PathId); } void RefreshTiering(const std::optional& tiering) { @@ -308,7 +309,7 @@ class TGranuleMeta: TNonCopyable { } request->RegisterSubscriber(std::make_shared()); - DataAccessorsManager->AskData(request); + DataAccessorsManager->AskData(TabletId, request); } if (ActualizationIndex->IsStarted()) { RefreshScheme(); diff --git a/ydb/core/tx/columnshard/engines/storage/granule/stages.cpp b/ydb/core/tx/columnshard/engines/storage/granule/stages.cpp index 18f321470e89..89dbc35499f9 100644 --- a/ydb/core/tx/columnshard/engines/storage/granule/stages.cpp +++ b/ydb/core/tx/columnshard/engines/storage/granule/stages.cpp @@ -54,14 +54,14 @@ bool TGranuleIndexesReader::DoPrecharge(NTabletFlatExecutor::TTransactionContext return db.Table().Prefix(Self->GetPathId().GetRawValue()).Select().IsReady(); } -bool TGranuleFinishAccessorsLoading::DoExecute(NTabletFlatExecutor::TTransactionContext& /*txc*/, const TActorContext& /*ctx*/) { +bool TGranuleFinishAccessorsLoading::DoExecute(NTabletFlatExecutor::TTransactionContext& txc, const TActorContext& /*ctx*/) { THashMap constructors = Context->ExtractConstructors(); AFL_VERIFY(Self->GetPortions().size() == constructors.size()); for (auto&& i : Self->GetPortions()) { auto it = constructors.find(i.first); AFL_VERIFY(it != constructors.end()); auto accessor = TPortionAccessorConstructor::BuildForLoading(i.second, std::move(it->second.MutableRecords()), std::move(it->second.MutableIndexes())); - Self->GetDataAccessorsManager()->AddPortion(accessor); + Self->GetDataAccessorsManager()->AddPortion((NOlap::TTabletId)txc.Tablet, accessor); } return true; } diff --git a/ydb/core/tx/columnshard/engines/storage/granule/storage.h b/ydb/core/tx/columnshard/engines/storage/granule/storage.h index b22f9f0c089e..49d0693a350e 100644 --- a/ydb/core/tx/columnshard/engines/storage/granule/storage.h +++ b/ydb/core/tx/columnshard/engines/storage/granule/storage.h @@ -97,6 +97,7 @@ class TGranulesStat { class TGranulesStorage { private: const NColumnShard::TEngineLogsCounters Counters; + const TTabletId TabletId; const std::shared_ptr DataAccessorsManager; std::shared_ptr StoragesManager; THashMap> Tables; // pathId into Granule that equal to Table @@ -121,17 +122,18 @@ class TGranulesStorage { TGranulesStorage(const NColumnShard::TEngineLogsCounters counters, const std::shared_ptr& dataAccessorsManager, - const std::shared_ptr& storagesManager) + const std::shared_ptr& storagesManager, + const TTabletId tabletId) : Counters(counters) + , TabletId(tabletId) , DataAccessorsManager(dataAccessorsManager) - , StoragesManager(storagesManager) - , Stats(std::make_shared(Counters)) { + , StoragesManager(storagesManager), Stats(std::make_shared(Counters)) { AFL_VERIFY(DataAccessorsManager); AFL_VERIFY(StoragesManager); } void FetchDataAccessors(const std::shared_ptr& request) const { - DataAccessorsManager->AskData(request); + DataAccessorsManager->AskData(TabletId, request); } const std::shared_ptr& GetStats() const { @@ -153,7 +155,7 @@ class TGranulesStorage { if (!it->second->IsErasable()) { return false; } - DataAccessorsManager->UnregisterController(pathId); + DataAccessorsManager->UnregisterController(TabletId, pathId); Tables.erase(it); return true; } @@ -214,6 +216,8 @@ class TGranulesStorage { return Counters; } + TTabletId GetTabletId() const { return TabletId;} + std::shared_ptr GetGranuleForCompaction(const std::shared_ptr& locksManager) const; std::optional GetCompactionPriority(const std::shared_ptr& locksManager, const std::set& pathIds = Default>(), const std::optional waitingPriority = std::nullopt,