From fc46320c690e7c68332897990fc84784c4036bfb Mon Sep 17 00:00:00 2001 From: Gigoriy Pisarenko Date: Tue, 13 Aug 2024 05:52:01 +0000 Subject: [PATCH 1/7] Added actor system config into testlib --- ydb/core/driver_lib/run/config_helpers.cpp | 115 ++++++++++++++++++ ydb/core/driver_lib/run/config_helpers.h | 18 +++ .../run/kikimr_services_initializers.cpp | 101 +-------------- ydb/core/driver_lib/run/ya.make | 1 + ydb/core/testlib/actors/test_runtime.cpp | 17 +++ ydb/core/testlib/actors/test_runtime.h | 12 ++ ydb/core/testlib/test_client.cpp | 26 ++++ ydb/core/testlib/test_client.h | 1 + ydb/library/actors/testlib/test_runtime.cpp | 18 +-- ydb/tests/tools/kqprun/kqprun.cpp | 52 +++++--- 10 files changed, 240 insertions(+), 121 deletions(-) create mode 100644 ydb/core/driver_lib/run/config_helpers.cpp create mode 100644 ydb/core/driver_lib/run/config_helpers.h diff --git a/ydb/core/driver_lib/run/config_helpers.cpp b/ydb/core/driver_lib/run/config_helpers.cpp new file mode 100644 index 000000000000..bb2753759121 --- /dev/null +++ b/ydb/core/driver_lib/run/config_helpers.cpp @@ -0,0 +1,115 @@ +#include "config_helpers.h" + +#include + + +namespace NKikimr { + +namespace NActorSystemConfigHelpers { + +namespace { + +template +static TCpuMask ParseAffinity(const TConfig& cfg) { + TCpuMask result; + if (cfg.GetCpuList()) { + result = TCpuMask(cfg.GetCpuList()); + } else if (cfg.GetX().size() > 0) { + result = TCpuMask(cfg.GetX().data(), cfg.GetX().size()); + } else { // use all processors + TAffinity available; + available.Current(); + result = available; + } + if (cfg.GetExcludeCpuList()) { + result = result - TCpuMask(cfg.GetExcludeCpuList()); + } + return result; +} + +TDuration GetSelfPingInterval(const NKikimrConfig::TActorSystemConfig& systemConfig) { + return systemConfig.HasSelfPingInterval() + ? TDuration::MicroSeconds(systemConfig.GetSelfPingInterval()) + : TDuration::MilliSeconds(10); +} + +NActors::EASProfile ConvertActorSystemProfile(NKikimrConfig::TActorSystemConfig::EActorSystemProfile profile) { + switch (profile) { + case NKikimrConfig::TActorSystemConfig::DEFAULT: + return NActors::EASProfile::Default; + case NKikimrConfig::TActorSystemConfig::LOW_CPU_CONSUMPTION: + return NActors::EASProfile::LowCpuConsumption; + case NKikimrConfig::TActorSystemConfig::LOW_LATENCY: + return NActors::EASProfile::LowLatency; + } +} + +} // anonymous namespace + +void AddExecutorPool(NActors::TCpuManagerConfig& cpuManager, const NKikimrConfig::TActorSystemConfig::TExecutor& poolConfig, const NKikimrConfig::TActorSystemConfig& systemConfig, ui32 poolId, NMonitoring::TDynamicCounterPtr counters) { + switch (poolConfig.GetType()) { + case NKikimrConfig::TActorSystemConfig::TExecutor::BASIC: { + NActors::TBasicExecutorPoolConfig basic; + basic.PoolId = poolId; + basic.PoolName = poolConfig.GetName(); + if (poolConfig.HasMaxAvgPingDeviation() && counters) { + auto poolGroup = counters->GetSubgroup("execpool", basic.PoolName); + auto &poolInfo = cpuManager.PingInfoByPool[poolId]; + poolInfo.AvgPingCounter = poolGroup->GetCounter("SelfPingAvgUs", false); + poolInfo.AvgPingCounterWithSmallWindow = poolGroup->GetCounter("SelfPingAvgUsIn1s", false); + TDuration maxAvgPing = GetSelfPingInterval(systemConfig) + TDuration::MicroSeconds(poolConfig.GetMaxAvgPingDeviation()); + poolInfo.MaxAvgPingUs = maxAvgPing.MicroSeconds(); + } + basic.Threads = Max(poolConfig.GetThreads(), poolConfig.GetMaxThreads()); + basic.SpinThreshold = poolConfig.GetSpinThreshold(); + basic.Affinity = ParseAffinity(poolConfig.GetAffinity()); + basic.RealtimePriority = poolConfig.GetRealtimePriority(); + basic.HasSharedThread = poolConfig.GetHasSharedThread(); + if (poolConfig.HasTimePerMailboxMicroSecs()) { + basic.TimePerMailbox = TDuration::MicroSeconds(poolConfig.GetTimePerMailboxMicroSecs()); + } else if (systemConfig.HasTimePerMailboxMicroSecs()) { + basic.TimePerMailbox = TDuration::MicroSeconds(systemConfig.GetTimePerMailboxMicroSecs()); + } + if (poolConfig.HasEventsPerMailbox()) { + basic.EventsPerMailbox = poolConfig.GetEventsPerMailbox(); + } else if (systemConfig.HasEventsPerMailbox()) { + basic.EventsPerMailbox = systemConfig.GetEventsPerMailbox(); + } + basic.ActorSystemProfile = ConvertActorSystemProfile(systemConfig.GetActorSystemProfile()); + Y_ABORT_UNLESS(basic.EventsPerMailbox != 0); + basic.MinThreadCount = poolConfig.GetMinThreads(); + basic.MaxThreadCount = poolConfig.GetMaxThreads(); + basic.DefaultThreadCount = poolConfig.GetThreads(); + basic.Priority = poolConfig.GetPriority(); + cpuManager.Basic.emplace_back(std::move(basic)); + break; + } + + case NKikimrConfig::TActorSystemConfig::TExecutor::IO: { + NActors::TIOExecutorPoolConfig io; + io.PoolId = poolId; + io.PoolName = poolConfig.GetName(); + io.Threads = poolConfig.GetThreads(); + io.Affinity = ParseAffinity(poolConfig.GetAffinity()); + cpuManager.IO.emplace_back(std::move(io)); + break; + } + + default: + Y_ABORT(); + } +} + +NActors::TSchedulerConfig CreateSchedulerConfig(const NKikimrConfig::TActorSystemConfig::TScheduler& config) { + const ui64 resolution = config.HasResolution() ? config.GetResolution() : 1024; + Y_DEBUG_ABORT_UNLESS((resolution & (resolution - 1)) == 0); // resolution must be power of 2 + const ui64 spinThreshold = config.HasSpinThreshold() ? config.GetSpinThreshold() : 0; + const ui64 progressThreshold = config.HasProgressThreshold() ? config.GetProgressThreshold() : 10000; + const bool useSchedulerActor = config.HasUseSchedulerActor() ? config.GetUseSchedulerActor() : false; + + return NActors::TSchedulerConfig(resolution, spinThreshold, progressThreshold, useSchedulerActor); +} + +} // namespace NActorSystemConfigHelpers + +} // namespace NKikimr diff --git a/ydb/core/driver_lib/run/config_helpers.h b/ydb/core/driver_lib/run/config_helpers.h new file mode 100644 index 000000000000..d38e416b6f5b --- /dev/null +++ b/ydb/core/driver_lib/run/config_helpers.h @@ -0,0 +1,18 @@ +#pragma once + +#include + +#include + + +namespace NKikimr { + +namespace NActorSystemConfigHelpers { + +void AddExecutorPool(NActors::TCpuManagerConfig& cpuManager, const NKikimrConfig::TActorSystemConfig::TExecutor& poolConfig, const NKikimrConfig::TActorSystemConfig& systemConfig, ui32 poolId, NMonitoring::TDynamicCounterPtr counters); + +NActors::TSchedulerConfig CreateSchedulerConfig(const NKikimrConfig::TActorSystemConfig::TScheduler& config); + +} // namespace NActorSystemConfigHelpers + +} // namespace NKikimr diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp index c31d402ddbec..ecb69c001c91 100644 --- a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp +++ b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp @@ -1,4 +1,5 @@ #include "auto_config_initializer.h" +#include "config_helpers.h" #include "config.h" #include "kikimr_services_initializers.h" #include "service_initializer.h" @@ -277,42 +278,6 @@ IKikimrServicesInitializer::IKikimrServicesInitializer(const TKikimrRunConfig& r // TBasicServicesInitializer -template -static TCpuMask ParseAffinity(const TConfig& cfg) { - TCpuMask result; - if (cfg.GetCpuList()) { - result = TCpuMask(cfg.GetCpuList()); - } else if (cfg.GetX().size() > 0) { - result = TCpuMask(cfg.GetX().data(), cfg.GetX().size()); - } else { // use all processors - TAffinity available; - available.Current(); - result = available; - } - if (cfg.GetExcludeCpuList()) { - result = result - TCpuMask(cfg.GetExcludeCpuList()); - } - return result; -} - -TDuration GetSelfPingInterval(const NKikimrConfig::TActorSystemConfig& systemConfig) { - return systemConfig.HasSelfPingInterval() - ? TDuration::MicroSeconds(systemConfig.GetSelfPingInterval()) - : TDuration::MilliSeconds(10); -} - - -NActors::EASProfile ConvertActorSystemProfile(NKikimrConfig::TActorSystemConfig::EActorSystemProfile profile) { - switch (profile) { - case NKikimrConfig::TActorSystemConfig::DEFAULT: - return NActors::EASProfile::Default; - case NKikimrConfig::TActorSystemConfig::LOW_CPU_CONSUMPTION: - return NActors::EASProfile::LowCpuConsumption; - case NKikimrConfig::TActorSystemConfig::LOW_LATENCY: - return NActors::EASProfile::LowLatency; - } -} - void AddExecutorPool( TCpuManagerConfig& cpuManager, const NKikimrConfig::TActorSystemConfig::TExecutor& poolConfig, @@ -321,55 +286,7 @@ void AddExecutorPool( const NKikimr::TAppData* appData) { const auto counters = GetServiceCounters(appData->Counters, "utils"); - switch (poolConfig.GetType()) { - case NKikimrConfig::TActorSystemConfig::TExecutor::BASIC: { - TBasicExecutorPoolConfig basic; - basic.PoolId = poolId; - basic.PoolName = poolConfig.GetName(); - if (poolConfig.HasMaxAvgPingDeviation()) { - auto poolGroup = counters->GetSubgroup("execpool", basic.PoolName); - auto &poolInfo = cpuManager.PingInfoByPool[poolId]; - poolInfo.AvgPingCounter = poolGroup->GetCounter("SelfPingAvgUs", false); - poolInfo.AvgPingCounterWithSmallWindow = poolGroup->GetCounter("SelfPingAvgUsIn1s", false); - TDuration maxAvgPing = GetSelfPingInterval(systemConfig) + TDuration::MicroSeconds(poolConfig.GetMaxAvgPingDeviation()); - poolInfo.MaxAvgPingUs = maxAvgPing.MicroSeconds(); - } - basic.Threads = Max(poolConfig.GetThreads(), poolConfig.GetMaxThreads()); - basic.SpinThreshold = poolConfig.GetSpinThreshold(); - basic.Affinity = ParseAffinity(poolConfig.GetAffinity()); - basic.RealtimePriority = poolConfig.GetRealtimePriority(); - basic.HasSharedThread = poolConfig.GetHasSharedThread(); - if (poolConfig.HasTimePerMailboxMicroSecs()) { - basic.TimePerMailbox = TDuration::MicroSeconds(poolConfig.GetTimePerMailboxMicroSecs()); - } else if (systemConfig.HasTimePerMailboxMicroSecs()) { - basic.TimePerMailbox = TDuration::MicroSeconds(systemConfig.GetTimePerMailboxMicroSecs()); - } - if (poolConfig.HasEventsPerMailbox()) { - basic.EventsPerMailbox = poolConfig.GetEventsPerMailbox(); - } else if (systemConfig.HasEventsPerMailbox()) { - basic.EventsPerMailbox = systemConfig.GetEventsPerMailbox(); - } - basic.ActorSystemProfile = ConvertActorSystemProfile(systemConfig.GetActorSystemProfile()); - Y_ABORT_UNLESS(basic.EventsPerMailbox != 0); - basic.MinThreadCount = poolConfig.GetMinThreads(); - basic.MaxThreadCount = poolConfig.GetMaxThreads(); - basic.DefaultThreadCount = poolConfig.GetThreads(); - basic.Priority = poolConfig.GetPriority(); - cpuManager.Basic.emplace_back(std::move(basic)); - break; - } - case NKikimrConfig::TActorSystemConfig::TExecutor::IO: { - TIOExecutorPoolConfig io; - io.PoolId = poolId; - io.PoolName = poolConfig.GetName(); - io.Threads = poolConfig.GetThreads(); - io.Affinity = ParseAffinity(poolConfig.GetAffinity()); - cpuManager.IO.emplace_back(std::move(io)); - break; - } - default: - Y_ABORT(); - } + NActorSystemConfigHelpers::AddExecutorPool(cpuManager, poolConfig, systemConfig, poolId, counters); } static TCpuManagerConfig CreateCpuManagerConfig(const NKikimrConfig::TActorSystemConfig& config, @@ -383,16 +300,6 @@ static TCpuManagerConfig CreateCpuManagerConfig(const NKikimrConfig::TActorSyste return cpuManager; } -static TSchedulerConfig CreateSchedulerConfig(const NKikimrConfig::TActorSystemConfig::TScheduler &config) { - const ui64 resolution = config.HasResolution() ? config.GetResolution() : 1024; - Y_DEBUG_ABORT_UNLESS((resolution & (resolution - 1)) == 0); // resolution must be power of 2 - const ui64 spinThreshold = config.HasSpinThreshold() ? config.GetSpinThreshold() : 0; - const ui64 progressThreshold = config.HasProgressThreshold() ? config.GetProgressThreshold() : 10000; - const bool useSchedulerActor = config.HasUseSchedulerActor() ? config.GetUseSchedulerActor() : false; - - return TSchedulerConfig(resolution, spinThreshold, progressThreshold, useSchedulerActor); -} - static bool IsServiceInitialized(NActors::TActorSystemSetup* setup, TActorId service) { for (auto &pr : setup->LocalServices) @@ -601,7 +508,7 @@ void TBasicServicesInitializer::InitializeServices(NActors::TActorSystemSetup* s setup->CpuManager = CreateCpuManagerConfig(systemConfig, appData); setup->MonitorStuckActors = systemConfig.GetMonitorStuckActors(); - auto schedulerConfig = CreateSchedulerConfig(systemConfig.GetScheduler()); + auto schedulerConfig = NActorSystemConfigHelpers::CreateSchedulerConfig(systemConfig.GetScheduler()); schedulerConfig.MonCounters = GetServiceCounters(counters, "utils"); setup->Scheduler.Reset(CreateSchedulerThread(schedulerConfig)); setup->LocalServices.emplace_back(MakeIoDispatcherActorId(), TActorSetupCmd(CreateIoDispatcherActor( @@ -1265,7 +1172,7 @@ void TSchedulerActorInitializer::InitializeServices( NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) { auto& systemConfig = Config.GetActorSystemConfig(); - NActors::IActor *schedulerActor = CreateSchedulerActor(CreateSchedulerConfig(systemConfig.GetScheduler())); + NActors::IActor *schedulerActor = CreateSchedulerActor(NActorSystemConfigHelpers::CreateSchedulerConfig(systemConfig.GetScheduler())); if (schedulerActor) { NActors::TActorSetupCmd schedulerActorCmd(schedulerActor, NActors::TMailboxType::ReadAsFilled, appData->SystemPoolId); setup->LocalServices.emplace_back(MakeSchedulerActorId(), std::move(schedulerActorCmd)); diff --git a/ydb/core/driver_lib/run/ya.make b/ydb/core/driver_lib/run/ya.make index b05dc136cd5e..b83ac0465eb7 100644 --- a/ydb/core/driver_lib/run/ya.make +++ b/ydb/core/driver_lib/run/ya.make @@ -4,6 +4,7 @@ SRCS( auto_config_initializer.cpp config.cpp config.h + config_helpers.cpp config_parser.cpp config_parser.h driver.h diff --git a/ydb/core/testlib/actors/test_runtime.cpp b/ydb/core/testlib/actors/test_runtime.cpp index 6fc7cfbaabda..23b00291bc4c 100644 --- a/ydb/core/testlib/actors/test_runtime.cpp +++ b/ydb/core/testlib/actors/test_runtime.cpp @@ -11,6 +11,7 @@ #include #include +#include #include #include @@ -49,6 +50,10 @@ namespace NActors { NeedStatsCollectors = true; } + void TTestActorRuntime::SetupActorSystemConfig(const TActorSystemSetupConfig& config) { + ActorSystemSetupConfig = config; + } + TTestActorRuntime::TTestActorRuntime(THeSingleSystemEnv d) : TPortManager(false) , TTestActorRuntimeBase{d} @@ -219,6 +224,18 @@ namespace NActors { } void TTestActorRuntime::InitActorSystemSetup(TActorSystemSetup& setup, TNodeDataBase* node) { + if (ActorSystemSetupConfig) { + setup.Executors.Reset(); + setup.ExecutorsCount = 0; + + setup.CpuManager = ActorSystemSetupConfig->CpuManagerConfig; + setup.MonitorStuckActors = ActorSystemSetupConfig->MonitorStuckActors; + + auto schedulerConfig = ActorSystemSetupConfig->SchedulerConfig; + schedulerConfig.MonCounters = NKikimr::GetServiceCounters(node->DynamicCounters, "utils"); + setup.Scheduler.Reset(CreateSchedulerThread(schedulerConfig)); + } + if (NeedMonitoring && NeedStatsCollectors) { NActors::IActor* statsCollector = NKikimr::CreateStatsCollector(1, setup, node->DynamicCounters); setup.LocalServices.push_back({ diff --git a/ydb/core/testlib/actors/test_runtime.h b/ydb/core/testlib/actors/test_runtime.h index 149180536710..c9ce31e7da3c 100644 --- a/ydb/core/testlib/actors/test_runtime.h +++ b/ydb/core/testlib/actors/test_runtime.h @@ -17,6 +17,10 @@ namespace NKikimr { struct TAppData; } +namespace NKikimrConfig { + class TActorSystemConfig; +} + namespace NKikimrProto { class TKeyConfig; } @@ -53,6 +57,12 @@ namespace NActors { std::vector> Icb; }; + struct TActorSystemSetupConfig { + TCpuManagerConfig CpuManagerConfig; + TSchedulerConfig SchedulerConfig; + bool MonitorStuckActors; + }; + TTestActorRuntime(THeSingleSystemEnv d); TTestActorRuntime(ui32 nodeCount, ui32 dataCenterCount, bool UseRealThreads); TTestActorRuntime(ui32 nodeCount, ui32 dataCenterCount); @@ -63,6 +73,7 @@ namespace NActors { void AddAppDataInit(std::function callback); virtual void Initialize(TEgg); void SetupStatsCollectors(); + void SetupActorSystemConfig(const TActorSystemSetupConfig& config); ui16 GetMonPort(ui32 nodeIndex = 0) const; @@ -125,5 +136,6 @@ namespace NActors { TActorId SleepEdgeActor; TVector> AppDataInit_; bool NeedStatsCollectors = false; + std::optional ActorSystemSetupConfig; }; } // namespace NActors diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp index 90d7b6d89657..a491e920f4a6 100644 --- a/ydb/core/testlib/test_client.cpp +++ b/ydb/core/testlib/test_client.cpp @@ -4,6 +4,8 @@ #include #include #include +#include +#include #include #include #include @@ -234,6 +236,8 @@ namespace Tests { Runtime->SetupMonitoring(Settings->MonitoringPortOffset, Settings->MonitoringTypeAsync); Runtime->SetLogBackend(Settings->LogBackend); + SetupActorSystemConfig(); + Runtime->AddAppDataInit([this](ui32 nodeIdx, NKikimr::TAppData& appData) { Y_UNUSED(nodeIdx); @@ -291,6 +295,28 @@ namespace Tests { SetupStorage(); } + void TServer::SetupActorSystemConfig() { + if (!Settings->AppConfig->HasActorSystemConfig()) { + return; + } + + auto actorSystemConfig = Settings->AppConfig->GetActorSystemConfig(); + if (actorSystemConfig.HasUseAutoConfig() && actorSystemConfig.GetUseAutoConfig()) { + NAutoConfigInitializer::ApplyAutoConfig(&actorSystemConfig); + } + + TCpuManagerConfig cpuManager; + for (int poolId = 0; poolId < actorSystemConfig.GetExecutor().size(); poolId++) { + NActorSystemConfigHelpers::AddExecutorPool(cpuManager, actorSystemConfig.GetExecutor(poolId), actorSystemConfig, poolId, nullptr); + } + + Runtime->SetupActorSystemConfig(TTestActorRuntime::TActorSystemSetupConfig{ + .CpuManagerConfig = cpuManager, + .SchedulerConfig = NActorSystemConfigHelpers::CreateSchedulerConfig(actorSystemConfig.GetScheduler()), + .MonitorStuckActors = actorSystemConfig.GetMonitorStuckActors() + }); + } + void TServer::SetupMessageBus(ui16 port) { if (port) { Bus = NBus::CreateMessageQueue(NBus::TBusQueueConfig()); diff --git a/ydb/core/testlib/test_client.h b/ydb/core/testlib/test_client.h index 0205ffc989c2..69de2759a9ba 100644 --- a/ydb/core/testlib/test_client.h +++ b/ydb/core/testlib/test_client.h @@ -265,6 +265,7 @@ namespace Tests { protected: void SetupStorage(); + void SetupActorSystemConfig(); void SetupMessageBus(ui16 port); void SetupDomains(TAppPrepare&); void CreateBootstrapTablets(); diff --git a/ydb/library/actors/testlib/test_runtime.cpp b/ydb/library/actors/testlib/test_runtime.cpp index 88564e1a0ca5..9c7428822441 100644 --- a/ydb/library/actors/testlib/test_runtime.cpp +++ b/ydb/library/actors/testlib/test_runtime.cpp @@ -1710,6 +1710,12 @@ namespace NActors { setup->Executors[0].Reset(new TExecutorPoolStub(this, nodeIndex, node, 0)); } + if (harmonizer) { + for (ui32 i = 0; i < setup->ExecutorsCount; ++i) { + harmonizer->AddPool(setup->Executors[i].Get()); + } + } + InitActorSystemSetup(*setup, node); return setup; @@ -1718,13 +1724,6 @@ namespace NActors { THolder TTestActorRuntimeBase::MakeActorSystem(ui32 nodeIndex, TNodeDataBase* node) { auto setup = MakeActorSystemSetup(nodeIndex, node); - node->ExecutorPools.resize(setup->ExecutorsCount); - for (ui32 i = 0; i < setup->ExecutorsCount; ++i) { - IExecutorPool* executor = setup->Executors[i].Get(); - node->ExecutorPools[i] = executor; - node->Harmonizer->AddPool(executor); - } - const auto& interconnectCounters = GetCountersForComponent(node->DynamicCounters, "interconnect"); for (const auto& cmd : node->LocalServices) { @@ -1786,7 +1785,10 @@ namespace NActors { setup->LocalServices.push_back(std::move(loggerActorPair)); } - return THolder(new TActorSystem(setup, node->GetAppData(), node->LogSettings)); + auto actorSystem = THolder(new TActorSystem(setup, node->GetAppData(), node->LogSettings)); + node->ExecutorPools = actorSystem->GetBasicExecutorPools(); + + return actorSystem; } TActorSystem* TTestActorRuntimeBase::SingleSys() const { diff --git a/ydb/tests/tools/kqprun/kqprun.cpp b/ydb/tests/tools/kqprun/kqprun.cpp index 52769286de32..b2ccba7dae3f 100644 --- a/ydb/tests/tools/kqprun/kqprun.cpp +++ b/ydb/tests/tools/kqprun/kqprun.cpp @@ -22,6 +22,15 @@ #include +void ReplaceTemplate(const TString& variableName, const TString& variableValue, TString& query) { + TString variableTemplate = TStringBuilder() << "${" << variableName << "}"; + for (size_t position = query.find(variableTemplate); position != TString::npos; position = query.find(variableTemplate, position)) { + query.replace(position, variableTemplate.size(), variableValue); + position += variableValue.size(); + } +} + + struct TExecutionOptions { enum class EExecutionCase { GenericScript, @@ -32,6 +41,7 @@ struct TExecutionOptions { std::vector ScriptQueries; TString SchemeQuery; + bool UseTemplates = false; ui32 LoopCount = 1; TDuration LoopDelay; @@ -70,8 +80,13 @@ struct TExecutionOptions { } NKqpRun::TRequestOptions GetSchemeQueryOptions() const { + TString sql = SchemeQuery; + if (UseTemplates) { + ReplaceYqlTokenTemplate(sql); + } + return { - .Query = SchemeQuery, + .Query = sql, .Action = NKikimrKqp::EQueryAction::QUERY_ACTION_EXECUTE, .TraceId = DefaultTraceId, .PoolId = "", @@ -79,10 +94,17 @@ struct TExecutionOptions { }; } - NKqpRun::TRequestOptions GetScriptQueryOptions(size_t index, TInstant startTime) const { + NKqpRun::TRequestOptions GetScriptQueryOptions(size_t index, size_t queryId, TInstant startTime) const { Y_ABORT_UNLESS(index < ScriptQueries.size()); + + TString sql = ScriptQueries[index]; + if (UseTemplates) { + ReplaceYqlTokenTemplate(sql); + ReplaceTemplate("QUERY_ID", ToString(queryId), sql); + } + return { - .Query = ScriptQueries[index], + .Query = sql, .Action = GetScriptQueryAction(index), .TraceId = TStringBuilder() << GetValue(index, TraceIds, DefaultTraceId) << "-" << startTime.ToString(), .PoolId = GetValue(index, PoolIds, TString()), @@ -98,6 +120,10 @@ struct TExecutionOptions { } return values[std::min(index, values.size() - 1)]; } + + static void ReplaceYqlTokenTemplate(TString& sql) { + ReplaceTemplate(NKqpRun::YQL_TOKEN_VARIABLE, GetEnv(NKqpRun::YQL_TOKEN_VARIABLE), sql); + } }; @@ -134,7 +160,7 @@ void RunArgumentQueries(const TExecutionOptions& executionOptions, NKqpRun::TKqp switch (executionCase) { case TExecutionOptions::EExecutionCase::GenericScript: - if (!runner.ExecuteScript(executionOptions.GetScriptQueryOptions(id, startTime))) { + if (!runner.ExecuteScript(executionOptions.GetScriptQueryOptions(id, queryId, startTime))) { ythrow yexception() << TInstant::Now().ToIsoStringLocal() << " Script execution failed"; } Cout << colors.Yellow() << TInstant::Now().ToIsoStringLocal() << " Fetching script results..." << colors.Default() << Endl; @@ -150,19 +176,19 @@ void RunArgumentQueries(const TExecutionOptions& executionOptions, NKqpRun::TKqp break; case TExecutionOptions::EExecutionCase::GenericQuery: - if (!runner.ExecuteQuery(executionOptions.GetScriptQueryOptions(id, startTime))) { + if (!runner.ExecuteQuery(executionOptions.GetScriptQueryOptions(id, queryId, startTime))) { ythrow yexception() << TInstant::Now().ToIsoStringLocal() << " Query execution failed"; } break; case TExecutionOptions::EExecutionCase::YqlScript: - if (!runner.ExecuteYqlScript(executionOptions.GetScriptQueryOptions(id, startTime))) { + if (!runner.ExecuteYqlScript(executionOptions.GetScriptQueryOptions(id, queryId, startTime))) { ythrow yexception() << TInstant::Now().ToIsoStringLocal() << " Yql script execution failed"; } break; case TExecutionOptions::EExecutionCase::AsyncQuery: - runner.ExecuteQueryAsync(executionOptions.GetScriptQueryOptions(id, startTime)); + runner.ExecuteQueryAsync(executionOptions.GetScriptQueryOptions(id, queryId, startTime)); break; } } @@ -260,14 +286,6 @@ class TMain : public TMainClassArgs { return TFileInput(file).ReadAll(); } - static void ReplaceTemplate(const TString& variableName, const TString& variableValue, TString& query) { - TString variableTemplate = TStringBuilder() << "${" << variableName << "}"; - for (size_t position = query.find(variableTemplate); position != TString::npos; position = query.find(variableTemplate, position)) { - query.replace(position, variableTemplate.size(), variableValue); - position += variableValue.size(); - } - } - static IOutputStream* GetDefaultOutput(const TString& file) { if (file == "-") { return &Cout; @@ -315,13 +333,15 @@ class TMain : public TMainClassArgs { .RequiredArgument("file") .Handler1([this](const NLastGetopt::TOptsParser* option) { ExecutionOptions.SchemeQuery = LoadFile(option->CurVal()); - ReplaceTemplate(NKqpRun::YQL_TOKEN_VARIABLE, YqlToken, ExecutionOptions.SchemeQuery); }); options.AddLongOption('p', "script-query", "Script query to execute (typically DML query)") .RequiredArgument("file") .Handler1([this](const NLastGetopt::TOptsParser* option) { ExecutionOptions.ScriptQueries.emplace_back(LoadFile(option->CurVal())); }); + options.AddLongOption("templates", "Enable templates for -s and -p queries, such as ${YQL_TOKEN} and ${QUERY_ID}") + .NoArgument() + .SetFlag(&ExecutionOptions.UseTemplates); options.AddLongOption('t', "table", "File with input table (can be used by YT with -E flag), table@file") .RequiredArgument("table@file") From 8b2a3cffdafdef72754c2a2df1c5ccbb4ac1b94d Mon Sep 17 00:00:00 2001 From: Gigoriy Pisarenko Date: Tue, 13 Aug 2024 07:51:13 +0000 Subject: [PATCH 2/7] Passed executor pool ids to app data --- ydb/core/testlib/actors/test_runtime.cpp | 5 +++-- ydb/core/testlib/actors/test_runtime.h | 11 ++++++++++- ydb/core/testlib/test_client.cpp | 8 +++++++- 3 files changed, 20 insertions(+), 4 deletions(-) diff --git a/ydb/core/testlib/actors/test_runtime.cpp b/ydb/core/testlib/actors/test_runtime.cpp index 23b00291bc4c..9ced093244d1 100644 --- a/ydb/core/testlib/actors/test_runtime.cpp +++ b/ydb/core/testlib/actors/test_runtime.cpp @@ -50,8 +50,9 @@ namespace NActors { NeedStatsCollectors = true; } - void TTestActorRuntime::SetupActorSystemConfig(const TActorSystemSetupConfig& config) { + void TTestActorRuntime::SetupActorSystemConfig(const TActorSystemSetupConfig& config, const TActorSystemPools& pools) { ActorSystemSetupConfig = config; + ActorSystemPools = pools; } TTestActorRuntime::TTestActorRuntime(THeSingleSystemEnv d) @@ -136,7 +137,7 @@ namespace NActors { node->ActorSystem = MakeActorSystem(nodeIndex, node); node->ExecutorThread.Reset(new TExecutorThread(0, 0, node->ActorSystem.Get(), node->SchedulerPool.Get(), node->MailboxTable.Get(), "TestExecutor")); } else { - node->AppData0.reset(new NKikimr::TAppData(0, 1, 2, 3, { }, app0->TypeRegistry, app0->FunctionRegistry, app0->FormatFactory, nullptr)); + node->AppData0.reset(new NKikimr::TAppData(ActorSystemPools.SystemPoolId, ActorSystemPools.UserPoolId, ActorSystemPools.IOPoolId, ActorSystemPools.BatchPoolId, ActorSystemPools.ServicePools, app0->TypeRegistry, app0->FunctionRegistry, app0->FormatFactory, nullptr)); node->ActorSystem = MakeActorSystem(nodeIndex, node); } node->LogSettings->MessagePrefix = " node " + ToString(nodeId); diff --git a/ydb/core/testlib/actors/test_runtime.h b/ydb/core/testlib/actors/test_runtime.h index c9ce31e7da3c..90b75e5c5730 100644 --- a/ydb/core/testlib/actors/test_runtime.h +++ b/ydb/core/testlib/actors/test_runtime.h @@ -63,6 +63,14 @@ namespace NActors { bool MonitorStuckActors; }; + struct TActorSystemPools { + ui32 SystemPoolId = 0; + ui32 UserPoolId = 1; + ui32 IOPoolId = 2; + ui32 BatchPoolId = 3; + TMap ServicePools = {}; + }; + TTestActorRuntime(THeSingleSystemEnv d); TTestActorRuntime(ui32 nodeCount, ui32 dataCenterCount, bool UseRealThreads); TTestActorRuntime(ui32 nodeCount, ui32 dataCenterCount); @@ -73,7 +81,7 @@ namespace NActors { void AddAppDataInit(std::function callback); virtual void Initialize(TEgg); void SetupStatsCollectors(); - void SetupActorSystemConfig(const TActorSystemSetupConfig& config); + void SetupActorSystemConfig(const TActorSystemSetupConfig& config, const TActorSystemPools& pools); ui16 GetMonPort(ui32 nodeIndex = 0) const; @@ -137,5 +145,6 @@ namespace NActors { TVector> AppDataInit_; bool NeedStatsCollectors = false; std::optional ActorSystemSetupConfig; + TActorSystemPools ActorSystemPools; }; } // namespace NActors diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp index a491e920f4a6..87701bd16532 100644 --- a/ydb/core/testlib/test_client.cpp +++ b/ydb/core/testlib/test_client.cpp @@ -301,7 +301,8 @@ namespace Tests { } auto actorSystemConfig = Settings->AppConfig->GetActorSystemConfig(); - if (actorSystemConfig.HasUseAutoConfig() && actorSystemConfig.GetUseAutoConfig()) { + const bool useAutoConfig = actorSystemConfig.HasUseAutoConfig() && actorSystemConfig.GetUseAutoConfig(); + if (useAutoConfig) { NAutoConfigInitializer::ApplyAutoConfig(&actorSystemConfig); } @@ -310,10 +311,15 @@ namespace Tests { NActorSystemConfigHelpers::AddExecutorPool(cpuManager, actorSystemConfig.GetExecutor(poolId), actorSystemConfig, poolId, nullptr); } + const NAutoConfigInitializer::TASPools pools = NAutoConfigInitializer::GetASPools(actorSystemConfig, useAutoConfig); + Runtime->SetupActorSystemConfig(TTestActorRuntime::TActorSystemSetupConfig{ .CpuManagerConfig = cpuManager, .SchedulerConfig = NActorSystemConfigHelpers::CreateSchedulerConfig(actorSystemConfig.GetScheduler()), .MonitorStuckActors = actorSystemConfig.GetMonitorStuckActors() + }, TTestActorRuntime::TActorSystemPools{ + pools.SystemPoolId, pools.UserPoolId, pools.IOPoolId, pools.BatchPoolId, + NAutoConfigInitializer::GetServicePools(actorSystemConfig, useAutoConfig) }); } From 3a3b922ec36f3da44ca49294fcac308293fa22e3 Mon Sep 17 00:00:00 2001 From: Gigoriy Pisarenko Date: Tue, 13 Aug 2024 08:21:31 +0000 Subject: [PATCH 3/7] Fixed ExecutorPools initialization --- ydb/core/testlib/actors/test_runtime.h | 6 +---- ydb/library/actors/testlib/test_runtime.cpp | 27 ++++++++++++++------- ydb/library/actors/testlib/test_runtime.h | 2 +- 3 files changed, 20 insertions(+), 15 deletions(-) diff --git a/ydb/core/testlib/actors/test_runtime.h b/ydb/core/testlib/actors/test_runtime.h index 90b75e5c5730..7198bc03b6d9 100644 --- a/ydb/core/testlib/actors/test_runtime.h +++ b/ydb/core/testlib/actors/test_runtime.h @@ -17,10 +17,6 @@ namespace NKikimr { struct TAppData; } -namespace NKikimrConfig { - class TActorSystemConfig; -} - namespace NKikimrProto { class TKeyConfig; } @@ -60,7 +56,7 @@ namespace NActors { struct TActorSystemSetupConfig { TCpuManagerConfig CpuManagerConfig; TSchedulerConfig SchedulerConfig; - bool MonitorStuckActors; + bool MonitorStuckActors = false; }; struct TActorSystemPools { diff --git a/ydb/library/actors/testlib/test_runtime.cpp b/ydb/library/actors/testlib/test_runtime.cpp index 9c7428822441..240db79e7853 100644 --- a/ydb/library/actors/testlib/test_runtime.cpp +++ b/ydb/library/actors/testlib/test_runtime.cpp @@ -905,7 +905,7 @@ namespace NActors { TGuard guard(Mutex); TNodeDataBase* node = Nodes[FirstNodeId + nodeIndex].Get(); if (UseRealThreads) { - Y_ABORT_UNLESS(poolId < node->ExecutorPools.size()); + Y_ABORT_UNLESS(node->ExecutorPools.contains(poolId)); return node->ExecutorPools[poolId]->Register(actor, mailboxType, revolvingCounter, parentId); } @@ -973,7 +973,7 @@ namespace NActors { TGuard guard(Mutex); TNodeDataBase* node = Nodes[FirstNodeId + nodeIndex].Get(); if (UseRealThreads) { - Y_ABORT_UNLESS(poolId < node->ExecutorPools.size()); + Y_ABORT_UNLESS(node->ExecutorPools.contains(poolId)); return node->ExecutorPools[poolId]->Register(actor, mailbox, hint, parentId); } @@ -1710,12 +1710,6 @@ namespace NActors { setup->Executors[0].Reset(new TExecutorPoolStub(this, nodeIndex, node, 0)); } - if (harmonizer) { - for (ui32 i = 0; i < setup->ExecutorsCount; ++i) { - harmonizer->AddPool(setup->Executors[i].Get()); - } - } - InitActorSystemSetup(*setup, node); return setup; @@ -1724,6 +1718,13 @@ namespace NActors { THolder TTestActorRuntimeBase::MakeActorSystem(ui32 nodeIndex, TNodeDataBase* node) { auto setup = MakeActorSystemSetup(nodeIndex, node); + node->ExecutorPools.reserve(setup->ExecutorsCount); + for (ui32 i = 0; i < setup->ExecutorsCount; ++i) { + IExecutorPool* executor = setup->Executors[i].Get(); + node->ExecutorPools[i] = executor; + node->Harmonizer->AddPool(executor); + } + const auto& interconnectCounters = GetCountersForComponent(node->DynamicCounters, "interconnect"); for (const auto& cmd : node->LocalServices) { @@ -1786,7 +1787,15 @@ namespace NActors { } auto actorSystem = THolder(new TActorSystem(setup, node->GetAppData(), node->LogSettings)); - node->ExecutorPools = actorSystem->GetBasicExecutorPools(); + + if (node->ExecutorPools.empty()) { + // Initialize pools from actor system (except IO pool) + const auto& pools = actorSystem->GetBasicExecutorPools(); + node->ExecutorPools.reserve(pools.size()); + for (IExecutorPool* pool : pools) { + node->ExecutorPools[pool->PoolId] = pool; + } + } return actorSystem; } diff --git a/ydb/library/actors/testlib/test_runtime.h b/ydb/library/actors/testlib/test_runtime.h index 587ebb49de75..f462efc45fd3 100644 --- a/ydb/library/actors/testlib/test_runtime.h +++ b/ydb/library/actors/testlib/test_runtime.h @@ -701,7 +701,7 @@ namespace NActors { std::shared_ptr AppData0; THolder ActorSystem; THolder SchedulerPool; - TVector ExecutorPools; + THashMap ExecutorPools; THolder ExecutorThread; std::unique_ptr Harmonizer; }; From 94f2ee70b335e6a913627a934237418ff671c20c Mon Sep 17 00:00:00 2001 From: Gigoriy Pisarenko Date: Tue, 13 Aug 2024 08:46:16 +0000 Subject: [PATCH 4/7] Added explicit pool into kqprun --- ydb/tests/tools/kqprun/src/ydb_setup.cpp | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/ydb/tests/tools/kqprun/src/ydb_setup.cpp b/ydb/tests/tools/kqprun/src/ydb_setup.cpp index cd5c89c8f6de..6a1f1cf7570d 100644 --- a/ydb/tests/tools/kqprun/src/ydb_setup.cpp +++ b/ydb/tests/tools/kqprun/src/ydb_setup.cpp @@ -199,7 +199,7 @@ class TYdbSetup::TImpl { void WaitResourcesPublishing() const { auto promise = NThreading::NewPromise(); - GetRuntime()->Register(CreateResourcesWaiterActor(promise, Settings_.NodeCount)); + GetRuntime()->Register(CreateResourcesWaiterActor(promise, Settings_.NodeCount), 0, GetRuntime()->GetAppData().SystemPoolId); try { promise.GetFuture().GetValue(Settings_.InitializationTimeout); @@ -247,7 +247,7 @@ class TYdbSetup::TImpl { TQueryResponse QueryRequest(const TRequestOptions& query, TProgressCallback progressCallback) const { auto request = GetQueryRequest(query); auto promise = NThreading::NewPromise(); - GetRuntime()->Register(CreateRunScriptActorMock(std::move(request), promise, progressCallback)); + GetRuntime()->Register(CreateRunScriptActorMock(std::move(request), promise, progressCallback), 0, GetRuntime()->GetAppData().UserPoolId); return promise.GetFuture().GetValueSync(); } @@ -275,7 +275,7 @@ class TYdbSetup::TImpl { auto sizeLimit = Settings_.AppConfig.GetQueryServiceConfig().GetScriptResultSizeLimit(); NActors::IActor* fetchActor = NKikimr::NKqp::CreateGetScriptExecutionResultActor(edgeActor, Settings_.DomainName, executionId, resultSetId, 0, rowsLimit, sizeLimit, TInstant::Max()); - GetRuntime()->Register(fetchActor, nodeIndex); + GetRuntime()->Register(fetchActor, nodeIndex, GetRuntime()->GetAppData(nodeIndex).UserPoolId); return GetRuntime()->GrabEdgeEvent(edgeActor); } @@ -289,7 +289,7 @@ class TYdbSetup::TImpl { void QueryRequestAsync(const TRequestOptions& query) { if (!AsyncQueryRunnerActorId_) { - AsyncQueryRunnerActorId_ = GetRuntime()->Register(CreateAsyncQueryRunnerActor(Settings_.AsyncQueriesSettings)); + AsyncQueryRunnerActorId_ = GetRuntime()->Register(CreateAsyncQueryRunnerActor(Settings_.AsyncQueriesSettings), 0, GetRuntime()->GetAppData().UserPoolId); } auto request = GetQueryRequest(query); From c9ed6f618d3caa870fab7e8dd4850191123602f3 Mon Sep 17 00:00:00 2001 From: Gigoriy Pisarenko Date: Tue, 13 Aug 2024 09:26:40 +0000 Subject: [PATCH 5/7] Removed replace template function --- ydb/tests/tools/kqprun/kqprun.cpp | 13 ++----------- 1 file changed, 2 insertions(+), 11 deletions(-) diff --git a/ydb/tests/tools/kqprun/kqprun.cpp b/ydb/tests/tools/kqprun/kqprun.cpp index b2ccba7dae3f..7e60c645b928 100644 --- a/ydb/tests/tools/kqprun/kqprun.cpp +++ b/ydb/tests/tools/kqprun/kqprun.cpp @@ -22,15 +22,6 @@ #include -void ReplaceTemplate(const TString& variableName, const TString& variableValue, TString& query) { - TString variableTemplate = TStringBuilder() << "${" << variableName << "}"; - for (size_t position = query.find(variableTemplate); position != TString::npos; position = query.find(variableTemplate, position)) { - query.replace(position, variableTemplate.size(), variableValue); - position += variableValue.size(); - } -} - - struct TExecutionOptions { enum class EExecutionCase { GenericScript, @@ -100,7 +91,7 @@ struct TExecutionOptions { TString sql = ScriptQueries[index]; if (UseTemplates) { ReplaceYqlTokenTemplate(sql); - ReplaceTemplate("QUERY_ID", ToString(queryId), sql); + SubstGlobal(sql, "${QUERY_ID}", ToString(queryId)); } return { @@ -122,7 +113,7 @@ struct TExecutionOptions { } static void ReplaceYqlTokenTemplate(TString& sql) { - ReplaceTemplate(NKqpRun::YQL_TOKEN_VARIABLE, GetEnv(NKqpRun::YQL_TOKEN_VARIABLE), sql); + SubstGlobal(sql, TStringBuilder() << "${" << NKqpRun::YQL_TOKEN_VARIABLE << "}", GetEnv(NKqpRun::YQL_TOKEN_VARIABLE)); } }; From c96b91039b82135528ef4389d09394227691dbca Mon Sep 17 00:00:00 2001 From: Gigoriy Pisarenko Date: Tue, 13 Aug 2024 10:07:57 +0000 Subject: [PATCH 6/7] Added error for empty token --- ydb/tests/tools/kqprun/kqprun.cpp | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/ydb/tests/tools/kqprun/kqprun.cpp b/ydb/tests/tools/kqprun/kqprun.cpp index 7e60c645b928..454b003ba8bb 100644 --- a/ydb/tests/tools/kqprun/kqprun.cpp +++ b/ydb/tests/tools/kqprun/kqprun.cpp @@ -113,7 +113,12 @@ struct TExecutionOptions { } static void ReplaceYqlTokenTemplate(TString& sql) { - SubstGlobal(sql, TStringBuilder() << "${" << NKqpRun::YQL_TOKEN_VARIABLE << "}", GetEnv(NKqpRun::YQL_TOKEN_VARIABLE)); + const TString variableName = TStringBuilder() << "${" << NKqpRun::YQL_TOKEN_VARIABLE << "}"; + if (const TString& yqlToken = GetEnv(NKqpRun::YQL_TOKEN_VARIABLE)) { + SubstGlobal(sql, variableName, yqlToken); + } else if (sql.Contains(variableName)) { + ythrow yexception() << "Failed to replace ${YQL_TOKEN} template, please specify YQL_TOKEN environment variable\n"; + } } }; From f2d33b3ed0d9501f8c724755a38f98921d39ed1a Mon Sep 17 00:00:00 2001 From: Gigoriy Pisarenko Date: Tue, 13 Aug 2024 13:56:30 +0000 Subject: [PATCH 7/7] Hide TServer::EnableGrpc on GrpcPort message --- ydb/core/testlib/test_client.cpp | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp index 87701bd16532..3349b4e1ce8a 100644 --- a/ydb/core/testlib/test_client.cpp +++ b/ydb/core/testlib/test_client.cpp @@ -340,7 +340,9 @@ namespace Tests { auto system(Runtime->GetAnyNodeActorSystem()); - Cerr << "TServer::EnableGrpc on GrpcPort " << options.Port << ", node " << system->NodeId << Endl; + if (Settings->Verbose) { + Cerr << "TServer::EnableGrpc on GrpcPort " << options.Port << ", node " << system->NodeId << Endl; + } const size_t proxyCount = Max(ui32{1}, Settings->AppConfig->GetGRpcConfig().GetGRpcProxyCount()); TVector grpcRequestProxies;