From 513e1169016ad14eb5f90f2e3f7ad43989d1e4ff Mon Sep 17 00:00:00 2001 From: Grigoriy Pisarenko Date: Tue, 18 Feb 2025 20:21:05 +0000 Subject: [PATCH 1/7] Added as conf --- .../tools/fqrun/configuration/as_config.conf | 45 +++++++++++++++++++ 1 file changed, 45 insertions(+) create mode 100644 ydb/tests/tools/fqrun/configuration/as_config.conf diff --git a/ydb/tests/tools/fqrun/configuration/as_config.conf b/ydb/tests/tools/fqrun/configuration/as_config.conf new file mode 100644 index 000000000000..811f806b8d1e --- /dev/null +++ b/ydb/tests/tools/fqrun/configuration/as_config.conf @@ -0,0 +1,45 @@ +ActorSystemConfig { + Executor { + Type: BASIC + Threads: 1 + SpinThreshold: 10 + Name: "System" + } + Executor { + Type: BASIC + Threads: 6 + SpinThreshold: 1 + Name: "User" + } + Executor { + Type: BASIC + Threads: 1 + SpinThreshold: 1 + Name: "Batch" + } + Executor { + Type: IO + Threads: 1 + Name: "IO" + } + Executor { + Type: BASIC + Threads: 2 + SpinThreshold: 10 + Name: "IC" + TimePerMailboxMicroSecs: 100 + } + Scheduler { + Resolution: 64 + SpinThreshold: 0 + ProgressThreshold: 10000 + } + SysExecutor: 0 + UserExecutor: 1 + IoExecutor: 3 + BatchExecutor: 2 + ServiceExecutor { + ServiceName: "Interconnect" + ExecutorId: 4 + } +} From 0e4e79ca5178589e5c7d0601b9196d4d50263387 Mon Sep 17 00:00:00 2001 From: Grigoriy Pisarenko Date: Thu, 20 Feb 2025 08:40:18 +0000 Subject: [PATCH 2/7] TMP --- .../dummy/yql_pq_file_topic_client.cpp | 4 +- ydb/tests/tools/fqrun/.gitignore | 2 + ydb/tests/tools/fqrun/README.md | 11 ++- .../tools/fqrun/configuration/as_config.conf | 89 ++++++++++--------- ydb/tests/tools/fqrun/flame_graph.sh | 25 ++++++ ydb/tests/tools/fqrun/fqrun.cpp | 40 +++++++++ ydb/tests/tools/fqrun/src/common.h | 4 + ydb/tests/tools/fqrun/src/fq_setup.cpp | 12 +++ ydb/tests/tools/fqrun/src/ya.make | 1 + ydb/tests/tools/fqrun/ya.make | 7 ++ ydb/tests/tools/kqprun/README.md | 2 +- ydb/tests/tools/kqprun/kqprun.cpp | 79 +--------------- ydb/tests/tools/kqprun/runlib/application.cpp | 63 ++++++++++++- ydb/tests/tools/kqprun/runlib/application.h | 16 ++++ ydb/tests/tools/kqprun/runlib/utils.cpp | 16 ++++ ydb/tests/tools/kqprun/runlib/utils.h | 11 ++- ydb/tests/tools/kqprun/runlib/ya.make | 3 + 17 files changed, 257 insertions(+), 128 deletions(-) create mode 100755 ydb/tests/tools/fqrun/flame_graph.sh diff --git a/ydb/library/yql/providers/pq/gateway/dummy/yql_pq_file_topic_client.cpp b/ydb/library/yql/providers/pq/gateway/dummy/yql_pq_file_topic_client.cpp index 4be95da05904..751105aeb694 100644 --- a/ydb/library/yql/providers/pq/gateway/dummy/yql_pq_file_topic_client.cpp +++ b/ydb/library/yql/providers/pq/gateway/dummy/yql_pq_file_topic_client.cpp @@ -123,7 +123,7 @@ constexpr static auto FILE_POLL_PERIOD = TDuration::MilliSeconds(5); size_t size = 0; ui64 maxBatchRowSize = 100; - while (size_t read = fi.ReadLine(rawMsg)) { + while (fi.ReadLine(rawMsg)) { msgs.emplace_back(MakeNextMessage(rawMsg)); MsgOffset_++; if (!maxBatchRowSize--) { @@ -133,6 +133,8 @@ constexpr static auto FILE_POLL_PERIOD = TDuration::MilliSeconds(5); } if (!msgs.empty()) { EventsQ_.Push(NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent(msgs, {}, Session_), size); + } else { + EventsQ_.Push(NYdb::NTopic::TSessionClosedEvent(NYdb::EStatus::CANCELLED, {NYdb::NIssue::TIssue("PQ file topic was finished")}), size); } Sleep(FILE_POLL_PERIOD); diff --git a/ydb/tests/tools/fqrun/.gitignore b/ydb/tests/tools/fqrun/.gitignore index 99dcb6bb049f..ee3ed5154ff0 100644 --- a/ydb/tests/tools/fqrun/.gitignore +++ b/ydb/tests/tools/fqrun/.gitignore @@ -5,3 +5,5 @@ sync_dir *.conf *.parquet *.json +*.svg +*.txt diff --git a/ydb/tests/tools/fqrun/README.md b/ydb/tests/tools/fqrun/README.md index cd133168dcd4..e1c7dd247546 100644 --- a/ydb/tests/tools/fqrun/README.md +++ b/ydb/tests/tools/fqrun/README.md @@ -2,6 +2,15 @@ Tool can be used to execute streaming queries by using FQ proxy infrastructure. +For profiling memory allocations build fqrun with ya make flag `-D PROFILE_MEMORY_ALLOCATIONS`. + +## Scripts + +* `flame_graph.sh` - script for collecting flame graphs in svg format, usage: + ```(bash) + ./flame_graph.sh [graph collection time in seconds] + ``` + ## Examples ### Queries @@ -25,7 +34,7 @@ Tool can be used to execute streaming queries by using FQ proxy infrastructure. ./fqrun -M 32000 ``` - Monitoring endpoint: https://localhost:32000 + Monitoring endpoint: http://localhost:32000 * gRPC endpoint: ```(bash) diff --git a/ydb/tests/tools/fqrun/configuration/as_config.conf b/ydb/tests/tools/fqrun/configuration/as_config.conf index 811f806b8d1e..c1f7d09e7f6a 100644 --- a/ydb/tests/tools/fqrun/configuration/as_config.conf +++ b/ydb/tests/tools/fqrun/configuration/as_config.conf @@ -1,45 +1,46 @@ -ActorSystemConfig { - Executor { - Type: BASIC - Threads: 1 - SpinThreshold: 10 - Name: "System" - } - Executor { - Type: BASIC - Threads: 6 - SpinThreshold: 1 - Name: "User" - } - Executor { - Type: BASIC - Threads: 1 - SpinThreshold: 1 - Name: "Batch" - } - Executor { - Type: IO - Threads: 1 - Name: "IO" - } - Executor { - Type: BASIC - Threads: 2 - SpinThreshold: 10 - Name: "IC" - TimePerMailboxMicroSecs: 100 - } - Scheduler { - Resolution: 64 - SpinThreshold: 0 - ProgressThreshold: 10000 - } - SysExecutor: 0 - UserExecutor: 1 - IoExecutor: 3 - BatchExecutor: 2 - ServiceExecutor { - ServiceName: "Interconnect" - ExecutorId: 4 - } +Executor { + Type: BASIC + Threads: 1 + SpinThreshold: 10 + Name: "System" +} +Executor { + Type: BASIC + Threads: 6 + SpinThreshold: 1 + Name: "User" +} +Executor { + Type: BASIC + Threads: 1 + SpinThreshold: 1 + Name: "Batch" +} +Executor { + Type: IO + Threads: 1 + Name: "IO" +} +Executor { + Type: BASIC + Threads: 2 + SpinThreshold: 10 + Name: "IC" + TimePerMailboxMicroSecs: 100 +} + +Scheduler { + Resolution: 64 + SpinThreshold: 0 + ProgressThreshold: 10000 +} + +SysExecutor: 0 +UserExecutor: 1 +IoExecutor: 3 +BatchExecutor: 2 + +ServiceExecutor { + ServiceName: "Interconnect" + ExecutorId: 4 } diff --git a/ydb/tests/tools/fqrun/flame_graph.sh b/ydb/tests/tools/fqrun/flame_graph.sh new file mode 100755 index 000000000000..dea766a02c75 --- /dev/null +++ b/ydb/tests/tools/fqrun/flame_graph.sh @@ -0,0 +1,25 @@ +#!/usr/bin/env bash + +set -eux + +function cleanup { + rm ./profdata + rm ./profdata.txt +} +trap cleanup EXIT + +if [ $# -gt 1 ]; then + echo "Too many arguments" + exit -1 +fi + +fqrun_pid=$(pgrep -u $USER fqrun) + +sudo perf record -F 50 --call-graph dwarf -g --proc-map-timeout=10000 --pid $fqrun_pid -v -o profdata -- sleep ${1:-'30'} +sudo perf script -i profdata > profdata.txt + +SCRIPT_DIR=$(cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd) + +flame_graph_tool="$SCRIPT_DIR/../../../../contrib/tools/flame-graph/" + +${flame_graph_tool}/stackcollapse-perf.pl profdata.txt | ${flame_graph_tool}/flamegraph.pl > profdata.svg diff --git a/ydb/tests/tools/fqrun/fqrun.cpp b/ydb/tests/tools/fqrun/fqrun.cpp index c5c0133fc4de..da6cb3ba6289 100644 --- a/ydb/tests/tools/fqrun/fqrun.cpp +++ b/ydb/tests/tools/fqrun/fqrun.cpp @@ -8,6 +8,10 @@ #include #include +#ifdef PROFILE_MEMORY_ALLOCATIONS +#include +#endif + using namespace NKikimrRun; namespace NFqRun { @@ -156,6 +160,21 @@ class TMain : public TMainBase { } }); + options.AddLongOption("as-cfg", "File with actor system config (TActorSystemConfig), use '-' for default") + .RequiredArgument("file") + .DefaultValue("./configuration/as_config.conf") + .Handler1([this](const NLastGetopt::TOptsParser* option) { + const TString file(option->CurValOrDef()); + if (file == "-") { + return; + } + + RunnerOptions.FqSettings.ActorSystemConfig = NKikimrConfig::TActorSystemConfig(); + if (!google::protobuf::TextFormat::ParseFromString(LoadFile(file), &(*RunnerOptions.FqSettings.ActorSystemConfig))) { + ythrow yexception() << "Bad format of actor system configuration"; + } + }); + options.AddLongOption("emulate-s3", "Enable readings by s3 provider from files, `bucket` value in connection - path to folder with files") .NoArgument() .SetFlag(&RunnerOptions.FqSettings.EmulateS3); @@ -210,6 +229,7 @@ class TMain : public TMainBase { ExecutionOptions.Validate(RunnerOptions); RunnerOptions.FqSettings.YqlToken = GetEnv(YQL_TOKEN_VARIABLE); + RunnerOptions.FqSettings.FunctionRegistry = CreateFunctionRegistry().Get(); auto& gatewayConfig = *RunnerOptions.FqSettings.FqConfig.mutable_gateways(); FillTokens(gatewayConfig.mutable_pq()); @@ -230,8 +250,26 @@ class TMain : public TMainBase { RunnerOptions.FqSettings.PqGateway = std::move(fileGateway); } +#ifdef PROFILE_MEMORY_ALLOCATIONS + if (RunnerOptions.FqSettings.VerboseLevel >= 1) { + Cout << CoutColors.Cyan() << "Starting profile memory allocations" << CoutColors.Default() << Endl; + } + NAllocProfiler::StartAllocationSampling(true); +#else + if (ProfileAllocationsOutput) { + ythrow yexception() << "Profile memory allocations disabled, please rebuild fqrun with flag `-D PROFILE_MEMORY_ALLOCATIONS`"; + } +#endif + RunScript(ExecutionOptions, RunnerOptions); +#ifdef PROFILE_MEMORY_ALLOCATIONS + if (RunnerOptions.FqSettings.VerboseLevel >= 1) { + Cout << CoutColors.Cyan() << "Finishing profile memory allocations" << CoutColors.Default() << Endl; + } + FinishProfileMemoryAllocations(); +#endif + return 0; } @@ -246,6 +284,8 @@ class TMain : public TMainBase { } private: + inline static NColorizer::TColors CoutColors = NColorizer::AutoColors(Cout); + TExecutionOptions ExecutionOptions; TRunnerOptions RunnerOptions; std::unordered_map PqFilesMapping; diff --git a/ydb/tests/tools/fqrun/src/common.h b/ydb/tests/tools/fqrun/src/common.h index 53dfbe654e93..457897781891 100644 --- a/ydb/tests/tools/fqrun/src/common.h +++ b/ydb/tests/tools/fqrun/src/common.h @@ -7,6 +7,8 @@ #include #include +#include + namespace NFqRun { constexpr char YQL_TOKEN_VARIABLE[] = "YQL_TOKEN"; @@ -27,8 +29,10 @@ struct TFqSetupSettings : public NKikimrRun::TServerSettings { TString YqlToken; NYql::IPqGateway::TPtr PqGateway; + TIntrusivePtr FunctionRegistry; NFq::NConfig::TConfig FqConfig; NKikimrConfig::TLogConfig LogConfig; + std::optional ActorSystemConfig; }; struct TRunnerOptions { diff --git a/ydb/tests/tools/fqrun/src/fq_setup.cpp b/ydb/tests/tools/fqrun/src/fq_setup.cpp index 394bfc5c3e16..a4fbfd104777 100644 --- a/ydb/tests/tools/fqrun/src/fq_setup.cpp +++ b/ydb/tests/tools/fqrun/src/fq_setup.cpp @@ -44,6 +44,14 @@ class TFqSetup::TImpl { serverSettings.SetLoggerInitializer(loggerInitializer); } + void SetFunctionRegistry(NKikimr::Tests::TServerSettings& serverSettings) const { + if (Settings.FunctionRegistry) { + serverSettings.SetFrFactory([this](const NKikimr::NScheme::TTypeRegistry&) { + return Settings.FunctionRegistry.Get(); + }); + } + } + NKikimr::Tests::TServerSettings GetServerSettings(ui32 grpcPort) { NKikimr::Tests::TServerSettings serverSettings(PortManager.GetPort()); @@ -52,9 +60,13 @@ class TFqSetup::TImpl { NKikimrConfig::TAppConfig config; *config.MutableLogConfig() = Settings.LogConfig; + if (Settings.ActorSystemConfig) { + *config.MutableActorSystemConfig() = *Settings.ActorSystemConfig; + } serverSettings.SetAppConfig(config); SetLoggerSettings(serverSettings); + SetFunctionRegistry(serverSettings); if (Settings.MonitoringEnabled) { serverSettings.InitKikimrRunConfig(); diff --git a/ydb/tests/tools/fqrun/src/ya.make b/ydb/tests/tools/fqrun/src/ya.make index 6f44cf16c38d..6c0e2827e5cd 100644 --- a/ydb/tests/tools/fqrun/src/ya.make +++ b/ydb/tests/tools/fqrun/src/ya.make @@ -20,6 +20,7 @@ PEERDIR( ydb/library/security ydb/library/yql/providers/pq/provider ydb/tests/tools/kqprun/runlib + yql/essentials/minikql ) YQL_LAST_ABI_VERSION() diff --git a/ydb/tests/tools/fqrun/ya.make b/ydb/tests/tools/fqrun/ya.make index 6553c9748b08..71922aff7950 100644 --- a/ydb/tests/tools/fqrun/ya.make +++ b/ydb/tests/tools/fqrun/ya.make @@ -1,5 +1,11 @@ PROGRAM(fqrun) +IF (PROFILE_MEMORY_ALLOCATIONS) + MESSAGE("Enabled profile memory allocations") + ALLOCATOR(LF_DBG) + CFLAGS(-D PROFILE_MEMORY_ALLOCATIONS) +ENDIF() + SRCS( fqrun.cpp ) @@ -7,6 +13,7 @@ SRCS( PEERDIR( library/cpp/colorizer library/cpp/getopt + library/cpp/lfalloc/alloc_profiler util ydb/library/yql/providers/pq/gateway/dummy ydb/tests/tools/fqrun/src diff --git a/ydb/tests/tools/kqprun/README.md b/ydb/tests/tools/kqprun/README.md index e5d82a018c56..afb207a2fed9 100644 --- a/ydb/tests/tools/kqprun/README.md +++ b/ydb/tests/tools/kqprun/README.md @@ -42,7 +42,7 @@ For profiling memory allocations build kqprun with ya make flag `-D PROFILE_MEMO ./kqprun -M 32000 ``` - Monitoring endpoint: https://localhost:32000 + Monitoring endpoint: http://localhost:32000 * gRPC endpoint: ```(bash) diff --git a/ydb/tests/tools/kqprun/kqprun.cpp b/ydb/tests/tools/kqprun/kqprun.cpp index e860615e9fd7..9eb8199ac28b 100644 --- a/ydb/tests/tools/kqprun/kqprun.cpp +++ b/ydb/tests/tools/kqprun/kqprun.cpp @@ -14,9 +14,6 @@ #include #include -#include -#include - #include #include #include @@ -402,35 +399,10 @@ void RunScript(const TExecutionOptions& executionOptions, const TRunnerOptions& } -TIntrusivePtr CreateFunctionRegistry(const TString& udfsDirectory, TVector udfsPaths, bool excludeLinkedUdfs) { - if (!udfsDirectory.empty() || !udfsPaths.empty()) { - NColorizer::TColors colors = NColorizer::AutoColors(Cout); - Cout << colors.Yellow() << TInstant::Now().ToIsoStringLocal() << " Fetching udfs..." << colors.Default() << Endl; - } - - NKikimr::NMiniKQL::FindUdfsInDir(udfsDirectory, &udfsPaths); - auto functionRegistry = NKikimr::NMiniKQL::CreateFunctionRegistry(&PrintBackTrace, NKikimr::NMiniKQL::CreateBuiltinRegistry(), false, udfsPaths)->Clone(); - - if (excludeLinkedUdfs) { - for (const auto& wrapper : NYql::NUdf::GetStaticUdfModuleWrapperList()) { - auto [name, ptr] = wrapper(); - if (!functionRegistry->IsLoadedUdfModule(name)) { - functionRegistry->AddModule(TString(NKikimr::NMiniKQL::StaticModulePrefix) + name, name, std::move(ptr)); - } - } - } else { - NKikimr::NMiniKQL::FillStaticModules(*functionRegistry); - } - - return functionRegistry; -} - - class TMain : public TMainBase { using EVerbose = TYdbSetupSettings::EVerbose; inline static const TString YqlToken = GetEnv(YQL_TOKEN_VARIABLE); - inline static IOutputStream* ProfileAllocationsOutput = nullptr; inline static NColorizer::TColors CoutColors = NColorizer::AutoColors(Cout); TExecutionOptions ExecutionOptions; @@ -438,26 +410,8 @@ class TMain : public TMainBase { std::unordered_map Templates; THashMap TablesMapping; - TVector UdfsPaths; - TString UdfsDirectory; - bool ExcludeLinkedUdfs = false; bool EmulateYt = false; -#ifdef PROFILE_MEMORY_ALLOCATIONS -public: - static void FinishProfileMemoryAllocations() { - if (ProfileAllocationsOutput) { - NAllocProfiler::StopAllocationSampling(*ProfileAllocationsOutput); - } else { - TString output; - TStringOutput stream(output); - NAllocProfiler::StopAllocationSampling(stream); - - Cout << CoutColors.Red() << "Warning: profile memory allocations output is not specified, please use flag `--profile-output` for writing profile info (dump size " << NKikimr::NBlobDepot::FormatByteSize(output.size()) << ")" << CoutColors.Default() << Endl; - } - } -#endif - protected: void RegisterOptions(NLastGetopt::TOpts& options) override { options.SetTitle("KqpRun -- tool to execute queries by using kikimr provider (instead of dq provider in DQrun tool)"); @@ -538,18 +492,6 @@ class TMain : public TMainBase { } }); - options.AddLongOption('u', "udf", "Load shared library with UDF by given path") - .RequiredArgument("file") - .EmplaceTo(&UdfsPaths); - - options.AddLongOption("udfs-dir", "Load all shared libraries with UDFs found in given directory") - .RequiredArgument("directory") - .StoreResult(&UdfsDirectory); - - options.AddLongOption("exclude-linked-udfs", "Exclude linked udfs when same udf passed from -u or --udfs-dir") - .NoArgument() - .SetFlag(&ExcludeLinkedUdfs); - // Outputs TChoices traceOpt({ @@ -644,10 +586,6 @@ class TMain : public TMainBase { RunnerOptions.ScriptQueryTimelineFiles.emplace_back(file); }); - options.AddLongOption("profile-output", "File with profile memory allocations output (use '-' to write in stdout)") - .RequiredArgument("file") - .StoreMappedResultT(&ProfileAllocationsOutput, &GetDefaultOutput); - // Pipeline settings TChoices executionCase({ @@ -850,7 +788,7 @@ class TMain : public TMainBase { } RunnerOptions.YdbSettings.YqlToken = YqlToken; - RunnerOptions.YdbSettings.FunctionRegistry = CreateFunctionRegistry(UdfsDirectory, UdfsPaths, ExcludeLinkedUdfs).Get(); + RunnerOptions.YdbSettings.FunctionRegistry = CreateFunctionRegistry().Get(); auto& appConfig = RunnerOptions.YdbSettings.AppConfig; if (ExecutionOptions.ResultsRowsLimit) { @@ -908,17 +846,6 @@ class TMain : public TMainBase { } }; -#ifdef PROFILE_MEMORY_ALLOCATIONS -void InterruptHandler(int) { - NColorizer::TColors colors = NColorizer::AutoColors(Cerr); - - Cout << colors.Red() << "Execution interrupted, finishing profile memory allocations..." << colors.Default() << Endl; - TMain::FinishProfileMemoryAllocations(); - - abort(); -} -#endif - } // anonymous namespace } // namespace NKqpRun @@ -926,10 +853,6 @@ void InterruptHandler(int) { int main(int argc, const char* argv[]) { SetupSignalActions(); -#ifdef PROFILE_MEMORY_ALLOCATIONS - signal(SIGINT, &NKqpRun::InterruptHandler); -#endif - try { NKqpRun::TMain().Run(argc, argv); } catch (...) { diff --git a/ydb/tests/tools/kqprun/runlib/application.cpp b/ydb/tests/tools/kqprun/runlib/application.cpp index 9a35d584f19e..5c6b55c0a95e 100644 --- a/ydb/tests/tools/kqprun/runlib/application.cpp +++ b/ydb/tests/tools/kqprun/runlib/application.cpp @@ -1,15 +1,49 @@ #include "application.h" #include "utils.h" +#include #include #include #include +#include +#include + +#ifdef PROFILE_MEMORY_ALLOCATIONS +#include +#endif + namespace NKikimrRun { +#ifdef PROFILE_MEMORY_ALLOCATIONS +void TMainBase::FinishProfileMemoryAllocations() { + if (ProfileAllocationsOutput) { + NAllocProfiler::StopAllocationSampling(*ProfileAllocationsOutput); + } else { + TString output; + TStringOutput stream(output); + NAllocProfiler::StopAllocationSampling(stream); + + Cout << CoutColors.Red() << "Warning: profile memory allocations output is not specified, please use flag `--profile-output` for writing profile info (dump size " << NKikimr::NBlobDepot::FormatByteSize(output.size()) << ")" << CoutColors.Default() << Endl; + } +} +#endif + void TMainBase::RegisterKikimrOptions(NLastGetopt::TOpts& options, TServerSettings& settings) { + options.AddLongOption('u', "udf", "Load shared library with UDF by given path") + .RequiredArgument("file") + .EmplaceTo(&UdfsPaths); + + options.AddLongOption("udfs-dir", "Load all shared libraries with UDFs found in given directory") + .RequiredArgument("directory") + .StoreResult(&UdfsDirectory); + + options.AddLongOption("exclude-linked-udfs", "Exclude linked udfs when same udf passed from -u or --udfs-dir") + .NoArgument() + .SetFlag(&ExcludeLinkedUdfs); + options.AddLongOption("log-file", "File with execution logs (writes in stderr if empty)") .RequiredArgument("file") .StoreResult(&settings.LogOutputFile) @@ -32,7 +66,6 @@ void TMainBase::RegisterKikimrOptions(NLastGetopt::TOpts& options, TServerSettin }); options.AddLongOption("log-default", "Default log priority") .RequiredArgument("priority") - .Choices(logPriority.GetChoices()) .StoreMappedResultT(&DefaultLogPriority, logPriority); options.AddLongOption("log", "Component log priority in format = (e. g. KQP_YQL=trace)") @@ -55,6 +88,10 @@ void TMainBase::RegisterKikimrOptions(NLastGetopt::TOpts& options, TServerSettin } }); + options.AddLongOption("profile-output", "File with profile memory allocations output (use '-' to write in stdout)") + .RequiredArgument("file") + .StoreMappedResultT(&ProfileAllocationsOutput, &GetDefaultOutput); + options.AddLongOption('M', "monitoring", "Embedded UI port (use 0 to start on random free port), if used will be run as daemon") .RequiredArgument("uint") .Handler1([&settings](const NLastGetopt::TOptsParser* option) { @@ -110,4 +147,28 @@ IOutputStream* TMainBase::GetDefaultOutput(const TString& file) { return nullptr; } +TIntrusivePtr TMainBase::CreateFunctionRegistry() const { + if (!UdfsDirectory.empty() || !UdfsPaths.empty()) { + NColorizer::TColors colors = NColorizer::AutoColors(Cout); + Cout << colors.Yellow() << TInstant::Now().ToIsoStringLocal() << " Fetching udfs..." << colors.Default() << Endl; + } + + auto paths = UdfsPaths; + NKikimr::NMiniKQL::FindUdfsInDir(UdfsDirectory, &paths); + auto functionRegistry = NKikimr::NMiniKQL::CreateFunctionRegistry(&PrintBackTrace, NKikimr::NMiniKQL::CreateBuiltinRegistry(), false, paths)->Clone(); + + if (ExcludeLinkedUdfs) { + for (const auto& wrapper : NYql::NUdf::GetStaticUdfModuleWrapperList()) { + auto [name, ptr] = wrapper(); + if (!functionRegistry->IsLoadedUdfModule(name)) { + functionRegistry->AddModule(TString(NKikimr::NMiniKQL::StaticModulePrefix) + name, name, std::move(ptr)); + } + } + } else { + NKikimr::NMiniKQL::FillStaticModules(*functionRegistry); + } + + return functionRegistry; +} + } // namespace NKikimrRun diff --git a/ydb/tests/tools/kqprun/runlib/application.h b/ydb/tests/tools/kqprun/runlib/application.h index 88eb2e289011..6dfc712df821 100644 --- a/ydb/tests/tools/kqprun/runlib/application.h +++ b/ydb/tests/tools/kqprun/runlib/application.h @@ -10,9 +10,16 @@ #include #include +#include + namespace NKikimrRun { class TMainBase : public TMainClassArgs { +#ifdef PROFILE_MEMORY_ALLOCATIONS +public: + static void FinishProfileMemoryAllocations(); +#endif + protected: void RegisterKikimrOptions(NLastGetopt::TOpts& options, TServerSettings& settings); @@ -20,11 +27,20 @@ class TMainBase : public TMainClassArgs { static IOutputStream* GetDefaultOutput(const TString& file); + TIntrusivePtr CreateFunctionRegistry() const; + +protected: + inline static IOutputStream* ProfileAllocationsOutput = nullptr; + private: inline static std::vector> FileHolders; std::optional DefaultLogPriority; std::unordered_map LogPriorities; + + TString UdfsDirectory; + TVector UdfsPaths; + bool ExcludeLinkedUdfs; }; } // namespace NKikimrRun diff --git a/ydb/tests/tools/kqprun/runlib/utils.cpp b/ydb/tests/tools/kqprun/runlib/utils.cpp index 208cf4daf045..4ac134821829 100644 --- a/ydb/tests/tools/kqprun/runlib/utils.cpp +++ b/ydb/tests/tools/kqprun/runlib/utils.cpp @@ -1,4 +1,5 @@ #include "utils.h" +#include "application.h" #include #include @@ -49,6 +50,17 @@ void FloatingPointExceptionHandler(int) { abort(); } +#ifdef PROFILE_MEMORY_ALLOCATIONS +void InterruptHandler(int) { + NColorizer::TColors colors = NColorizer::AutoColors(Cerr); + + Cout << colors.Red() << "Execution interrupted, finishing profile memory allocations..." << colors.Default() << Endl; + TMainBase::FinishProfileMemoryAllocations(); + + abort(); +} +#endif + } // nonymous namespace @@ -237,6 +249,10 @@ void SetupSignalActions() { std::set_terminate(&TerminateHandler); signal(SIGSEGV, &SegmentationFaultHandler); signal(SIGFPE, &FloatingPointExceptionHandler); + +#ifdef PROFILE_MEMORY_ALLOCATIONS + signal(SIGINT, &NKqpRun::InterruptHandler); +#endif } void PrintResultSet(EResultOutputFormat format, IOutputStream& output, const Ydb::ResultSet& resultSet) { diff --git a/ydb/tests/tools/kqprun/runlib/utils.h b/ydb/tests/tools/kqprun/runlib/utils.h index 9e4d69c14501..13de2437165d 100644 --- a/ydb/tests/tools/kqprun/runlib/utils.h +++ b/ydb/tests/tools/kqprun/runlib/utils.h @@ -34,12 +34,18 @@ struct TRequestResult { template class TChoices { public: - explicit TChoices(std::map choicesMap) + explicit TChoices(std::map choicesMap, const TString& optionName = "") : ChoicesMap(std::move(choicesMap)) + , OptionName(optionName) {} TResult operator()(const TString& choice) const { - return ChoicesMap.at(choice); + const auto it = ChoicesMap.find(choice); + // if (it == ChoicesMap.end()) { + + // throw yexception() << "Value '" << choice << "' is not allowed " << (OptionName ? TStringBuilder() << "for option " << OptionName : TStringBuilder()) << ", available variants:\n" << Join(", ") + // } + return it->second; } TVector GetChoices() const { @@ -57,6 +63,7 @@ class TChoices { private: const std::map ChoicesMap; + const TString OptionName; }; class TStatsPrinter { diff --git a/ydb/tests/tools/kqprun/runlib/ya.make b/ydb/tests/tools/kqprun/runlib/ya.make index 352a3a423105..745ffcdee03f 100644 --- a/ydb/tests/tools/kqprun/runlib/ya.make +++ b/ydb/tests/tools/kqprun/runlib/ya.make @@ -20,7 +20,10 @@ PEERDIR( ydb/public/api/protos ydb/public/lib/json_value ydb/public/lib/ydb_cli/common + yql/essentials/minikql + yql/essentials/minikql/invoke_builtins yql/essentials/public/issue + yql/essentials/public/udf ) YQL_LAST_ABI_VERSION() From f6a6332a20ee47739805dcb3b2861ab7c6626dc6 Mon Sep 17 00:00:00 2001 From: Grigoriy Pisarenko Date: Thu, 20 Feb 2025 10:06:37 +0000 Subject: [PATCH 3/7] TMP2 --- .../pq/gateway/dummy/yql_pq_dummy_gateway.h | 1 + .../dummy/yql_pq_file_topic_client.cpp | 13 ++++- .../gateway/dummy/yql_pq_file_topic_client.h | 3 +- ydb/tests/tools/fqrun/fqrun.cpp | 21 +++++-- ydb/tests/tools/fqrun/ya.make | 1 + ydb/tests/tools/kqprun/kqprun.cpp | 5 +- ydb/tests/tools/kqprun/runlib/application.cpp | 58 ++++++++----------- ydb/tests/tools/kqprun/runlib/application.h | 4 ++ ydb/tests/tools/kqprun/runlib/utils.cpp | 16 ++++- ydb/tests/tools/kqprun/runlib/utils.h | 26 +++++++-- 10 files changed, 96 insertions(+), 52 deletions(-) diff --git a/ydb/library/yql/providers/pq/gateway/dummy/yql_pq_dummy_gateway.h b/ydb/library/yql/providers/pq/gateway/dummy/yql_pq_dummy_gateway.h index a39fc4f3882e..fef31d7dc09b 100644 --- a/ydb/library/yql/providers/pq/gateway/dummy/yql_pq_dummy_gateway.h +++ b/ydb/library/yql/providers/pq/gateway/dummy/yql_pq_dummy_gateway.h @@ -26,6 +26,7 @@ struct TDummyTopic { TString TopicName; TMaybe Path; size_t PartitionsCount; + bool CancelOnFileFinish = false; }; // Dummy Pq gateway for tests. diff --git a/ydb/library/yql/providers/pq/gateway/dummy/yql_pq_file_topic_client.cpp b/ydb/library/yql/providers/pq/gateway/dummy/yql_pq_file_topic_client.cpp index 751105aeb694..d6190f14ac99 100644 --- a/ydb/library/yql/providers/pq/gateway/dummy/yql_pq_file_topic_client.cpp +++ b/ydb/library/yql/providers/pq/gateway/dummy/yql_pq_file_topic_client.cpp @@ -16,7 +16,7 @@ class TFileTopicReadSession : public NYdb::NTopic::IReadSession { constexpr static auto FILE_POLL_PERIOD = TDuration::MilliSeconds(5); public: - TFileTopicReadSession(TFile file, NYdb::NTopic::TPartitionSession::TPtr session, const TString& producerId = "") + TFileTopicReadSession(TFile file, NYdb::NTopic::TPartitionSession::TPtr session, const TString& producerId = "", bool cancelOnFileFinish = false) : File_(std::move(file)) , Session_(std::move(session)) , ProducerId_(producerId) @@ -24,6 +24,7 @@ constexpr static auto FILE_POLL_PERIOD = TDuration::MilliSeconds(5); PollFileForChanges(); }) , Counters_() + , CancelOnFileFinish_(cancelOnFileFinish) { Pool_.Start(1); } @@ -133,7 +134,7 @@ constexpr static auto FILE_POLL_PERIOD = TDuration::MilliSeconds(5); } if (!msgs.empty()) { EventsQ_.Push(NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent(msgs, {}, Session_), size); - } else { + } else if (CancelOnFileFinish_) { EventsQ_.Push(NYdb::NTopic::TSessionClosedEvent(NYdb::EStatus::CANCELLED, {NYdb::NIssue::TIssue("PQ file topic was finished")}), size); } @@ -147,6 +148,7 @@ constexpr static auto FILE_POLL_PERIOD = TDuration::MilliSeconds(5); TString ProducerId_; std::thread FilePoller_; NYdb::NTopic::TReaderCounters::TPtr Counters_; + bool CancelOnFileFinish_ = false; TThreadPool Pool_; size_t MsgOffset_ = 0; @@ -338,6 +340,10 @@ struct TDummyPartitionSession: public NYdb::NTopic::TPartitionSession { } }; +TFileTopicClient::TFileTopicClient(THashMap topics) + : Topics_(topics) +{} + std::shared_ptr TFileTopicClient::CreateReadSession(const NYdb::NTopic::TReadSessionSettings& settings) { Y_ENSURE(!settings.Topics_.empty()); const auto& topic = settings.Topics_.front(); @@ -360,7 +366,8 @@ std::shared_ptr TFileTopicClient::CreateReadSession( ui64 sessionId = 0; return std::make_shared( TFile(*filePath, EOpenMode::TEnum::RdOnly), - MakeIntrusive(sessionId, TString{topicPath}, partitionId) + MakeIntrusive(sessionId, TString{topicPath}, partitionId), + "", topicsIt->second.CancelOnFileFinish ); } diff --git a/ydb/library/yql/providers/pq/gateway/dummy/yql_pq_file_topic_client.h b/ydb/library/yql/providers/pq/gateway/dummy/yql_pq_file_topic_client.h index f80426726159..246692d76029 100644 --- a/ydb/library/yql/providers/pq/gateway/dummy/yql_pq_file_topic_client.h +++ b/ydb/library/yql/providers/pq/gateway/dummy/yql_pq_file_topic_client.h @@ -4,7 +4,7 @@ namespace NYql { struct TFileTopicClient : public ITopicClient { - TFileTopicClient(THashMap topics): Topics_(topics) {} + explicit TFileTopicClient(THashMap topics); NYdb::TAsyncStatus CreateTopic(const TString& path, const NYdb::NTopic::TCreateTopicSettings& settings = {}) override; @@ -32,5 +32,6 @@ struct TFileTopicClient : public ITopicClient { private: THashMap Topics_; + bool CancelOnFileFinish_ = false; }; } \ No newline at end of file diff --git a/ydb/tests/tools/fqrun/fqrun.cpp b/ydb/tests/tools/fqrun/fqrun.cpp index da6cb3ba6289..33ec74693730 100644 --- a/ydb/tests/tools/fqrun/fqrun.cpp +++ b/ydb/tests/tools/fqrun/fqrun.cpp @@ -3,6 +3,7 @@ #include +#include #include #include #include @@ -195,6 +196,20 @@ class TMain : public TMainBase { } }); + options.AddLongOption("cnacel-on-file-finish", "Cancel emulate YDS topics when topic file finished") + .RequiredArgument("topic") + .Handler1([this](const NLastGetopt::TOptsParser* option) { + TStringBuf topicName; + TStringBuf filePath; + TStringBuf(option->CurVal()).Split('@', topicName, filePath); + if (topicName.empty() || filePath.empty()) { + ythrow yexception() << "Incorrect PQ file mapping, expected form topic@file"; + } + if (!PqFilesMapping.emplace(topicName, filePath).second) { + ythrow yexception() << "Got duplicated topic name: " << topicName; + } + }); + // Outputs options.AddLongOption("result-file", "File with query results (use '-' to write in stdout)") @@ -251,7 +266,7 @@ class TMain : public TMainBase { } #ifdef PROFILE_MEMORY_ALLOCATIONS - if (RunnerOptions.FqSettings.VerboseLevel >= 1) { + if (RunnerOptions.FqSettings.VerboseLevel >= EVerbose::Info) { Cout << CoutColors.Cyan() << "Starting profile memory allocations" << CoutColors.Default() << Endl; } NAllocProfiler::StartAllocationSampling(true); @@ -264,7 +279,7 @@ class TMain : public TMainBase { RunScript(ExecutionOptions, RunnerOptions); #ifdef PROFILE_MEMORY_ALLOCATIONS - if (RunnerOptions.FqSettings.VerboseLevel >= 1) { + if (RunnerOptions.FqSettings.VerboseLevel >= EVerbose::Info) { Cout << CoutColors.Cyan() << "Finishing profile memory allocations" << CoutColors.Default() << Endl; } FinishProfileMemoryAllocations(); @@ -284,8 +299,6 @@ class TMain : public TMainBase { } private: - inline static NColorizer::TColors CoutColors = NColorizer::AutoColors(Cout); - TExecutionOptions ExecutionOptions; TRunnerOptions RunnerOptions; std::unordered_map PqFilesMapping; diff --git a/ydb/tests/tools/fqrun/ya.make b/ydb/tests/tools/fqrun/ya.make index 71922aff7950..d1e6db1f21b1 100644 --- a/ydb/tests/tools/fqrun/ya.make +++ b/ydb/tests/tools/fqrun/ya.make @@ -15,6 +15,7 @@ PEERDIR( library/cpp/getopt library/cpp/lfalloc/alloc_profiler util + ydb/core/blob_depot ydb/library/yql/providers/pq/gateway/dummy ydb/tests/tools/fqrun/src ydb/tests/tools/kqprun/runlib diff --git a/ydb/tests/tools/kqprun/kqprun.cpp b/ydb/tests/tools/kqprun/kqprun.cpp index 9eb8199ac28b..3ec48c7d5b6e 100644 --- a/ydb/tests/tools/kqprun/kqprun.cpp +++ b/ydb/tests/tools/kqprun/kqprun.cpp @@ -403,7 +403,6 @@ class TMain : public TMainBase { using EVerbose = TYdbSetupSettings::EVerbose; inline static const TString YqlToken = GetEnv(YQL_TOKEN_VARIABLE); - inline static NColorizer::TColors CoutColors = NColorizer::AutoColors(Cout); TExecutionOptions ExecutionOptions; TRunnerOptions RunnerOptions; @@ -808,7 +807,7 @@ class TMain : public TMainBase { } #ifdef PROFILE_MEMORY_ALLOCATIONS - if (RunnerOptions.YdbSettings.VerboseLevel >= 1) { + if (RunnerOptions.YdbSettings.VerboseLevel >= EVerbose::Info) { Cout << CoutColors.Cyan() << "Starting profile memory allocations" << CoutColors.Default() << Endl; } NAllocProfiler::StartAllocationSampling(true); @@ -821,7 +820,7 @@ class TMain : public TMainBase { RunScript(ExecutionOptions, RunnerOptions); #ifdef PROFILE_MEMORY_ALLOCATIONS - if (RunnerOptions.YdbSettings.VerboseLevel >= 1) { + if (RunnerOptions.YdbSettings.VerboseLevel >= EVerbose::Info) { Cout << CoutColors.Cyan() << "Finishing profile memory allocations" << CoutColors.Default() << Endl; } FinishProfileMemoryAllocations(); diff --git a/ydb/tests/tools/kqprun/runlib/application.cpp b/ydb/tests/tools/kqprun/runlib/application.cpp index 5c6b55c0a95e..6b27760b5b51 100644 --- a/ydb/tests/tools/kqprun/runlib/application.cpp +++ b/ydb/tests/tools/kqprun/runlib/application.cpp @@ -7,6 +7,7 @@ #include #include +#include #include #include @@ -53,40 +54,7 @@ void TMainBase::RegisterKikimrOptions(NLastGetopt::TOpts& options, TServerSettin } }); - TChoices logPriority({ - {"emerg", NActors::NLog::EPriority::PRI_EMERG}, - {"alert", NActors::NLog::EPriority::PRI_ALERT}, - {"crit", NActors::NLog::EPriority::PRI_CRIT}, - {"error", NActors::NLog::EPriority::PRI_ERROR}, - {"warn", NActors::NLog::EPriority::PRI_WARN}, - {"notice", NActors::NLog::EPriority::PRI_NOTICE}, - {"info", NActors::NLog::EPriority::PRI_INFO}, - {"debug", NActors::NLog::EPriority::PRI_DEBUG}, - {"trace", NActors::NLog::EPriority::PRI_TRACE}, - }); - options.AddLongOption("log-default", "Default log priority") - .RequiredArgument("priority") - .StoreMappedResultT(&DefaultLogPriority, logPriority); - - options.AddLongOption("log", "Component log priority in format = (e. g. KQP_YQL=trace)") - .RequiredArgument("component priority") - .Handler1([this, logPriority](const NLastGetopt::TOptsParser* option) { - TStringBuf component; - TStringBuf priority; - TStringBuf(option->CurVal()).Split('=', component, priority); - if (component.empty() || priority.empty()) { - ythrow yexception() << "Incorrect log setting, expected form component=priority, e. g. KQP_YQL=trace"; - } - - if (!logPriority.Contains(TString(priority))) { - ythrow yexception() << "Incorrect log priority: " << priority; - } - - const auto service = GetLogService(TString(component)); - if (!LogPriorities.emplace(service, logPriority(TString(priority))).second) { - ythrow yexception() << "Got duplicated log service name: " << component; - } - }); + RegisterLogOptions(options); options.AddLongOption("profile-output", "File with profile memory allocations output (use '-' to write in stdout)") .RequiredArgument("file") @@ -129,6 +97,28 @@ void TMainBase::RegisterKikimrOptions(NLastGetopt::TOpts& options, TServerSettin }); } +void TMainBase::RegisterLogOptions(NLastGetopt::TOpts& options) { + options.AddLongOption("log-default", "Default log priority") + .RequiredArgument("priority") + .StoreMappedResultT(&DefaultLogPriority, GetLogPrioritiesMap("log-default")); + + options.AddLongOption("log", "Component log priority in format = (e. g. KQP_YQL=trace)") + .RequiredArgument("component priority") + .Handler1([this, logPriority = GetLogPrioritiesMap("log")](const NLastGetopt::TOptsParser* option) { + TStringBuf component; + TStringBuf priority; + TStringBuf(option->CurVal()).Split('=', component, priority); + if (component.empty() || priority.empty()) { + ythrow yexception() << "Incorrect log setting, expected form component=priority, e. g. KQP_YQL=trace"; + } + + const auto service = GetLogService(TString(component)); + if (!LogPriorities.emplace(service, logPriority(TString(priority))).second) { + ythrow yexception() << "Got duplicated log service name: " << component; + } + }); +} + void TMainBase::FillLogConfig(NKikimrConfig::TLogConfig& config) const { if (DefaultLogPriority) { config.SetDefaultLevel(*DefaultLogPriority); diff --git a/ydb/tests/tools/kqprun/runlib/application.h b/ydb/tests/tools/kqprun/runlib/application.h index 6dfc712df821..fd67eb2ac0d9 100644 --- a/ydb/tests/tools/kqprun/runlib/application.h +++ b/ydb/tests/tools/kqprun/runlib/application.h @@ -2,6 +2,7 @@ #include "settings.h" +#include #include #include @@ -23,6 +24,8 @@ class TMainBase : public TMainClassArgs { protected: void RegisterKikimrOptions(NLastGetopt::TOpts& options, TServerSettings& settings); + virtual void RegisterLogOptions(NLastGetopt::TOpts& options); + void FillLogConfig(NKikimrConfig::TLogConfig& config) const; static IOutputStream* GetDefaultOutput(const TString& file); @@ -30,6 +33,7 @@ class TMainBase : public TMainClassArgs { TIntrusivePtr CreateFunctionRegistry() const; protected: + inline static NColorizer::TColors CoutColors = NColorizer::AutoColors(Cout); inline static IOutputStream* ProfileAllocationsOutput = nullptr; private: diff --git a/ydb/tests/tools/kqprun/runlib/utils.cpp b/ydb/tests/tools/kqprun/runlib/utils.cpp index 4ac134821829..f8c37d7b67fe 100644 --- a/ydb/tests/tools/kqprun/runlib/utils.cpp +++ b/ydb/tests/tools/kqprun/runlib/utils.cpp @@ -245,13 +245,27 @@ void InitLogSettings(const NKikimrConfig::TLogConfig& logConfig, NActors::TTestA } } +TChoices GetLogPrioritiesMap(const TString& optionName) { + return TChoices({ + {"emerg", NActors::NLog::EPriority::PRI_EMERG}, + {"alert", NActors::NLog::EPriority::PRI_ALERT}, + {"crit", NActors::NLog::EPriority::PRI_CRIT}, + {"error", NActors::NLog::EPriority::PRI_ERROR}, + {"warn", NActors::NLog::EPriority::PRI_WARN}, + {"notice", NActors::NLog::EPriority::PRI_NOTICE}, + {"info", NActors::NLog::EPriority::PRI_INFO}, + {"debug", NActors::NLog::EPriority::PRI_DEBUG}, + {"trace", NActors::NLog::EPriority::PRI_TRACE}, + }, optionName, false); +} + void SetupSignalActions() { std::set_terminate(&TerminateHandler); signal(SIGSEGV, &SegmentationFaultHandler); signal(SIGFPE, &FloatingPointExceptionHandler); #ifdef PROFILE_MEMORY_ALLOCATIONS - signal(SIGINT, &NKqpRun::InterruptHandler); + signal(SIGINT, &InterruptHandler); #endif } diff --git a/ydb/tests/tools/kqprun/runlib/utils.h b/ydb/tests/tools/kqprun/runlib/utils.h index 13de2437165d..da6868d2d971 100644 --- a/ydb/tests/tools/kqprun/runlib/utils.h +++ b/ydb/tests/tools/kqprun/runlib/utils.h @@ -34,17 +34,28 @@ struct TRequestResult { template class TChoices { public: - explicit TChoices(std::map choicesMap, const TString& optionName = "") + explicit TChoices(std::map choicesMap, const TString& optionName = "", bool checkRegister = true) : ChoicesMap(std::move(choicesMap)) , OptionName(optionName) + , CheckRegister(checkRegister) {} - TResult operator()(const TString& choice) const { - const auto it = ChoicesMap.find(choice); - // if (it == ChoicesMap.end()) { + TResult operator()(TString choice) const { + if (!CheckRegister) { + std::for_each(choice.begin(), choice.vend(), [](char& c) { c = std::tolower(c); }); + } - // throw yexception() << "Value '" << choice << "' is not allowed " << (OptionName ? TStringBuilder() << "for option " << OptionName : TStringBuilder()) << ", available variants:\n" << Join(", ") - // } + const auto it = ChoicesMap.find(choice); + if (it == ChoicesMap.end()) { + auto error = yexception() << "Value '" << choice << "' is not allowed " << (OptionName ? TStringBuilder() << "for option " << OptionName : TStringBuilder()) << ", available variants:\n"; + for (auto it = ChoicesMap.begin(); it != ChoicesMap.end();) { + error << choice; + if (++it != ChoicesMap.end()) { + error << ", "; + } + } + ythrow error; + } return it->second; } @@ -64,6 +75,7 @@ class TChoices { private: const std::map ChoicesMap; const TString OptionName; + const bool CheckRegister; }; class TStatsPrinter { @@ -95,6 +107,8 @@ void ModifyLogPriorities(std::unordered_map GetLogPrioritiesMap(const TString& optionName); + void SetupSignalActions(); void PrintResultSet(EResultOutputFormat format, IOutputStream& output, const Ydb::ResultSet& resultSet); From 887af1fb554eabdabf9ba430d5a8827b1eac9e03 Mon Sep 17 00:00:00 2001 From: Grigoriy Pisarenko Date: Thu, 20 Feb 2025 10:27:25 +0000 Subject: [PATCH 4/7] Added cnacel-on-file-finish flag --- ydb/tests/tools/fqrun/fqrun.cpp | 30 +++++++++++++++++++----------- 1 file changed, 19 insertions(+), 11 deletions(-) diff --git a/ydb/tests/tools/fqrun/fqrun.cpp b/ydb/tests/tools/fqrun/fqrun.cpp index 33ec74693730..bd2cd387659b 100644 --- a/ydb/tests/tools/fqrun/fqrun.cpp +++ b/ydb/tests/tools/fqrun/fqrun.cpp @@ -185,11 +185,15 @@ class TMain : public TMainBase { .Handler1([this](const NLastGetopt::TOptsParser* option) { TStringBuf topicName, others; TStringBuf(option->CurVal()).Split('@', topicName, others); + TStringBuf path, partitionCountStr; TStringBuf(others).Split(':', path, partitionCountStr); size_t partitionCount = !partitionCountStr.empty() ? FromString(partitionCountStr) : 1; + if (!partitionCount) { + ythrow yexception() << "Topic partition count should be at least one"; + } if (topicName.empty() || path.empty()) { - ythrow yexception() << "Incorrect PQ file mapping, expected form topic@path[:partitions_count]" << Endl; + ythrow yexception() << "Incorrect PQ file mapping, expected form topic@path[:partitions_count]"; } if (!PqFilesMapping.emplace(topicName, NYql::TDummyTopic("pq", TString(topicName), TString(path), partitionCount)).second) { ythrow yexception() << "Got duplicated topic name: " << topicName; @@ -199,15 +203,7 @@ class TMain : public TMainBase { options.AddLongOption("cnacel-on-file-finish", "Cancel emulate YDS topics when topic file finished") .RequiredArgument("topic") .Handler1([this](const NLastGetopt::TOptsParser* option) { - TStringBuf topicName; - TStringBuf filePath; - TStringBuf(option->CurVal()).Split('@', topicName, filePath); - if (topicName.empty() || filePath.empty()) { - ythrow yexception() << "Incorrect PQ file mapping, expected form topic@file"; - } - if (!PqFilesMapping.emplace(topicName, filePath).second) { - ythrow yexception() << "Got duplicated topic name: " << topicName; - } + TopicsSettings[option->CurVal()].CancelOnFileFinish = true; }); // Outputs @@ -259,11 +255,18 @@ class TMain : public TMainBase { if (!PqFilesMapping.empty()) { auto fileGateway = MakeIntrusive(); - for (const auto& [_, topic] : PqFilesMapping) { + for (auto [_, topic] : PqFilesMapping) { + if (const auto it = TopicsSettings.find(topic.TopicName); it != TopicsSettings.end()) { + topic.CancelOnFileFinish = it->second.CancelOnFileFinish; + TopicsSettings.erase(it); + } fileGateway->AddDummyTopic(topic); } RunnerOptions.FqSettings.PqGateway = std::move(fileGateway); } + if (!TopicsSettings.empty()) { + ythrow yexception() << "Found topic settings for not existing topic: '" << TopicsSettings.begin()->first << "'"; + } #ifdef PROFILE_MEMORY_ALLOCATIONS if (RunnerOptions.FqSettings.VerboseLevel >= EVerbose::Info) { @@ -302,6 +305,11 @@ class TMain : public TMainBase { TExecutionOptions ExecutionOptions; TRunnerOptions RunnerOptions; std::unordered_map PqFilesMapping; + + struct TTopicSettings { + bool CancelOnFileFinish = false; + }; + std::unordered_map TopicsSettings; }; } // anonymous namespace From 2c93408db495c9df106bea0f55f3deaa52350542 Mon Sep 17 00:00:00 2001 From: Grigoriy Pisarenko Date: Thu, 20 Feb 2025 10:49:10 +0000 Subject: [PATCH 5/7] Fixed style --- .../providers/pq/gateway/dummy/yql_pq_file_topic_client.cpp | 2 +- .../yql/providers/pq/gateway/dummy/yql_pq_file_topic_client.h | 3 ++- ydb/tests/tools/fqrun/flame_graph.sh | 2 +- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/ydb/library/yql/providers/pq/gateway/dummy/yql_pq_file_topic_client.cpp b/ydb/library/yql/providers/pq/gateway/dummy/yql_pq_file_topic_client.cpp index d6190f14ac99..4262b196dfe2 100644 --- a/ydb/library/yql/providers/pq/gateway/dummy/yql_pq_file_topic_client.cpp +++ b/ydb/library/yql/providers/pq/gateway/dummy/yql_pq_file_topic_client.cpp @@ -341,7 +341,7 @@ struct TDummyPartitionSession: public NYdb::NTopic::TPartitionSession { }; TFileTopicClient::TFileTopicClient(THashMap topics) - : Topics_(topics) + : Topics_(std::move(topics)) {} std::shared_ptr TFileTopicClient::CreateReadSession(const NYdb::NTopic::TReadSessionSettings& settings) { diff --git a/ydb/library/yql/providers/pq/gateway/dummy/yql_pq_file_topic_client.h b/ydb/library/yql/providers/pq/gateway/dummy/yql_pq_file_topic_client.h index 246692d76029..7b4ccae0d85c 100644 --- a/ydb/library/yql/providers/pq/gateway/dummy/yql_pq_file_topic_client.h +++ b/ydb/library/yql/providers/pq/gateway/dummy/yql_pq_file_topic_client.h @@ -34,4 +34,5 @@ struct TFileTopicClient : public ITopicClient { THashMap Topics_; bool CancelOnFileFinish_ = false; }; -} \ No newline at end of file + +} diff --git a/ydb/tests/tools/fqrun/flame_graph.sh b/ydb/tests/tools/fqrun/flame_graph.sh index dea766a02c75..b6f8fe6da4f0 100755 --- a/ydb/tests/tools/fqrun/flame_graph.sh +++ b/ydb/tests/tools/fqrun/flame_graph.sh @@ -3,7 +3,7 @@ set -eux function cleanup { - rm ./profdata + sudo rm ./profdata rm ./profdata.txt } trap cleanup EXIT From 0e05cf94a020215b69ae7b54e13445a8326a1d5c Mon Sep 17 00:00:00 2001 From: Grigoriy Pisarenko Date: Thu, 20 Feb 2025 11:41:01 +0000 Subject: [PATCH 6/7] Fixed issues --- ydb/tests/tools/fqrun/fqrun.cpp | 2 +- ydb/tests/tools/fqrun/src/ya.make | 1 - ydb/tests/tools/fqrun/ya.make | 1 - ydb/tests/tools/kqprun/runlib/utils.h | 11 ++++------- ydb/tests/tools/kqprun/runlib/ya.make | 1 - 5 files changed, 5 insertions(+), 11 deletions(-) diff --git a/ydb/tests/tools/fqrun/fqrun.cpp b/ydb/tests/tools/fqrun/fqrun.cpp index bd2cd387659b..4f1b096bde10 100644 --- a/ydb/tests/tools/fqrun/fqrun.cpp +++ b/ydb/tests/tools/fqrun/fqrun.cpp @@ -200,7 +200,7 @@ class TMain : public TMainBase { } }); - options.AddLongOption("cnacel-on-file-finish", "Cancel emulate YDS topics when topic file finished") + options.AddLongOption("cancel-on-file-finish", "Cancel emulate YDS topics when topic file finished") .RequiredArgument("topic") .Handler1([this](const NLastGetopt::TOptsParser* option) { TopicsSettings[option->CurVal()].CancelOnFileFinish = true; diff --git a/ydb/tests/tools/fqrun/src/ya.make b/ydb/tests/tools/fqrun/src/ya.make index 6c0e2827e5cd..a5a54edf3a87 100644 --- a/ydb/tests/tools/fqrun/src/ya.make +++ b/ydb/tests/tools/fqrun/src/ya.make @@ -9,7 +9,6 @@ SRCS( PEERDIR( library/cpp/colorizer library/cpp/testing/unittest - util ydb/core/fq/libs/config/protos ydb/core/fq/libs/control_plane_proxy/events ydb/core/fq/libs/init diff --git a/ydb/tests/tools/fqrun/ya.make b/ydb/tests/tools/fqrun/ya.make index d1e6db1f21b1..3a62034e2414 100644 --- a/ydb/tests/tools/fqrun/ya.make +++ b/ydb/tests/tools/fqrun/ya.make @@ -14,7 +14,6 @@ PEERDIR( library/cpp/colorizer library/cpp/getopt library/cpp/lfalloc/alloc_profiler - util ydb/core/blob_depot ydb/library/yql/providers/pq/gateway/dummy ydb/tests/tools/fqrun/src diff --git a/ydb/tests/tools/kqprun/runlib/utils.h b/ydb/tests/tools/kqprun/runlib/utils.h index da6868d2d971..4bf873e48cae 100644 --- a/ydb/tests/tools/kqprun/runlib/utils.h +++ b/ydb/tests/tools/kqprun/runlib/utils.h @@ -47,14 +47,11 @@ class TChoices { const auto it = ChoicesMap.find(choice); if (it == ChoicesMap.end()) { - auto error = yexception() << "Value '" << choice << "' is not allowed " << (OptionName ? TStringBuilder() << "for option " << OptionName : TStringBuilder()) << ", available variants:\n"; - for (auto it = ChoicesMap.begin(); it != ChoicesMap.end();) { - error << choice; - if (++it != ChoicesMap.end()) { - error << ", "; - } + auto error = yexception() << "Value '" << choice << "' is not allowed " << (OptionName ? TStringBuilder() << "for option " << OptionName : TStringBuilder()) << ", available variants:"; + for (const auto& [value, _] : ChoicesMap) { + error << " " << value; } - ythrow error; + throw error; } return it->second; } diff --git a/ydb/tests/tools/kqprun/runlib/ya.make b/ydb/tests/tools/kqprun/runlib/ya.make index 745ffcdee03f..39bd1328bfe7 100644 --- a/ydb/tests/tools/kqprun/runlib/ya.make +++ b/ydb/tests/tools/kqprun/runlib/ya.make @@ -9,7 +9,6 @@ PEERDIR( library/cpp/colorizer library/cpp/getopt library/cpp/json - util ydb/core/base ydb/core/blob_depot ydb/core/fq/libs/compute/common From a62b16e4735d5766350aebcd292c3211fe726893 Mon Sep 17 00:00:00 2001 From: Grigoriy Pisarenko Date: Thu, 20 Feb 2025 11:46:17 +0000 Subject: [PATCH 7/7] Fixed issues 2 --- ydb/tests/tools/fqrun/README.md | 2 +- ydb/tests/tools/fqrun/ya.make | 1 - ydb/tests/tools/kqprun/README.md | 2 +- ydb/tests/tools/kqprun/ya.make | 1 - 4 files changed, 2 insertions(+), 4 deletions(-) diff --git a/ydb/tests/tools/fqrun/README.md b/ydb/tests/tools/fqrun/README.md index e1c7dd247546..dcaee95ccc7c 100644 --- a/ydb/tests/tools/fqrun/README.md +++ b/ydb/tests/tools/fqrun/README.md @@ -2,7 +2,7 @@ Tool can be used to execute streaming queries by using FQ proxy infrastructure. -For profiling memory allocations build fqrun with ya make flag `-D PROFILE_MEMORY_ALLOCATIONS`. +For profiling memory allocations build fqrun with ya make flags `-D PROFILE_MEMORY_ALLOCATIONS -D CXXFLAGS=-DPROFILE_MEMORY_ALLOCATIONS`. ## Scripts diff --git a/ydb/tests/tools/fqrun/ya.make b/ydb/tests/tools/fqrun/ya.make index 3a62034e2414..a80046acebed 100644 --- a/ydb/tests/tools/fqrun/ya.make +++ b/ydb/tests/tools/fqrun/ya.make @@ -3,7 +3,6 @@ PROGRAM(fqrun) IF (PROFILE_MEMORY_ALLOCATIONS) MESSAGE("Enabled profile memory allocations") ALLOCATOR(LF_DBG) - CFLAGS(-D PROFILE_MEMORY_ALLOCATIONS) ENDIF() SRCS( diff --git a/ydb/tests/tools/kqprun/README.md b/ydb/tests/tools/kqprun/README.md index afb207a2fed9..10e283294135 100644 --- a/ydb/tests/tools/kqprun/README.md +++ b/ydb/tests/tools/kqprun/README.md @@ -2,7 +2,7 @@ Tool can be used to execute queries by using kikimr provider. -For profiling memory allocations build kqprun with ya make flag `-D PROFILE_MEMORY_ALLOCATIONS`. +For profiling memory allocations build kqprun with ya make flag `-D PROFILE_MEMORY_ALLOCATIONS -D CXXFLAGS=-DPROFILE_MEMORY_ALLOCATIONS`. ## Examples diff --git a/ydb/tests/tools/kqprun/ya.make b/ydb/tests/tools/kqprun/ya.make index 81689d3bd5a3..b39159f0ea40 100644 --- a/ydb/tests/tools/kqprun/ya.make +++ b/ydb/tests/tools/kqprun/ya.make @@ -3,7 +3,6 @@ PROGRAM(kqprun) IF (PROFILE_MEMORY_ALLOCATIONS) MESSAGE("Enabled profile memory allocations") ALLOCATOR(LF_DBG) - CFLAGS(-D PROFILE_MEMORY_ALLOCATIONS) ENDIF() SRCS(