diff --git a/ydb/library/yql/providers/pq/gateway/dummy/yql_pq_dummy_gateway.h b/ydb/library/yql/providers/pq/gateway/dummy/yql_pq_dummy_gateway.h index a39fc4f3882e..fef31d7dc09b 100644 --- a/ydb/library/yql/providers/pq/gateway/dummy/yql_pq_dummy_gateway.h +++ b/ydb/library/yql/providers/pq/gateway/dummy/yql_pq_dummy_gateway.h @@ -26,6 +26,7 @@ struct TDummyTopic { TString TopicName; TMaybe Path; size_t PartitionsCount; + bool CancelOnFileFinish = false; }; // Dummy Pq gateway for tests. diff --git a/ydb/library/yql/providers/pq/gateway/dummy/yql_pq_file_topic_client.cpp b/ydb/library/yql/providers/pq/gateway/dummy/yql_pq_file_topic_client.cpp index 4be95da05904..4262b196dfe2 100644 --- a/ydb/library/yql/providers/pq/gateway/dummy/yql_pq_file_topic_client.cpp +++ b/ydb/library/yql/providers/pq/gateway/dummy/yql_pq_file_topic_client.cpp @@ -16,7 +16,7 @@ class TFileTopicReadSession : public NYdb::NTopic::IReadSession { constexpr static auto FILE_POLL_PERIOD = TDuration::MilliSeconds(5); public: - TFileTopicReadSession(TFile file, NYdb::NTopic::TPartitionSession::TPtr session, const TString& producerId = "") + TFileTopicReadSession(TFile file, NYdb::NTopic::TPartitionSession::TPtr session, const TString& producerId = "", bool cancelOnFileFinish = false) : File_(std::move(file)) , Session_(std::move(session)) , ProducerId_(producerId) @@ -24,6 +24,7 @@ constexpr static auto FILE_POLL_PERIOD = TDuration::MilliSeconds(5); PollFileForChanges(); }) , Counters_() + , CancelOnFileFinish_(cancelOnFileFinish) { Pool_.Start(1); } @@ -123,7 +124,7 @@ constexpr static auto FILE_POLL_PERIOD = TDuration::MilliSeconds(5); size_t size = 0; ui64 maxBatchRowSize = 100; - while (size_t read = fi.ReadLine(rawMsg)) { + while (fi.ReadLine(rawMsg)) { msgs.emplace_back(MakeNextMessage(rawMsg)); MsgOffset_++; if (!maxBatchRowSize--) { @@ -133,6 +134,8 @@ constexpr static auto FILE_POLL_PERIOD = TDuration::MilliSeconds(5); } if (!msgs.empty()) { 
EventsQ_.Push(NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent(msgs, {}, Session_), size); + } else if (CancelOnFileFinish_) { + EventsQ_.Push(NYdb::NTopic::TSessionClosedEvent(NYdb::EStatus::CANCELLED, {NYdb::NIssue::TIssue("PQ file topic was finished")}), size); } Sleep(FILE_POLL_PERIOD); @@ -145,6 +148,7 @@ constexpr static auto FILE_POLL_PERIOD = TDuration::MilliSeconds(5); TString ProducerId_; std::thread FilePoller_; NYdb::NTopic::TReaderCounters::TPtr Counters_; + bool CancelOnFileFinish_ = false; TThreadPool Pool_; size_t MsgOffset_ = 0; @@ -336,6 +340,10 @@ struct TDummyPartitionSession: public NYdb::NTopic::TPartitionSession { } }; +TFileTopicClient::TFileTopicClient(THashMap topics) + : Topics_(std::move(topics)) +{} + std::shared_ptr TFileTopicClient::CreateReadSession(const NYdb::NTopic::TReadSessionSettings& settings) { Y_ENSURE(!settings.Topics_.empty()); const auto& topic = settings.Topics_.front(); @@ -358,7 +366,8 @@ std::shared_ptr TFileTopicClient::CreateReadSession( ui64 sessionId = 0; return std::make_shared( TFile(*filePath, EOpenMode::TEnum::RdOnly), - MakeIntrusive(sessionId, TString{topicPath}, partitionId) + MakeIntrusive(sessionId, TString{topicPath}, partitionId), + "", topicsIt->second.CancelOnFileFinish ); } diff --git a/ydb/library/yql/providers/pq/gateway/dummy/yql_pq_file_topic_client.h b/ydb/library/yql/providers/pq/gateway/dummy/yql_pq_file_topic_client.h index f80426726159..7b4ccae0d85c 100644 --- a/ydb/library/yql/providers/pq/gateway/dummy/yql_pq_file_topic_client.h +++ b/ydb/library/yql/providers/pq/gateway/dummy/yql_pq_file_topic_client.h @@ -4,7 +4,7 @@ namespace NYql { struct TFileTopicClient : public ITopicClient { - TFileTopicClient(THashMap topics): Topics_(topics) {} + explicit TFileTopicClient(THashMap topics); NYdb::TAsyncStatus CreateTopic(const TString& path, const NYdb::NTopic::TCreateTopicSettings& settings = {}) override; @@ -32,5 +32,7 @@ struct TFileTopicClient : public ITopicClient { private: THashMap 
Topics_; + bool CancelOnFileFinish_ = false; }; -} \ No newline at end of file + +} diff --git a/ydb/tests/tools/fqrun/.gitignore b/ydb/tests/tools/fqrun/.gitignore index 99dcb6bb049f..ee3ed5154ff0 100644 --- a/ydb/tests/tools/fqrun/.gitignore +++ b/ydb/tests/tools/fqrun/.gitignore @@ -5,3 +5,5 @@ sync_dir *.conf *.parquet *.json +*.svg +*.txt diff --git a/ydb/tests/tools/fqrun/README.md b/ydb/tests/tools/fqrun/README.md index cd133168dcd4..dcaee95ccc7c 100644 --- a/ydb/tests/tools/fqrun/README.md +++ b/ydb/tests/tools/fqrun/README.md @@ -2,6 +2,15 @@ Tool can be used to execute streaming queries by using FQ proxy infrastructure. +For profiling memory allocations build fqrun with ya make flags `-D PROFILE_MEMORY_ALLOCATIONS -D CXXFLAGS=-DPROFILE_MEMORY_ALLOCATIONS`. + +## Scripts + +* `flame_graph.sh` - script for collecting flame graphs in svg format, usage: + ```(bash) + ./flame_graph.sh [graph collection time in seconds] + ``` + ## Examples ### Queries @@ -25,7 +34,7 @@ Tool can be used to execute streaming queries by using FQ proxy infrastructure. 
./fqrun -M 32000 ``` - Monitoring endpoint: https://localhost:32000 + Monitoring endpoint: http://localhost:32000 * gRPC endpoint: ```(bash) diff --git a/ydb/tests/tools/fqrun/configuration/as_config.conf b/ydb/tests/tools/fqrun/configuration/as_config.conf new file mode 100644 index 000000000000..c1f7d09e7f6a --- /dev/null +++ b/ydb/tests/tools/fqrun/configuration/as_config.conf @@ -0,0 +1,46 @@ +Executor { + Type: BASIC + Threads: 1 + SpinThreshold: 10 + Name: "System" +} +Executor { + Type: BASIC + Threads: 6 + SpinThreshold: 1 + Name: "User" +} +Executor { + Type: BASIC + Threads: 1 + SpinThreshold: 1 + Name: "Batch" +} +Executor { + Type: IO + Threads: 1 + Name: "IO" +} +Executor { + Type: BASIC + Threads: 2 + SpinThreshold: 10 + Name: "IC" + TimePerMailboxMicroSecs: 100 +} + +Scheduler { + Resolution: 64 + SpinThreshold: 0 + ProgressThreshold: 10000 +} + +SysExecutor: 0 +UserExecutor: 1 +IoExecutor: 3 +BatchExecutor: 2 + +ServiceExecutor { + ServiceName: "Interconnect" + ExecutorId: 4 +} diff --git a/ydb/tests/tools/fqrun/flame_graph.sh b/ydb/tests/tools/fqrun/flame_graph.sh new file mode 100755 index 000000000000..b6f8fe6da4f0 --- /dev/null +++ b/ydb/tests/tools/fqrun/flame_graph.sh @@ -0,0 +1,25 @@ +#!/usr/bin/env bash + +set -eux + +function cleanup { + sudo rm -f ./profdata + rm -f ./profdata.txt +} +trap cleanup EXIT + +if [ $# -gt 1 ]; then + echo "Too many arguments" + exit 1 +fi + +fqrun_pid=$(pgrep -u "$USER" fqrun) + +sudo perf record -F 50 --call-graph dwarf -g --proc-map-timeout=10000 --pid "$fqrun_pid" -v -o profdata -- sleep "${1:-30}" +sudo perf script -i profdata > profdata.txt + +SCRIPT_DIR=$(cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd) + +flame_graph_tool="$SCRIPT_DIR/../../../../contrib/tools/flame-graph" + +"${flame_graph_tool}/stackcollapse-perf.pl" profdata.txt | "${flame_graph_tool}/flamegraph.pl" > profdata.svg diff --git a/ydb/tests/tools/fqrun/fqrun.cpp b/ydb/tests/tools/fqrun/fqrun.cpp index c5c0133fc4de..4f1b096bde10 100644 
--- a/ydb/tests/tools/fqrun/fqrun.cpp +++ b/ydb/tests/tools/fqrun/fqrun.cpp @@ -3,11 +3,16 @@ #include +#include #include #include #include #include +#ifdef PROFILE_MEMORY_ALLOCATIONS +#include +#endif + using namespace NKikimrRun; namespace NFqRun { @@ -156,6 +161,21 @@ class TMain : public TMainBase { } }); + options.AddLongOption("as-cfg", "File with actor system config (TActorSystemConfig), use '-' for default") + .RequiredArgument("file") + .DefaultValue("./configuration/as_config.conf") + .Handler1([this](const NLastGetopt::TOptsParser* option) { + const TString file(option->CurValOrDef()); + if (file == "-") { + return; + } + + RunnerOptions.FqSettings.ActorSystemConfig = NKikimrConfig::TActorSystemConfig(); + if (!google::protobuf::TextFormat::ParseFromString(LoadFile(file), &(*RunnerOptions.FqSettings.ActorSystemConfig))) { + ythrow yexception() << "Bad format of actor system configuration"; + } + }); + options.AddLongOption("emulate-s3", "Enable readings by s3 provider from files, `bucket` value in connection - path to folder with files") .NoArgument() .SetFlag(&RunnerOptions.FqSettings.EmulateS3); @@ -165,17 +185,27 @@ class TMain : public TMainBase { .Handler1([this](const NLastGetopt::TOptsParser* option) { TStringBuf topicName, others; TStringBuf(option->CurVal()).Split('@', topicName, others); + TStringBuf path, partitionCountStr; TStringBuf(others).Split(':', path, partitionCountStr); size_t partitionCount = !partitionCountStr.empty() ? 
FromString(partitionCountStr) : 1; + if (!partitionCount) { + ythrow yexception() << "Topic partition count should be at least one"; + } if (topicName.empty() || path.empty()) { - ythrow yexception() << "Incorrect PQ file mapping, expected form topic@path[:partitions_count]" << Endl; + ythrow yexception() << "Incorrect PQ file mapping, expected form topic@path[:partitions_count]"; } if (!PqFilesMapping.emplace(topicName, NYql::TDummyTopic("pq", TString(topicName), TString(path), partitionCount)).second) { ythrow yexception() << "Got duplicated topic name: " << topicName; } }); + options.AddLongOption("cancel-on-file-finish", "Cancel emulate YDS topics when topic file finished") + .RequiredArgument("topic") + .Handler1([this](const NLastGetopt::TOptsParser* option) { + TopicsSettings[option->CurVal()].CancelOnFileFinish = true; + }); + // Outputs options.AddLongOption("result-file", "File with query results (use '-' to write in stdout)") @@ -210,6 +240,7 @@ class TMain : public TMainBase { ExecutionOptions.Validate(RunnerOptions); RunnerOptions.FqSettings.YqlToken = GetEnv(YQL_TOKEN_VARIABLE); + RunnerOptions.FqSettings.FunctionRegistry = CreateFunctionRegistry().Get(); auto& gatewayConfig = *RunnerOptions.FqSettings.FqConfig.mutable_gateways(); FillTokens(gatewayConfig.mutable_pq()); @@ -224,14 +255,39 @@ class TMain : public TMainBase { if (!PqFilesMapping.empty()) { auto fileGateway = MakeIntrusive(); - for (const auto& [_, topic] : PqFilesMapping) { + for (auto [_, topic] : PqFilesMapping) { + if (const auto it = TopicsSettings.find(topic.TopicName); it != TopicsSettings.end()) { + topic.CancelOnFileFinish = it->second.CancelOnFileFinish; + TopicsSettings.erase(it); + } fileGateway->AddDummyTopic(topic); } RunnerOptions.FqSettings.PqGateway = std::move(fileGateway); } + if (!TopicsSettings.empty()) { + ythrow yexception() << "Found topic settings for not existing topic: '" << TopicsSettings.begin()->first << "'"; + } + +#ifdef PROFILE_MEMORY_ALLOCATIONS + if 
(RunnerOptions.FqSettings.VerboseLevel >= EVerbose::Info) { + Cout << CoutColors.Cyan() << "Starting profile memory allocations" << CoutColors.Default() << Endl; + } + NAllocProfiler::StartAllocationSampling(true); +#else + if (ProfileAllocationsOutput) { + ythrow yexception() << "Profile memory allocations disabled, please rebuild fqrun with flag `-D PROFILE_MEMORY_ALLOCATIONS`"; + } +#endif RunScript(ExecutionOptions, RunnerOptions); +#ifdef PROFILE_MEMORY_ALLOCATIONS + if (RunnerOptions.FqSettings.VerboseLevel >= EVerbose::Info) { + Cout << CoutColors.Cyan() << "Finishing profile memory allocations" << CoutColors.Default() << Endl; + } + FinishProfileMemoryAllocations(); +#endif + return 0; } @@ -249,6 +305,11 @@ class TMain : public TMainBase { TExecutionOptions ExecutionOptions; TRunnerOptions RunnerOptions; std::unordered_map PqFilesMapping; + + struct TTopicSettings { + bool CancelOnFileFinish = false; + }; + std::unordered_map TopicsSettings; }; } // anonymous namespace diff --git a/ydb/tests/tools/fqrun/src/common.h b/ydb/tests/tools/fqrun/src/common.h index 53dfbe654e93..457897781891 100644 --- a/ydb/tests/tools/fqrun/src/common.h +++ b/ydb/tests/tools/fqrun/src/common.h @@ -7,6 +7,8 @@ #include #include +#include + namespace NFqRun { constexpr char YQL_TOKEN_VARIABLE[] = "YQL_TOKEN"; @@ -27,8 +29,10 @@ struct TFqSetupSettings : public NKikimrRun::TServerSettings { TString YqlToken; NYql::IPqGateway::TPtr PqGateway; + TIntrusivePtr FunctionRegistry; NFq::NConfig::TConfig FqConfig; NKikimrConfig::TLogConfig LogConfig; + std::optional ActorSystemConfig; }; struct TRunnerOptions { diff --git a/ydb/tests/tools/fqrun/src/fq_setup.cpp b/ydb/tests/tools/fqrun/src/fq_setup.cpp index 394bfc5c3e16..a4fbfd104777 100644 --- a/ydb/tests/tools/fqrun/src/fq_setup.cpp +++ b/ydb/tests/tools/fqrun/src/fq_setup.cpp @@ -44,6 +44,14 @@ class TFqSetup::TImpl { serverSettings.SetLoggerInitializer(loggerInitializer); } + void SetFunctionRegistry(NKikimr::Tests::TServerSettings& 
serverSettings) const { + if (Settings.FunctionRegistry) { + serverSettings.SetFrFactory([this](const NKikimr::NScheme::TTypeRegistry&) { + return Settings.FunctionRegistry.Get(); + }); + } + } + NKikimr::Tests::TServerSettings GetServerSettings(ui32 grpcPort) { NKikimr::Tests::TServerSettings serverSettings(PortManager.GetPort()); @@ -52,9 +60,13 @@ class TFqSetup::TImpl { NKikimrConfig::TAppConfig config; *config.MutableLogConfig() = Settings.LogConfig; + if (Settings.ActorSystemConfig) { + *config.MutableActorSystemConfig() = *Settings.ActorSystemConfig; + } serverSettings.SetAppConfig(config); SetLoggerSettings(serverSettings); + SetFunctionRegistry(serverSettings); if (Settings.MonitoringEnabled) { serverSettings.InitKikimrRunConfig(); diff --git a/ydb/tests/tools/fqrun/src/ya.make b/ydb/tests/tools/fqrun/src/ya.make index 6f44cf16c38d..a5a54edf3a87 100644 --- a/ydb/tests/tools/fqrun/src/ya.make +++ b/ydb/tests/tools/fqrun/src/ya.make @@ -9,7 +9,6 @@ SRCS( PEERDIR( library/cpp/colorizer library/cpp/testing/unittest - util ydb/core/fq/libs/config/protos ydb/core/fq/libs/control_plane_proxy/events ydb/core/fq/libs/init @@ -20,6 +19,7 @@ PEERDIR( ydb/library/security ydb/library/yql/providers/pq/provider ydb/tests/tools/kqprun/runlib + yql/essentials/minikql ) YQL_LAST_ABI_VERSION() diff --git a/ydb/tests/tools/fqrun/ya.make b/ydb/tests/tools/fqrun/ya.make index 6553c9748b08..a80046acebed 100644 --- a/ydb/tests/tools/fqrun/ya.make +++ b/ydb/tests/tools/fqrun/ya.make @@ -1,5 +1,10 @@ PROGRAM(fqrun) +IF (PROFILE_MEMORY_ALLOCATIONS) + MESSAGE("Enabled profile memory allocations") + ALLOCATOR(LF_DBG) +ENDIF() + SRCS( fqrun.cpp ) @@ -7,7 +12,8 @@ SRCS( PEERDIR( library/cpp/colorizer library/cpp/getopt - util + library/cpp/lfalloc/alloc_profiler + ydb/core/blob_depot ydb/library/yql/providers/pq/gateway/dummy ydb/tests/tools/fqrun/src ydb/tests/tools/kqprun/runlib diff --git a/ydb/tests/tools/kqprun/README.md b/ydb/tests/tools/kqprun/README.md index 
e5d82a018c56..10e283294135 100644 --- a/ydb/tests/tools/kqprun/README.md +++ b/ydb/tests/tools/kqprun/README.md @@ -2,7 +2,7 @@ Tool can be used to execute queries by using kikimr provider. -For profiling memory allocations build kqprun with ya make flag `-D PROFILE_MEMORY_ALLOCATIONS`. +For profiling memory allocations build kqprun with ya make flag `-D PROFILE_MEMORY_ALLOCATIONS -D CXXFLAGS=-DPROFILE_MEMORY_ALLOCATIONS`. ## Examples @@ -42,7 +42,7 @@ For profiling memory allocations build kqprun with ya make flag `-D PROFILE_MEMO ./kqprun -M 32000 ``` - Monitoring endpoint: https://localhost:32000 + Monitoring endpoint: http://localhost:32000 * gRPC endpoint: ```(bash) diff --git a/ydb/tests/tools/kqprun/kqprun.cpp b/ydb/tests/tools/kqprun/kqprun.cpp index e860615e9fd7..3ec48c7d5b6e 100644 --- a/ydb/tests/tools/kqprun/kqprun.cpp +++ b/ydb/tests/tools/kqprun/kqprun.cpp @@ -14,9 +14,6 @@ #include #include -#include -#include - #include #include #include @@ -402,62 +399,18 @@ void RunScript(const TExecutionOptions& executionOptions, const TRunnerOptions& } -TIntrusivePtr CreateFunctionRegistry(const TString& udfsDirectory, TVector udfsPaths, bool excludeLinkedUdfs) { - if (!udfsDirectory.empty() || !udfsPaths.empty()) { - NColorizer::TColors colors = NColorizer::AutoColors(Cout); - Cout << colors.Yellow() << TInstant::Now().ToIsoStringLocal() << " Fetching udfs..." 
<< colors.Default() << Endl; - } - - NKikimr::NMiniKQL::FindUdfsInDir(udfsDirectory, &udfsPaths); - auto functionRegistry = NKikimr::NMiniKQL::CreateFunctionRegistry(&PrintBackTrace, NKikimr::NMiniKQL::CreateBuiltinRegistry(), false, udfsPaths)->Clone(); - - if (excludeLinkedUdfs) { - for (const auto& wrapper : NYql::NUdf::GetStaticUdfModuleWrapperList()) { - auto [name, ptr] = wrapper(); - if (!functionRegistry->IsLoadedUdfModule(name)) { - functionRegistry->AddModule(TString(NKikimr::NMiniKQL::StaticModulePrefix) + name, name, std::move(ptr)); - } - } - } else { - NKikimr::NMiniKQL::FillStaticModules(*functionRegistry); - } - - return functionRegistry; -} - - class TMain : public TMainBase { using EVerbose = TYdbSetupSettings::EVerbose; inline static const TString YqlToken = GetEnv(YQL_TOKEN_VARIABLE); - inline static IOutputStream* ProfileAllocationsOutput = nullptr; - inline static NColorizer::TColors CoutColors = NColorizer::AutoColors(Cout); TExecutionOptions ExecutionOptions; TRunnerOptions RunnerOptions; std::unordered_map Templates; THashMap TablesMapping; - TVector UdfsPaths; - TString UdfsDirectory; - bool ExcludeLinkedUdfs = false; bool EmulateYt = false; -#ifdef PROFILE_MEMORY_ALLOCATIONS -public: - static void FinishProfileMemoryAllocations() { - if (ProfileAllocationsOutput) { - NAllocProfiler::StopAllocationSampling(*ProfileAllocationsOutput); - } else { - TString output; - TStringOutput stream(output); - NAllocProfiler::StopAllocationSampling(stream); - - Cout << CoutColors.Red() << "Warning: profile memory allocations output is not specified, please use flag `--profile-output` for writing profile info (dump size " << NKikimr::NBlobDepot::FormatByteSize(output.size()) << ")" << CoutColors.Default() << Endl; - } - } -#endif - protected: void RegisterOptions(NLastGetopt::TOpts& options) override { options.SetTitle("KqpRun -- tool to execute queries by using kikimr provider (instead of dq provider in DQrun tool)"); @@ -538,18 +491,6 @@ class TMain : 
public TMainBase { } }); - options.AddLongOption('u', "udf", "Load shared library with UDF by given path") - .RequiredArgument("file") - .EmplaceTo(&UdfsPaths); - - options.AddLongOption("udfs-dir", "Load all shared libraries with UDFs found in given directory") - .RequiredArgument("directory") - .StoreResult(&UdfsDirectory); - - options.AddLongOption("exclude-linked-udfs", "Exclude linked udfs when same udf passed from -u or --udfs-dir") - .NoArgument() - .SetFlag(&ExcludeLinkedUdfs); - // Outputs TChoices traceOpt({ @@ -644,10 +585,6 @@ class TMain : public TMainBase { RunnerOptions.ScriptQueryTimelineFiles.emplace_back(file); }); - options.AddLongOption("profile-output", "File with profile memory allocations output (use '-' to write in stdout)") - .RequiredArgument("file") - .StoreMappedResultT(&ProfileAllocationsOutput, &GetDefaultOutput); - // Pipeline settings TChoices executionCase({ @@ -850,7 +787,7 @@ class TMain : public TMainBase { } RunnerOptions.YdbSettings.YqlToken = YqlToken; - RunnerOptions.YdbSettings.FunctionRegistry = CreateFunctionRegistry(UdfsDirectory, UdfsPaths, ExcludeLinkedUdfs).Get(); + RunnerOptions.YdbSettings.FunctionRegistry = CreateFunctionRegistry().Get(); auto& appConfig = RunnerOptions.YdbSettings.AppConfig; if (ExecutionOptions.ResultsRowsLimit) { @@ -870,7 +807,7 @@ class TMain : public TMainBase { } #ifdef PROFILE_MEMORY_ALLOCATIONS - if (RunnerOptions.YdbSettings.VerboseLevel >= 1) { + if (RunnerOptions.YdbSettings.VerboseLevel >= EVerbose::Info) { Cout << CoutColors.Cyan() << "Starting profile memory allocations" << CoutColors.Default() << Endl; } NAllocProfiler::StartAllocationSampling(true); @@ -883,7 +820,7 @@ class TMain : public TMainBase { RunScript(ExecutionOptions, RunnerOptions); #ifdef PROFILE_MEMORY_ALLOCATIONS - if (RunnerOptions.YdbSettings.VerboseLevel >= 1) { + if (RunnerOptions.YdbSettings.VerboseLevel >= EVerbose::Info) { Cout << CoutColors.Cyan() << "Finishing profile memory allocations" << 
CoutColors.Default() << Endl; } FinishProfileMemoryAllocations(); @@ -908,17 +845,6 @@ class TMain : public TMainBase { } }; -#ifdef PROFILE_MEMORY_ALLOCATIONS -void InterruptHandler(int) { - NColorizer::TColors colors = NColorizer::AutoColors(Cerr); - - Cout << colors.Red() << "Execution interrupted, finishing profile memory allocations..." << colors.Default() << Endl; - TMain::FinishProfileMemoryAllocations(); - - abort(); -} -#endif - } // anonymous namespace } // namespace NKqpRun @@ -926,10 +852,6 @@ void InterruptHandler(int) { int main(int argc, const char* argv[]) { SetupSignalActions(); -#ifdef PROFILE_MEMORY_ALLOCATIONS - signal(SIGINT, &NKqpRun::InterruptHandler); -#endif - try { NKqpRun::TMain().Run(argc, argv); } catch (...) { diff --git a/ydb/tests/tools/kqprun/runlib/application.cpp b/ydb/tests/tools/kqprun/runlib/application.cpp index 9a35d584f19e..6b27760b5b51 100644 --- a/ydb/tests/tools/kqprun/runlib/application.cpp +++ b/ydb/tests/tools/kqprun/runlib/application.cpp @@ -1,15 +1,50 @@ #include "application.h" #include "utils.h" +#include #include #include #include +#include + +#include +#include + +#ifdef PROFILE_MEMORY_ALLOCATIONS +#include +#endif namespace NKikimrRun { +#ifdef PROFILE_MEMORY_ALLOCATIONS +void TMainBase::FinishProfileMemoryAllocations() { + if (ProfileAllocationsOutput) { + NAllocProfiler::StopAllocationSampling(*ProfileAllocationsOutput); + } else { + TString output; + TStringOutput stream(output); + NAllocProfiler::StopAllocationSampling(stream); + + Cout << CoutColors.Red() << "Warning: profile memory allocations output is not specified, please use flag `--profile-output` for writing profile info (dump size " << NKikimr::NBlobDepot::FormatByteSize(output.size()) << ")" << CoutColors.Default() << Endl; + } +} +#endif + void TMainBase::RegisterKikimrOptions(NLastGetopt::TOpts& options, TServerSettings& settings) { + options.AddLongOption('u', "udf", "Load shared library with UDF by given path") + .RequiredArgument("file") + 
.EmplaceTo(&UdfsPaths); + + options.AddLongOption("udfs-dir", "Load all shared libraries with UDFs found in given directory") + .RequiredArgument("directory") + .StoreResult(&UdfsDirectory); + + options.AddLongOption("exclude-linked-udfs", "Exclude linked udfs when same udf passed from -u or --udfs-dir") + .NoArgument() + .SetFlag(&ExcludeLinkedUdfs); + options.AddLongOption("log-file", "File with execution logs (writes in stderr if empty)") .RequiredArgument("file") .StoreResult(&settings.LogOutputFile) @@ -19,41 +54,11 @@ void TMainBase::RegisterKikimrOptions(NLastGetopt::TOpts& options, TServerSettin } }); - TChoices logPriority({ - {"emerg", NActors::NLog::EPriority::PRI_EMERG}, - {"alert", NActors::NLog::EPriority::PRI_ALERT}, - {"crit", NActors::NLog::EPriority::PRI_CRIT}, - {"error", NActors::NLog::EPriority::PRI_ERROR}, - {"warn", NActors::NLog::EPriority::PRI_WARN}, - {"notice", NActors::NLog::EPriority::PRI_NOTICE}, - {"info", NActors::NLog::EPriority::PRI_INFO}, - {"debug", NActors::NLog::EPriority::PRI_DEBUG}, - {"trace", NActors::NLog::EPriority::PRI_TRACE}, - }); - options.AddLongOption("log-default", "Default log priority") - .RequiredArgument("priority") - .Choices(logPriority.GetChoices()) - .StoreMappedResultT(&DefaultLogPriority, logPriority); + RegisterLogOptions(options); - options.AddLongOption("log", "Component log priority in format = (e. g. KQP_YQL=trace)") - .RequiredArgument("component priority") - .Handler1([this, logPriority](const NLastGetopt::TOptsParser* option) { - TStringBuf component; - TStringBuf priority; - TStringBuf(option->CurVal()).Split('=', component, priority); - if (component.empty() || priority.empty()) { - ythrow yexception() << "Incorrect log setting, expected form component=priority, e. g. 
KQP_YQL=trace"; - } - - if (!logPriority.Contains(TString(priority))) { - ythrow yexception() << "Incorrect log priority: " << priority; - } - - const auto service = GetLogService(TString(component)); - if (!LogPriorities.emplace(service, logPriority(TString(priority))).second) { - ythrow yexception() << "Got duplicated log service name: " << component; - } - }); + options.AddLongOption("profile-output", "File with profile memory allocations output (use '-' to write in stdout)") + .RequiredArgument("file") + .StoreMappedResultT(&ProfileAllocationsOutput, &GetDefaultOutput); options.AddLongOption('M', "monitoring", "Embedded UI port (use 0 to start on random free port), if used will be run as daemon") .RequiredArgument("uint") @@ -92,6 +97,28 @@ void TMainBase::RegisterKikimrOptions(NLastGetopt::TOpts& options, TServerSettin }); } +void TMainBase::RegisterLogOptions(NLastGetopt::TOpts& options) { + options.AddLongOption("log-default", "Default log priority") + .RequiredArgument("priority") + .StoreMappedResultT(&DefaultLogPriority, GetLogPrioritiesMap("log-default")); + + options.AddLongOption("log", "Component log priority in format = (e. g. KQP_YQL=trace)") + .RequiredArgument("component priority") + .Handler1([this, logPriority = GetLogPrioritiesMap("log")](const NLastGetopt::TOptsParser* option) { + TStringBuf component; + TStringBuf priority; + TStringBuf(option->CurVal()).Split('=', component, priority); + if (component.empty() || priority.empty()) { + ythrow yexception() << "Incorrect log setting, expected form component=priority, e. g. 
KQP_YQL=trace"; + } + + const auto service = GetLogService(TString(component)); + if (!LogPriorities.emplace(service, logPriority(TString(priority))).second) { + ythrow yexception() << "Got duplicated log service name: " << component; + } + }); +} + void TMainBase::FillLogConfig(NKikimrConfig::TLogConfig& config) const { if (DefaultLogPriority) { config.SetDefaultLevel(*DefaultLogPriority); @@ -110,4 +137,28 @@ IOutputStream* TMainBase::GetDefaultOutput(const TString& file) { return nullptr; } +TIntrusivePtr TMainBase::CreateFunctionRegistry() const { + if (!UdfsDirectory.empty() || !UdfsPaths.empty()) { + NColorizer::TColors colors = NColorizer::AutoColors(Cout); + Cout << colors.Yellow() << TInstant::Now().ToIsoStringLocal() << " Fetching udfs..." << colors.Default() << Endl; + } + + auto paths = UdfsPaths; + NKikimr::NMiniKQL::FindUdfsInDir(UdfsDirectory, &paths); + auto functionRegistry = NKikimr::NMiniKQL::CreateFunctionRegistry(&PrintBackTrace, NKikimr::NMiniKQL::CreateBuiltinRegistry(), false, paths)->Clone(); + + if (ExcludeLinkedUdfs) { + for (const auto& wrapper : NYql::NUdf::GetStaticUdfModuleWrapperList()) { + auto [name, ptr] = wrapper(); + if (!functionRegistry->IsLoadedUdfModule(name)) { + functionRegistry->AddModule(TString(NKikimr::NMiniKQL::StaticModulePrefix) + name, name, std::move(ptr)); + } + } + } else { + NKikimr::NMiniKQL::FillStaticModules(*functionRegistry); + } + + return functionRegistry; +} + } // namespace NKikimrRun diff --git a/ydb/tests/tools/kqprun/runlib/application.h b/ydb/tests/tools/kqprun/runlib/application.h index 88eb2e289011..fd67eb2ac0d9 100644 --- a/ydb/tests/tools/kqprun/runlib/application.h +++ b/ydb/tests/tools/kqprun/runlib/application.h @@ -2,6 +2,7 @@ #include "settings.h" +#include #include #include @@ -10,21 +11,40 @@ #include #include +#include + namespace NKikimrRun { class TMainBase : public TMainClassArgs { +#ifdef PROFILE_MEMORY_ALLOCATIONS +public: + static void FinishProfileMemoryAllocations(); +#endif + 
protected: void RegisterKikimrOptions(NLastGetopt::TOpts& options, TServerSettings& settings); + virtual void RegisterLogOptions(NLastGetopt::TOpts& options); + void FillLogConfig(NKikimrConfig::TLogConfig& config) const; static IOutputStream* GetDefaultOutput(const TString& file); + TIntrusivePtr CreateFunctionRegistry() const; + +protected: + inline static NColorizer::TColors CoutColors = NColorizer::AutoColors(Cout); + inline static IOutputStream* ProfileAllocationsOutput = nullptr; + private: inline static std::vector> FileHolders; std::optional DefaultLogPriority; std::unordered_map LogPriorities; + + TString UdfsDirectory; + TVector UdfsPaths; + bool ExcludeLinkedUdfs = false; /* bound to --exclude-linked-udfs; read by CreateFunctionRegistry() */ }; } // namespace NKikimrRun diff --git a/ydb/tests/tools/kqprun/runlib/utils.cpp b/ydb/tests/tools/kqprun/runlib/utils.cpp index 208cf4daf045..f8c37d7b67fe 100644 --- a/ydb/tests/tools/kqprun/runlib/utils.cpp +++ b/ydb/tests/tools/kqprun/runlib/utils.cpp @@ -1,4 +1,5 @@ #include "utils.h" +#include "application.h" #include #include @@ -49,6 +50,17 @@ void FloatingPointExceptionHandler(int) { abort(); } +#ifdef PROFILE_MEMORY_ALLOCATIONS +void InterruptHandler(int) { + NColorizer::TColors colors = NColorizer::AutoColors(Cout); + + Cout << colors.Red() << "Execution interrupted, finishing profile memory allocations..." 
<< colors.Default() << Endl; + TMainBase::FinishProfileMemoryAllocations(); + + abort(); +} +#endif + } // nonymous namespace @@ -233,10 +245,28 @@ void InitLogSettings(const NKikimrConfig::TLogConfig& logConfig, NActors::TTestA } } +TChoices GetLogPrioritiesMap(const TString& optionName) { + return TChoices({ + {"emerg", NActors::NLog::EPriority::PRI_EMERG}, + {"alert", NActors::NLog::EPriority::PRI_ALERT}, + {"crit", NActors::NLog::EPriority::PRI_CRIT}, + {"error", NActors::NLog::EPriority::PRI_ERROR}, + {"warn", NActors::NLog::EPriority::PRI_WARN}, + {"notice", NActors::NLog::EPriority::PRI_NOTICE}, + {"info", NActors::NLog::EPriority::PRI_INFO}, + {"debug", NActors::NLog::EPriority::PRI_DEBUG}, + {"trace", NActors::NLog::EPriority::PRI_TRACE}, + }, optionName, false); +} + void SetupSignalActions() { std::set_terminate(&TerminateHandler); signal(SIGSEGV, &SegmentationFaultHandler); signal(SIGFPE, &FloatingPointExceptionHandler); + +#ifdef PROFILE_MEMORY_ALLOCATIONS + signal(SIGINT, &InterruptHandler); +#endif } void PrintResultSet(EResultOutputFormat format, IOutputStream& output, const Ydb::ResultSet& resultSet) { diff --git a/ydb/tests/tools/kqprun/runlib/utils.h b/ydb/tests/tools/kqprun/runlib/utils.h index 9e4d69c14501..4bf873e48cae 100644 --- a/ydb/tests/tools/kqprun/runlib/utils.h +++ b/ydb/tests/tools/kqprun/runlib/utils.h @@ -34,12 +34,26 @@ struct TRequestResult { template class TChoices { public: - explicit TChoices(std::map choicesMap) + explicit TChoices(std::map choicesMap, const TString& optionName = "", bool checkRegister = true) : ChoicesMap(std::move(choicesMap)) + , OptionName(optionName) + , CheckRegister(checkRegister) {} - TResult operator()(const TString& choice) const { - return ChoicesMap.at(choice); + TResult operator()(TString choice) const { + if (!CheckRegister) { + std::for_each(choice.begin(), choice.vend(), [](char& c) { c = std::tolower(c); }); + } + + const auto it = ChoicesMap.find(choice); + if (it == ChoicesMap.end()) { + 
auto error = yexception() << "Value '" << choice << "' is not allowed " << (OptionName ? TStringBuilder() << "for option " << OptionName : TStringBuilder()) << ", available variants:"; + for (const auto& [value, _] : ChoicesMap) { + error << " " << value; + } + throw error; + } + return it->second; } TVector GetChoices() const { @@ -57,6 +71,8 @@ class TChoices { private: const std::map ChoicesMap; + const TString OptionName; + const bool CheckRegister; }; class TStatsPrinter { @@ -88,6 +104,8 @@ void ModifyLogPriorities(std::unordered_map GetLogPrioritiesMap(const TString& optionName); + void SetupSignalActions(); void PrintResultSet(EResultOutputFormat format, IOutputStream& output, const Ydb::ResultSet& resultSet); diff --git a/ydb/tests/tools/kqprun/runlib/ya.make b/ydb/tests/tools/kqprun/runlib/ya.make index 352a3a423105..39bd1328bfe7 100644 --- a/ydb/tests/tools/kqprun/runlib/ya.make +++ b/ydb/tests/tools/kqprun/runlib/ya.make @@ -9,7 +9,6 @@ PEERDIR( library/cpp/colorizer library/cpp/getopt library/cpp/json - util ydb/core/base ydb/core/blob_depot ydb/core/fq/libs/compute/common @@ -20,7 +19,10 @@ PEERDIR( ydb/public/api/protos ydb/public/lib/json_value ydb/public/lib/ydb_cli/common + yql/essentials/minikql + yql/essentials/minikql/invoke_builtins yql/essentials/public/issue + yql/essentials/public/udf ) YQL_LAST_ABI_VERSION() diff --git a/ydb/tests/tools/kqprun/ya.make b/ydb/tests/tools/kqprun/ya.make index 81689d3bd5a3..b39159f0ea40 100644 --- a/ydb/tests/tools/kqprun/ya.make +++ b/ydb/tests/tools/kqprun/ya.make @@ -3,7 +3,6 @@ PROGRAM(kqprun) IF (PROFILE_MEMORY_ALLOCATIONS) MESSAGE("Enabled profile memory allocations") ALLOCATOR(LF_DBG) - CFLAGS(-D PROFILE_MEMORY_ALLOCATIONS) ENDIF() SRCS(