diff --git a/ydb/core/driver_lib/run/config_helpers.cpp b/ydb/core/driver_lib/run/config_helpers.cpp new file mode 100644 index 000000000000..bb2753759121 --- /dev/null +++ b/ydb/core/driver_lib/run/config_helpers.cpp @@ -0,0 +1,115 @@ +#include "config_helpers.h" + +#include + + +namespace NKikimr { + +namespace NActorSystemConfigHelpers { + +namespace { + +template +static TCpuMask ParseAffinity(const TConfig& cfg) { + TCpuMask result; + if (cfg.GetCpuList()) { + result = TCpuMask(cfg.GetCpuList()); + } else if (cfg.GetX().size() > 0) { + result = TCpuMask(cfg.GetX().data(), cfg.GetX().size()); + } else { // use all processors + TAffinity available; + available.Current(); + result = available; + } + if (cfg.GetExcludeCpuList()) { + result = result - TCpuMask(cfg.GetExcludeCpuList()); + } + return result; +} + +TDuration GetSelfPingInterval(const NKikimrConfig::TActorSystemConfig& systemConfig) { + return systemConfig.HasSelfPingInterval() + ? TDuration::MicroSeconds(systemConfig.GetSelfPingInterval()) + : TDuration::MilliSeconds(10); +} + +NActors::EASProfile ConvertActorSystemProfile(NKikimrConfig::TActorSystemConfig::EActorSystemProfile profile) { + switch (profile) { + case NKikimrConfig::TActorSystemConfig::DEFAULT: + return NActors::EASProfile::Default; + case NKikimrConfig::TActorSystemConfig::LOW_CPU_CONSUMPTION: + return NActors::EASProfile::LowCpuConsumption; + case NKikimrConfig::TActorSystemConfig::LOW_LATENCY: + return NActors::EASProfile::LowLatency; + } +} + +} // anonymous namespace + +void AddExecutorPool(NActors::TCpuManagerConfig& cpuManager, const NKikimrConfig::TActorSystemConfig::TExecutor& poolConfig, const NKikimrConfig::TActorSystemConfig& systemConfig, ui32 poolId, NMonitoring::TDynamicCounterPtr counters) { + switch (poolConfig.GetType()) { + case NKikimrConfig::TActorSystemConfig::TExecutor::BASIC: { + NActors::TBasicExecutorPoolConfig basic; + basic.PoolId = poolId; + basic.PoolName = poolConfig.GetName(); + if (poolConfig.HasMaxAvgPingDeviation() && counters) { + auto poolGroup = counters->GetSubgroup("execpool", basic.PoolName); + auto &poolInfo = cpuManager.PingInfoByPool[poolId]; + poolInfo.AvgPingCounter = poolGroup->GetCounter("SelfPingAvgUs", false); + poolInfo.AvgPingCounterWithSmallWindow = poolGroup->GetCounter("SelfPingAvgUsIn1s", false); + TDuration maxAvgPing = GetSelfPingInterval(systemConfig) + TDuration::MicroSeconds(poolConfig.GetMaxAvgPingDeviation()); + poolInfo.MaxAvgPingUs = maxAvgPing.MicroSeconds(); + } + basic.Threads = Max(poolConfig.GetThreads(), poolConfig.GetMaxThreads()); + basic.SpinThreshold = poolConfig.GetSpinThreshold(); + basic.Affinity = ParseAffinity(poolConfig.GetAffinity()); + basic.RealtimePriority = poolConfig.GetRealtimePriority(); + basic.HasSharedThread = poolConfig.GetHasSharedThread(); + if (poolConfig.HasTimePerMailboxMicroSecs()) { + basic.TimePerMailbox = TDuration::MicroSeconds(poolConfig.GetTimePerMailboxMicroSecs()); + } else if (systemConfig.HasTimePerMailboxMicroSecs()) { + basic.TimePerMailbox = TDuration::MicroSeconds(systemConfig.GetTimePerMailboxMicroSecs()); + } + if (poolConfig.HasEventsPerMailbox()) { + basic.EventsPerMailbox = poolConfig.GetEventsPerMailbox(); + } else if (systemConfig.HasEventsPerMailbox()) { + basic.EventsPerMailbox = systemConfig.GetEventsPerMailbox(); + } + basic.ActorSystemProfile = ConvertActorSystemProfile(systemConfig.GetActorSystemProfile()); + Y_ABORT_UNLESS(basic.EventsPerMailbox != 0); + basic.MinThreadCount = poolConfig.GetMinThreads(); + basic.MaxThreadCount = poolConfig.GetMaxThreads(); + basic.DefaultThreadCount = poolConfig.GetThreads(); + basic.Priority = poolConfig.GetPriority(); + cpuManager.Basic.emplace_back(std::move(basic)); + break; + } + + case NKikimrConfig::TActorSystemConfig::TExecutor::IO: { + NActors::TIOExecutorPoolConfig io; + io.PoolId = poolId; + io.PoolName = poolConfig.GetName(); + io.Threads = poolConfig.GetThreads(); + io.Affinity = ParseAffinity(poolConfig.GetAffinity()); + cpuManager.IO.emplace_back(std::move(io)); + break; + } + + default: + Y_ABORT(); + } +} + +NActors::TSchedulerConfig CreateSchedulerConfig(const NKikimrConfig::TActorSystemConfig::TScheduler& config) { + const ui64 resolution = config.HasResolution() ? config.GetResolution() : 1024; + Y_DEBUG_ABORT_UNLESS((resolution & (resolution - 1)) == 0); // resolution must be power of 2 + const ui64 spinThreshold = config.HasSpinThreshold() ? config.GetSpinThreshold() : 0; + const ui64 progressThreshold = config.HasProgressThreshold() ? config.GetProgressThreshold() : 10000; + const bool useSchedulerActor = config.HasUseSchedulerActor() ? config.GetUseSchedulerActor() : false; + + return NActors::TSchedulerConfig(resolution, spinThreshold, progressThreshold, useSchedulerActor); +} + +} // namespace NActorSystemConfigHelpers + +} // namespace NKikimr diff --git a/ydb/core/driver_lib/run/config_helpers.h b/ydb/core/driver_lib/run/config_helpers.h new file mode 100644 index 000000000000..d38e416b6f5b --- /dev/null +++ b/ydb/core/driver_lib/run/config_helpers.h @@ -0,0 +1,18 @@ +#pragma once + +#include + +#include + + +namespace NKikimr { + +namespace NActorSystemConfigHelpers { + +void AddExecutorPool(NActors::TCpuManagerConfig& cpuManager, const NKikimrConfig::TActorSystemConfig::TExecutor& poolConfig, const NKikimrConfig::TActorSystemConfig& systemConfig, ui32 poolId, NMonitoring::TDynamicCounterPtr counters); + +NActors::TSchedulerConfig CreateSchedulerConfig(const NKikimrConfig::TActorSystemConfig::TScheduler& config); + +} // namespace NActorSystemConfigHelpers + +} // namespace NKikimr diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp index c31d402ddbec..ecb69c001c91 100644 --- a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp +++ b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp @@ -1,4 +1,5 @@ #include "auto_config_initializer.h" +#include "config_helpers.h" #include "config.h" #include "kikimr_services_initializers.h" #include "service_initializer.h" @@ -277,42 +278,6 @@ IKikimrServicesInitializer::IKikimrServicesInitializer(const TKikimrRunConfig& r // TBasicServicesInitializer -template -static TCpuMask ParseAffinity(const TConfig& cfg) { - TCpuMask result; - if (cfg.GetCpuList()) { - result = TCpuMask(cfg.GetCpuList()); - } else if (cfg.GetX().size() > 0) { - result = TCpuMask(cfg.GetX().data(), cfg.GetX().size()); - } else { // use all processors - TAffinity available; - available.Current(); - result = available; - } - if (cfg.GetExcludeCpuList()) { - result = result - TCpuMask(cfg.GetExcludeCpuList()); - } - return result; -} - -TDuration GetSelfPingInterval(const NKikimrConfig::TActorSystemConfig& systemConfig) { - return systemConfig.HasSelfPingInterval() - ? TDuration::MicroSeconds(systemConfig.GetSelfPingInterval()) - : TDuration::MilliSeconds(10); -} - - -NActors::EASProfile ConvertActorSystemProfile(NKikimrConfig::TActorSystemConfig::EActorSystemProfile profile) { - switch (profile) { - case NKikimrConfig::TActorSystemConfig::DEFAULT: - return NActors::EASProfile::Default; - case NKikimrConfig::TActorSystemConfig::LOW_CPU_CONSUMPTION: - return NActors::EASProfile::LowCpuConsumption; - case NKikimrConfig::TActorSystemConfig::LOW_LATENCY: - return NActors::EASProfile::LowLatency; - } -} - void AddExecutorPool( TCpuManagerConfig& cpuManager, const NKikimrConfig::TActorSystemConfig::TExecutor& poolConfig, @@ -321,55 +286,7 @@ void AddExecutorPool( const NKikimr::TAppData* appData) { const auto counters = GetServiceCounters(appData->Counters, "utils"); - switch (poolConfig.GetType()) { - case NKikimrConfig::TActorSystemConfig::TExecutor::BASIC: { - TBasicExecutorPoolConfig basic; - basic.PoolId = poolId; - basic.PoolName = poolConfig.GetName(); - if (poolConfig.HasMaxAvgPingDeviation()) { - auto poolGroup = counters->GetSubgroup("execpool", basic.PoolName); - auto &poolInfo = cpuManager.PingInfoByPool[poolId]; - poolInfo.AvgPingCounter = poolGroup->GetCounter("SelfPingAvgUs", false); - poolInfo.AvgPingCounterWithSmallWindow = poolGroup->GetCounter("SelfPingAvgUsIn1s", false); - TDuration maxAvgPing = GetSelfPingInterval(systemConfig) + TDuration::MicroSeconds(poolConfig.GetMaxAvgPingDeviation()); - poolInfo.MaxAvgPingUs = maxAvgPing.MicroSeconds(); - } - basic.Threads = Max(poolConfig.GetThreads(), poolConfig.GetMaxThreads()); - basic.SpinThreshold = poolConfig.GetSpinThreshold(); - basic.Affinity = ParseAffinity(poolConfig.GetAffinity()); - basic.RealtimePriority = poolConfig.GetRealtimePriority(); - basic.HasSharedThread = poolConfig.GetHasSharedThread(); - if (poolConfig.HasTimePerMailboxMicroSecs()) { - basic.TimePerMailbox = TDuration::MicroSeconds(poolConfig.GetTimePerMailboxMicroSecs()); - } else if (systemConfig.HasTimePerMailboxMicroSecs()) { - basic.TimePerMailbox = TDuration::MicroSeconds(systemConfig.GetTimePerMailboxMicroSecs()); - } - if (poolConfig.HasEventsPerMailbox()) { - basic.EventsPerMailbox = poolConfig.GetEventsPerMailbox(); - } else if (systemConfig.HasEventsPerMailbox()) { - basic.EventsPerMailbox = systemConfig.GetEventsPerMailbox(); - } - basic.ActorSystemProfile = ConvertActorSystemProfile(systemConfig.GetActorSystemProfile()); - Y_ABORT_UNLESS(basic.EventsPerMailbox != 0); - basic.MinThreadCount = poolConfig.GetMinThreads(); - basic.MaxThreadCount = poolConfig.GetMaxThreads(); - basic.DefaultThreadCount = poolConfig.GetThreads(); - basic.Priority = poolConfig.GetPriority(); - cpuManager.Basic.emplace_back(std::move(basic)); - break; - } - case NKikimrConfig::TActorSystemConfig::TExecutor::IO: { - TIOExecutorPoolConfig io; - io.PoolId = poolId; - io.PoolName = poolConfig.GetName(); - io.Threads = poolConfig.GetThreads(); - io.Affinity = ParseAffinity(poolConfig.GetAffinity()); - cpuManager.IO.emplace_back(std::move(io)); - break; - } - default: - Y_ABORT(); - } + NActorSystemConfigHelpers::AddExecutorPool(cpuManager, poolConfig, systemConfig, poolId, counters); } static TCpuManagerConfig CreateCpuManagerConfig(const NKikimrConfig::TActorSystemConfig& config, @@ -383,16 +300,6 @@ static TCpuManagerConfig CreateCpuManagerConfig(const NKikimrConfig::TActorSyste return cpuManager; } -static TSchedulerConfig CreateSchedulerConfig(const NKikimrConfig::TActorSystemConfig::TScheduler &config) { - const ui64 resolution = config.HasResolution() ? config.GetResolution() : 1024; - Y_DEBUG_ABORT_UNLESS((resolution & (resolution - 1)) == 0); // resolution must be power of 2 - const ui64 spinThreshold = config.HasSpinThreshold() ? config.GetSpinThreshold() : 0; - const ui64 progressThreshold = config.HasProgressThreshold() ? config.GetProgressThreshold() : 10000; - const bool useSchedulerActor = config.HasUseSchedulerActor() ? config.GetUseSchedulerActor() : false; - - return TSchedulerConfig(resolution, spinThreshold, progressThreshold, useSchedulerActor); -} - static bool IsServiceInitialized(NActors::TActorSystemSetup* setup, TActorId service) { for (auto &pr : setup->LocalServices) @@ -601,7 +508,7 @@ void TBasicServicesInitializer::InitializeServices(NActors::TActorSystemSetup* s setup->CpuManager = CreateCpuManagerConfig(systemConfig, appData); setup->MonitorStuckActors = systemConfig.GetMonitorStuckActors(); - auto schedulerConfig = CreateSchedulerConfig(systemConfig.GetScheduler()); + auto schedulerConfig = NActorSystemConfigHelpers::CreateSchedulerConfig(systemConfig.GetScheduler()); schedulerConfig.MonCounters = GetServiceCounters(counters, "utils"); setup->Scheduler.Reset(CreateSchedulerThread(schedulerConfig)); setup->LocalServices.emplace_back(MakeIoDispatcherActorId(), TActorSetupCmd(CreateIoDispatcherActor( @@ -1265,7 +1172,7 @@ void TSchedulerActorInitializer::InitializeServices( NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) { auto& systemConfig = Config.GetActorSystemConfig(); - NActors::IActor *schedulerActor = CreateSchedulerActor(CreateSchedulerConfig(systemConfig.GetScheduler())); + NActors::IActor *schedulerActor = CreateSchedulerActor(NActorSystemConfigHelpers::CreateSchedulerConfig(systemConfig.GetScheduler())); if (schedulerActor) { NActors::TActorSetupCmd schedulerActorCmd(schedulerActor, NActors::TMailboxType::ReadAsFilled, appData->SystemPoolId); setup->LocalServices.emplace_back(MakeSchedulerActorId(), std::move(schedulerActorCmd)); diff --git a/ydb/core/driver_lib/run/ya.make b/ydb/core/driver_lib/run/ya.make index b05dc136cd5e..b83ac0465eb7 100644 --- a/ydb/core/driver_lib/run/ya.make +++ b/ydb/core/driver_lib/run/ya.make @@ -4,6 +4,7 @@ SRCS( auto_config_initializer.cpp config.cpp config.h + config_helpers.cpp config_parser.cpp config_parser.h driver.h diff --git a/ydb/core/testlib/actors/test_runtime.cpp b/ydb/core/testlib/actors/test_runtime.cpp index 6fc7cfbaabda..9ced093244d1 100644 --- a/ydb/core/testlib/actors/test_runtime.cpp +++ b/ydb/core/testlib/actors/test_runtime.cpp @@ -11,6 +11,7 @@ #include #include +#include #include #include @@ -49,6 +50,11 @@ namespace NActors { NeedStatsCollectors = true; } + void TTestActorRuntime::SetupActorSystemConfig(const TActorSystemSetupConfig& config, const TActorSystemPools& pools) { + ActorSystemSetupConfig = config; + ActorSystemPools = pools; + } + TTestActorRuntime::TTestActorRuntime(THeSingleSystemEnv d) : TPortManager(false) , TTestActorRuntimeBase{d} @@ -131,7 +137,7 @@ namespace NActors { node->ActorSystem = MakeActorSystem(nodeIndex, node); node->ExecutorThread.Reset(new TExecutorThread(0, 0, node->ActorSystem.Get(), node->SchedulerPool.Get(), node->MailboxTable.Get(), "TestExecutor")); } else { - node->AppData0.reset(new NKikimr::TAppData(0, 1, 2, 3, { }, app0->TypeRegistry, app0->FunctionRegistry, app0->FormatFactory, nullptr)); + node->AppData0.reset(new NKikimr::TAppData(ActorSystemPools.SystemPoolId, ActorSystemPools.UserPoolId, ActorSystemPools.IOPoolId, ActorSystemPools.BatchPoolId, ActorSystemPools.ServicePools, app0->TypeRegistry, app0->FunctionRegistry, app0->FormatFactory, nullptr)); node->ActorSystem = MakeActorSystem(nodeIndex, node); } node->LogSettings->MessagePrefix = " node " + ToString(nodeId); @@ -219,6 +225,18 @@ namespace NActors { } void TTestActorRuntime::InitActorSystemSetup(TActorSystemSetup& setup, TNodeDataBase* node) { + if (ActorSystemSetupConfig) { + setup.Executors.Reset(); + setup.ExecutorsCount = 0; + + setup.CpuManager = ActorSystemSetupConfig->CpuManagerConfig; + setup.MonitorStuckActors = ActorSystemSetupConfig->MonitorStuckActors; + + auto schedulerConfig = ActorSystemSetupConfig->SchedulerConfig; + schedulerConfig.MonCounters = NKikimr::GetServiceCounters(node->DynamicCounters, "utils"); + setup.Scheduler.Reset(CreateSchedulerThread(schedulerConfig)); + } + if (NeedMonitoring && NeedStatsCollectors) { NActors::IActor* statsCollector = NKikimr::CreateStatsCollector(1, setup, node->DynamicCounters); setup.LocalServices.push_back({ diff --git a/ydb/core/testlib/actors/test_runtime.h b/ydb/core/testlib/actors/test_runtime.h index 149180536710..7198bc03b6d9 100644 --- a/ydb/core/testlib/actors/test_runtime.h +++ b/ydb/core/testlib/actors/test_runtime.h @@ -53,6 +53,20 @@ namespace NActors { std::vector> Icb; }; + struct TActorSystemSetupConfig { + TCpuManagerConfig CpuManagerConfig; + TSchedulerConfig SchedulerConfig; + bool MonitorStuckActors = false; + }; + + struct TActorSystemPools { + ui32 SystemPoolId = 0; + ui32 UserPoolId = 1; + ui32 IOPoolId = 2; + ui32 BatchPoolId = 3; + TMap ServicePools = {}; + }; + TTestActorRuntime(THeSingleSystemEnv d); TTestActorRuntime(ui32 nodeCount, ui32 dataCenterCount, bool UseRealThreads); TTestActorRuntime(ui32 nodeCount, ui32 dataCenterCount); @@ -63,6 +77,7 @@ namespace NActors { void AddAppDataInit(std::function callback); virtual void Initialize(TEgg); void SetupStatsCollectors(); + void SetupActorSystemConfig(const TActorSystemSetupConfig& config, const TActorSystemPools& pools); ui16 GetMonPort(ui32 nodeIndex = 0) const; @@ -125,5 +140,7 @@ namespace NActors { TActorId SleepEdgeActor; TVector> AppDataInit_; bool NeedStatsCollectors = false; + std::optional ActorSystemSetupConfig; + TActorSystemPools ActorSystemPools; }; } // namespace NActors diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp index 90d7b6d89657..3349b4e1ce8a 100644 --- a/ydb/core/testlib/test_client.cpp +++ b/ydb/core/testlib/test_client.cpp @@ -4,6 +4,8 @@ #include #include #include +#include +#include #include #include #include @@ -234,6 +236,8 @@ namespace Tests { Runtime->SetupMonitoring(Settings->MonitoringPortOffset, Settings->MonitoringTypeAsync); Runtime->SetLogBackend(Settings->LogBackend); + SetupActorSystemConfig(); + Runtime->AddAppDataInit([this](ui32 nodeIdx, NKikimr::TAppData& appData) { Y_UNUSED(nodeIdx); @@ -291,6 +295,34 @@ namespace Tests { SetupStorage(); } + void TServer::SetupActorSystemConfig() { + if (!Settings->AppConfig->HasActorSystemConfig()) { + return; + } + + auto actorSystemConfig = Settings->AppConfig->GetActorSystemConfig(); + const bool useAutoConfig = actorSystemConfig.HasUseAutoConfig() && actorSystemConfig.GetUseAutoConfig(); + if (useAutoConfig) { + NAutoConfigInitializer::ApplyAutoConfig(&actorSystemConfig); + } + + TCpuManagerConfig cpuManager; + for (int poolId = 0; poolId < actorSystemConfig.GetExecutor().size(); poolId++) { + NActorSystemConfigHelpers::AddExecutorPool(cpuManager, actorSystemConfig.GetExecutor(poolId), actorSystemConfig, poolId, nullptr); + } + + const NAutoConfigInitializer::TASPools pools = NAutoConfigInitializer::GetASPools(actorSystemConfig, useAutoConfig); + + Runtime->SetupActorSystemConfig(TTestActorRuntime::TActorSystemSetupConfig{ + .CpuManagerConfig = cpuManager, + .SchedulerConfig = NActorSystemConfigHelpers::CreateSchedulerConfig(actorSystemConfig.GetScheduler()), + .MonitorStuckActors = actorSystemConfig.GetMonitorStuckActors() + }, TTestActorRuntime::TActorSystemPools{ + pools.SystemPoolId, pools.UserPoolId, pools.IOPoolId, pools.BatchPoolId, + NAutoConfigInitializer::GetServicePools(actorSystemConfig, useAutoConfig) + }); + } + void TServer::SetupMessageBus(ui16 port) { if (port) { Bus = NBus::CreateMessageQueue(NBus::TBusQueueConfig()); @@ -308,7 +340,9 @@ namespace Tests { auto system(Runtime->GetAnyNodeActorSystem()); - Cerr << "TServer::EnableGrpc on GrpcPort " << options.Port << ", node " << system->NodeId << Endl; + if (Settings->Verbose) { + Cerr << "TServer::EnableGrpc on GrpcPort " << options.Port << ", node " << system->NodeId << Endl; + } const size_t proxyCount = Max(ui32{1}, Settings->AppConfig->GetGRpcConfig().GetGRpcProxyCount()); TVector grpcRequestProxies; diff --git a/ydb/core/testlib/test_client.h b/ydb/core/testlib/test_client.h index 0205ffc989c2..69de2759a9ba 100644 --- a/ydb/core/testlib/test_client.h +++ b/ydb/core/testlib/test_client.h @@ -265,6 +265,7 @@ namespace Tests { protected: void SetupStorage(); + void SetupActorSystemConfig(); void SetupMessageBus(ui16 port); void SetupDomains(TAppPrepare&); void CreateBootstrapTablets(); diff --git a/ydb/library/actors/testlib/test_runtime.cpp b/ydb/library/actors/testlib/test_runtime.cpp index 88564e1a0ca5..240db79e7853 100644 --- a/ydb/library/actors/testlib/test_runtime.cpp +++ b/ydb/library/actors/testlib/test_runtime.cpp @@ -905,7 +905,7 @@ namespace NActors { TGuard guard(Mutex); TNodeDataBase* node = Nodes[FirstNodeId + nodeIndex].Get(); if (UseRealThreads) { - Y_ABORT_UNLESS(poolId < node->ExecutorPools.size()); + Y_ABORT_UNLESS(node->ExecutorPools.contains(poolId)); return node->ExecutorPools[poolId]->Register(actor, mailboxType, revolvingCounter, parentId); } @@ -973,7 +973,7 @@ namespace NActors { TGuard guard(Mutex); TNodeDataBase* node = Nodes[FirstNodeId + nodeIndex].Get(); if (UseRealThreads) { - Y_ABORT_UNLESS(poolId < node->ExecutorPools.size()); + Y_ABORT_UNLESS(node->ExecutorPools.contains(poolId)); return node->ExecutorPools[poolId]->Register(actor, mailbox, hint, parentId); } @@ -1718,7 +1718,7 @@ namespace NActors { THolder TTestActorRuntimeBase::MakeActorSystem(ui32 nodeIndex, TNodeDataBase* node) { auto setup = MakeActorSystemSetup(nodeIndex, node); - node->ExecutorPools.resize(setup->ExecutorsCount); + node->ExecutorPools.reserve(setup->ExecutorsCount); for (ui32 i = 0; i < setup->ExecutorsCount; ++i) { IExecutorPool* executor = setup->Executors[i].Get(); node->ExecutorPools[i] = executor; @@ -1786,7 +1786,18 @@ namespace NActors { setup->LocalServices.push_back(std::move(loggerActorPair)); } - return THolder(new TActorSystem(setup, node->GetAppData(), node->LogSettings)); + auto actorSystem = THolder(new TActorSystem(setup, node->GetAppData(), node->LogSettings)); + + if (node->ExecutorPools.empty()) { + // Initialize pools from actor system (except IO pool) + const auto& pools = actorSystem->GetBasicExecutorPools(); + node->ExecutorPools.reserve(pools.size()); + for (IExecutorPool* pool : pools) { + node->ExecutorPools[pool->PoolId] = pool; + } + } + + return actorSystem; } TActorSystem* TTestActorRuntimeBase::SingleSys() const { diff --git a/ydb/library/actors/testlib/test_runtime.h b/ydb/library/actors/testlib/test_runtime.h index 587ebb49de75..f462efc45fd3 100644 --- a/ydb/library/actors/testlib/test_runtime.h +++ b/ydb/library/actors/testlib/test_runtime.h @@ -701,7 +701,7 @@ namespace NActors { std::shared_ptr AppData0; THolder ActorSystem; THolder SchedulerPool; - TVector ExecutorPools; + THashMap ExecutorPools; THolder ExecutorThread; std::unique_ptr Harmonizer; }; diff --git a/ydb/tests/tools/kqprun/kqprun.cpp b/ydb/tests/tools/kqprun/kqprun.cpp index 52769286de32..454b003ba8bb 100644 --- a/ydb/tests/tools/kqprun/kqprun.cpp +++ b/ydb/tests/tools/kqprun/kqprun.cpp @@ -32,6 +32,7 @@ struct TExecutionOptions { std::vector ScriptQueries; TString SchemeQuery; + bool UseTemplates = false; ui32 LoopCount = 1; TDuration LoopDelay; @@ -70,8 +71,13 @@ struct TExecutionOptions { } NKqpRun::TRequestOptions GetSchemeQueryOptions() const { + TString sql = SchemeQuery; + if (UseTemplates) { + ReplaceYqlTokenTemplate(sql); + } + return { - .Query = SchemeQuery, + .Query = sql, .Action = NKikimrKqp::EQueryAction::QUERY_ACTION_EXECUTE, .TraceId = DefaultTraceId, .PoolId = "", @@ -79,10 +85,17 @@ struct TExecutionOptions { }; } - NKqpRun::TRequestOptions GetScriptQueryOptions(size_t index, TInstant startTime) const { + NKqpRun::TRequestOptions GetScriptQueryOptions(size_t index, size_t queryId, TInstant startTime) const { Y_ABORT_UNLESS(index < ScriptQueries.size()); + + TString sql = ScriptQueries[index]; + if (UseTemplates) { + ReplaceYqlTokenTemplate(sql); + SubstGlobal(sql, "${QUERY_ID}", ToString(queryId)); + } + return { - .Query = ScriptQueries[index], + .Query = sql, .Action = GetScriptQueryAction(index), .TraceId = TStringBuilder() << GetValue(index, TraceIds, DefaultTraceId) << "-" << startTime.ToString(), .PoolId = GetValue(index, PoolIds, TString()), @@ -98,6 +111,15 @@ struct TExecutionOptions { } return values[std::min(index, values.size() - 1)]; } + + static void ReplaceYqlTokenTemplate(TString& sql) { + const TString variableName = TStringBuilder() << "${" << NKqpRun::YQL_TOKEN_VARIABLE << "}"; + if (const TString& yqlToken = GetEnv(NKqpRun::YQL_TOKEN_VARIABLE)) { + SubstGlobal(sql, variableName, yqlToken); + } else if (sql.Contains(variableName)) { + ythrow yexception() << "Failed to replace ${YQL_TOKEN} template, please specify YQL_TOKEN environment variable\n"; + } + } }; @@ -134,7 +156,7 @@ void RunArgumentQueries(const TExecutionOptions& executionOptions, NKqpRun::TKqp switch (executionCase) { case TExecutionOptions::EExecutionCase::GenericScript: - if (!runner.ExecuteScript(executionOptions.GetScriptQueryOptions(id, startTime))) { + if (!runner.ExecuteScript(executionOptions.GetScriptQueryOptions(id, queryId, startTime))) { ythrow yexception() << TInstant::Now().ToIsoStringLocal() << " Script execution failed"; } Cout << colors.Yellow() << TInstant::Now().ToIsoStringLocal() << " Fetching script results..." << colors.Default() << Endl; @@ -150,19 +172,19 @@ void RunArgumentQueries(const TExecutionOptions& executionOptions, NKqpRun::TKqp break; case TExecutionOptions::EExecutionCase::GenericQuery: - if (!runner.ExecuteQuery(executionOptions.GetScriptQueryOptions(id, startTime))) { + if (!runner.ExecuteQuery(executionOptions.GetScriptQueryOptions(id, queryId, startTime))) { ythrow yexception() << TInstant::Now().ToIsoStringLocal() << " Query execution failed"; } break; case TExecutionOptions::EExecutionCase::YqlScript: - if (!runner.ExecuteYqlScript(executionOptions.GetScriptQueryOptions(id, startTime))) { + if (!runner.ExecuteYqlScript(executionOptions.GetScriptQueryOptions(id, queryId, startTime))) { ythrow yexception() << TInstant::Now().ToIsoStringLocal() << " Yql script execution failed"; } break; case TExecutionOptions::EExecutionCase::AsyncQuery: - runner.ExecuteQueryAsync(executionOptions.GetScriptQueryOptions(id, startTime)); + runner.ExecuteQueryAsync(executionOptions.GetScriptQueryOptions(id, queryId, startTime)); break; } } @@ -260,14 +282,6 @@ class TMain : public TMainClassArgs { return TFileInput(file).ReadAll(); } - static void ReplaceTemplate(const TString& variableName, const TString& variableValue, TString& query) { - TString variableTemplate = TStringBuilder() << "${" << variableName << "}"; - for (size_t position = query.find(variableTemplate); position != TString::npos; position = query.find(variableTemplate, position)) { - query.replace(position, variableTemplate.size(), variableValue); - position += variableValue.size(); - } - } - static IOutputStream* GetDefaultOutput(const TString& file) { if (file == "-") { return &Cout; @@ -315,13 +329,15 @@ class TMain : public TMainClassArgs { .RequiredArgument("file") .Handler1([this](const NLastGetopt::TOptsParser* option) { ExecutionOptions.SchemeQuery = LoadFile(option->CurVal()); - ReplaceTemplate(NKqpRun::YQL_TOKEN_VARIABLE, YqlToken, ExecutionOptions.SchemeQuery); }); options.AddLongOption('p', "script-query", "Script query to execute (typically DML query)") .RequiredArgument("file") .Handler1([this](const NLastGetopt::TOptsParser* option) { ExecutionOptions.ScriptQueries.emplace_back(LoadFile(option->CurVal())); }); + options.AddLongOption("templates", "Enable templates for -s and -p queries, such as ${YQL_TOKEN} and ${QUERY_ID}") + .NoArgument() + .SetFlag(&ExecutionOptions.UseTemplates); options.AddLongOption('t', "table", "File with input table (can be used by YT with -E flag), table@file") .RequiredArgument("table@file") diff --git a/ydb/tests/tools/kqprun/src/ydb_setup.cpp b/ydb/tests/tools/kqprun/src/ydb_setup.cpp index cd5c89c8f6de..6a1f1cf7570d 100644 --- a/ydb/tests/tools/kqprun/src/ydb_setup.cpp +++ b/ydb/tests/tools/kqprun/src/ydb_setup.cpp @@ -199,7 +199,7 @@ class TYdbSetup::TImpl { void WaitResourcesPublishing() const { auto promise = NThreading::NewPromise(); - GetRuntime()->Register(CreateResourcesWaiterActor(promise, Settings_.NodeCount)); + GetRuntime()->Register(CreateResourcesWaiterActor(promise, Settings_.NodeCount), 0, GetRuntime()->GetAppData().SystemPoolId); try { promise.GetFuture().GetValue(Settings_.InitializationTimeout); @@ -247,7 +247,7 @@ class TYdbSetup::TImpl { TQueryResponse QueryRequest(const TRequestOptions& query, TProgressCallback progressCallback) const { auto request = GetQueryRequest(query); auto promise = NThreading::NewPromise(); - GetRuntime()->Register(CreateRunScriptActorMock(std::move(request), promise, progressCallback)); + GetRuntime()->Register(CreateRunScriptActorMock(std::move(request), promise, progressCallback), 0, GetRuntime()->GetAppData().UserPoolId); return promise.GetFuture().GetValueSync(); } @@ -275,7 +275,7 @@ class TYdbSetup::TImpl { auto sizeLimit = Settings_.AppConfig.GetQueryServiceConfig().GetScriptResultSizeLimit(); NActors::IActor* fetchActor = NKikimr::NKqp::CreateGetScriptExecutionResultActor(edgeActor, Settings_.DomainName, executionId, resultSetId, 0, rowsLimit, sizeLimit, TInstant::Max()); - GetRuntime()->Register(fetchActor, nodeIndex); + GetRuntime()->Register(fetchActor, nodeIndex, GetRuntime()->GetAppData(nodeIndex).UserPoolId); return GetRuntime()->GrabEdgeEvent(edgeActor); } @@ -289,7 +289,7 @@ class TYdbSetup::TImpl { void QueryRequestAsync(const TRequestOptions& query) { if (!AsyncQueryRunnerActorId_) { - AsyncQueryRunnerActorId_ = GetRuntime()->Register(CreateAsyncQueryRunnerActor(Settings_.AsyncQueriesSettings)); + AsyncQueryRunnerActorId_ = GetRuntime()->Register(CreateAsyncQueryRunnerActor(Settings_.AsyncQueriesSettings), 0, GetRuntime()->GetAppData().UserPoolId); } auto request = GetQueryRequest(query);