From 688d711c2feee7551d2810210aab8543eb267b10 Mon Sep 17 00:00:00 2001 From: Grigoriy Pisarenko Date: Sun, 10 Nov 2024 15:23:58 +0000 Subject: [PATCH 1/8] Enabled LLVM in purecalc filters --- ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp b/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp index 3749fae41819..f2944ffe1600 100644 --- a/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp +++ b/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp @@ -362,7 +362,10 @@ TRowDispatcher::TRowDispatcher( const NYql::IPqGateway::TPtr& pqGateway) : Config(config) , CredentialsProviderFactory(credentialsProviderFactory) - , PureCalcProgramFactory(NYql::NPureCalc::MakeProgramFactory(NYql::NPureCalc::TProgramFactoryOptions())) + , PureCalcProgramFactory(NYql::NPureCalc::MakeProgramFactory( + NYql::NPureCalc::TProgramFactoryOptions() + .SetLLVMSettings("ON") + )) , YqSharedResources(yqSharedResources) , CredentialsFactory(credentialsFactory) , LogPrefix("RowDispatcher: ") From bfd302776c29e6ccc4f2e2410348af0e3b6f4089 Mon Sep 17 00:00:00 2001 From: Grigoriy Pisarenko Date: Sun, 10 Nov 2024 15:44:46 +0000 Subject: [PATCH 2/8] Added explicit pragma into filter sql --- ydb/core/fq/libs/row_dispatcher/json_filter.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/ydb/core/fq/libs/row_dispatcher/json_filter.cpp b/ydb/core/fq/libs/row_dispatcher/json_filter.cpp index 66ea7b2f736e..c75428ef9796 100644 --- a/ydb/core/fq/libs/row_dispatcher/json_filter.cpp +++ b/ydb/core/fq/libs/row_dispatcher/json_filter.cpp @@ -293,6 +293,7 @@ class TJsonFilter::TImpl { private: TString GenerateSql(const TString& whereFilter) { TStringStream str; + str << "PRAGMA config.flags(\"LLVM\", \"ON\");"; str << "$filtered = SELECT * FROM Input " << whereFilter << ";\n"; str << "SELECT " << OffsetFieldName << ", Unwrap(Json::SerializeJson(Yson::From(RemoveMembers(TableRow(), [\"" << OffsetFieldName; From eb10f2da2b8682f9160147967179effdde056c5a Mon Sep 17 00:00:00 2001 From: Grigoriy Pisarenko Date: Tue, 12 Nov 2024 08:07:34 +0000 Subject: [PATCH 3/8] Passed enabled LLVM setting from query to RD --- .../fq/libs/row_dispatcher/actors_factory.cpp | 4 +- .../fq/libs/row_dispatcher/actors_factory.h | 5 ++- ydb/core/fq/libs/row_dispatcher/common.cpp | 37 +++++++++++++++++++ ydb/core/fq/libs/row_dispatcher/common.h | 25 +++++++++++++ .../fq/libs/row_dispatcher/row_dispatcher.cpp | 8 ++-- .../fq/libs/row_dispatcher/topic_session.cpp | 17 +++++---- .../fq/libs/row_dispatcher/topic_session.h | 5 ++- .../row_dispatcher/ut/row_dispatcher_ut.cpp | 2 +- .../row_dispatcher/ut/topic_session_ut.cpp | 5 +-- ydb/core/fq/libs/row_dispatcher/ya.make | 1 + .../yql/providers/pq/proto/dq_io.proto | 1 + .../pq/provider/yql_pq_dq_integration.cpp | 7 ++++ 12 files changed, 94 insertions(+), 23 deletions(-) create mode 100644 ydb/core/fq/libs/row_dispatcher/common.cpp create mode 100644 ydb/core/fq/libs/row_dispatcher/common.h diff --git a/ydb/core/fq/libs/row_dispatcher/actors_factory.cpp b/ydb/core/fq/libs/row_dispatcher/actors_factory.cpp index f78e5f691a84..9508f57729b4 100644 --- a/ydb/core/fq/libs/row_dispatcher/actors_factory.cpp +++ b/ydb/core/fq/libs/row_dispatcher/actors_factory.cpp @@ -2,8 +2,6 @@ #include -#include - namespace NFq::NRowDispatcher { @@ -19,7 +17,7 @@ struct TActorFactory : public IActorFactory { ui32 partitionId, NYdb::TDriver driver, std::shared_ptr credentialsProviderFactory, - NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory, + IPureCalcProgramFactory::TPtr pureCalcProgramFactory, const ::NMonitoring::TDynamicCounterPtr& counters, const NYql::IPqGateway::TPtr& pqGateway) const override { diff --git a/ydb/core/fq/libs/row_dispatcher/actors_factory.h b/ydb/core/fq/libs/row_dispatcher/actors_factory.h index ce3d8be007c5..95c4f65fd633 100644 --- a/ydb/core/fq/libs/row_dispatcher/actors_factory.h +++ b/ydb/core/fq/libs/row_dispatcher/actors_factory.h @@ -1,11 +1,12 @@ #pragma once +#include "common.h" + #include #include #include #include #include -#include namespace NFq::NRowDispatcher { @@ -21,7 +22,7 @@ struct IActorFactory : public TThrRefBase { ui32 partitionId, NYdb::TDriver driver, std::shared_ptr credentialsProviderFactory, - NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory, + IPureCalcProgramFactory::TPtr pureCalcProgramFactory, const ::NMonitoring::TDynamicCounterPtr& counters, const NYql::IPqGateway::TPtr& pqGateway) const = 0; }; diff --git a/ydb/core/fq/libs/row_dispatcher/common.cpp b/ydb/core/fq/libs/row_dispatcher/common.cpp new file mode 100644 index 000000000000..9452c77db59b --- /dev/null +++ b/ydb/core/fq/libs/row_dispatcher/common.cpp @@ -0,0 +1,37 @@ +#include "common.h" + +#include + +namespace NFq { + +namespace { + +class TPureCalcProgramFactory : public IPureCalcProgramFactory { +public: + NYql::NPureCalc::IProgramFactoryPtr GetFactory(const TSettings& settings) override { + const auto it = ProgramFactories.find(settings); + if (it != ProgramFactories.end()) { + return it->second; + } + return CreateFactory(settings); + } + +private: + NYql::NPureCalc::IProgramFactoryPtr CreateFactory(const TSettings& settings) { + return ProgramFactories.insert({settings, NYql::NPureCalc::MakeProgramFactory( + NYql::NPureCalc::TProgramFactoryOptions() + .SetLLVMSettings(settings.EnabledLLVM ? "ON" : "OFF") + )}).first->second; + } + +private: + std::map ProgramFactories; +}; + +} // anonymous namespace + +IPureCalcProgramFactory::TPtr CreatePureCalcProgramFactory() { + return MakeIntrusive(); +} + +} // namespace NFq diff --git a/ydb/core/fq/libs/row_dispatcher/common.h b/ydb/core/fq/libs/row_dispatcher/common.h new file mode 100644 index 000000000000..8e33787b2767 --- /dev/null +++ b/ydb/core/fq/libs/row_dispatcher/common.h @@ -0,0 +1,25 @@ +#pragma once + +#include + +#include + +namespace NFq { + +class IPureCalcProgramFactory : public TThrRefBase { +public: + using TPtr = TIntrusivePtr; + + struct TSettings { + bool EnabledLLVM = false; + + std::strong_ordering operator<=>(const TSettings& other) const = default; + }; + +public: + virtual NYql::NPureCalc::IProgramFactoryPtr GetFactory(const TSettings& settings) = 0; +}; + +IPureCalcProgramFactory::TPtr CreatePureCalcProgramFactory(); + +} // namespace NFq diff --git a/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp b/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp index f2944ffe1600..f2a4a2d2ab9f 100644 --- a/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp +++ b/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp @@ -1,4 +1,5 @@ #include "row_dispatcher.h" +#include "common.h" #include "coordinator.h" #include @@ -214,7 +215,7 @@ class TRowDispatcher : public TActorBootstrapped { NConfig::TRowDispatcherConfig Config; NKikimr::TYdbCredentialsProviderFactory CredentialsProviderFactory; - NYql::NPureCalc::IProgramFactoryPtr PureCalcProgramFactory; + IPureCalcProgramFactory::TPtr PureCalcProgramFactory; TYqSharedResources::TPtr YqSharedResources; TMaybe CoordinatorActorId; TSet CoordinatorChangedSubscribers; @@ -362,10 +363,7 @@ TRowDispatcher::TRowDispatcher( const NYql::IPqGateway::TPtr& pqGateway) : Config(config) , CredentialsProviderFactory(credentialsProviderFactory) - , PureCalcProgramFactory(NYql::NPureCalc::MakeProgramFactory( - NYql::NPureCalc::TProgramFactoryOptions() - .SetLLVMSettings("ON") - )) + , PureCalcProgramFactory(CreatePureCalcProgramFactory()) , YqSharedResources(yqSharedResources) , CredentialsFactory(credentialsFactory) , LogPrefix("RowDispatcher: ") diff --git a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp index f6acf333eee2..53bb83fe7a8d 100644 --- a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp +++ b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp @@ -158,7 +158,7 @@ class TTopicSession : public TActorBootstrapped { ui32 PartitionId; NYdb::TDriver Driver; std::shared_ptr CredentialsProviderFactory; - NYql::NPureCalc::IProgramFactoryPtr PureCalcProgramFactory; + IPureCalcProgramFactory::TPtr PureCalcProgramFactory; NYql::ITopicClient::TPtr TopicClient; std::shared_ptr ReadSession; const i64 BufferSize; @@ -190,7 +190,7 @@ class TTopicSession : public TActorBootstrapped { ui32 partitionId, NYdb::TDriver driver, std::shared_ptr credentialsProviderFactory, - NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory, + IPureCalcProgramFactory::TPtr pureCalcProgramFactory, const ::NMonitoring::TDynamicCounterPtr& counters, const NYql::IPqGateway::TPtr& pqGateway); @@ -281,7 +281,7 @@ TTopicSession::TTopicSession( ui32 partitionId, NYdb::TDriver driver, std::shared_ptr credentialsProviderFactory, - NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory, + IPureCalcProgramFactory::TPtr pureCalcProgramFactory, const ::NMonitoring::TDynamicCounterPtr& counters, const NYql::IPqGateway::TPtr& pqGateway) : TopicPath(topicPath) @@ -737,10 +737,11 @@ void TTopicSession::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) { std::forward_as_tuple(ev)).first->second; UpdateFieldsIds(clientInfo); - TString predicate = clientInfo.Settings.GetSource().GetPredicate(); + const auto& source = clientInfo.Settings.GetSource(); + TString predicate = source.GetPredicate(); // TODO: remove this when the re-parsing is removed from pq read actor - if (predicate.empty() && HasJsonColumns(clientInfo.Settings.GetSource())) { + if (predicate.empty() && HasJsonColumns(source)) { predicate = "WHERE TRUE"; } @@ -752,7 +753,9 @@ void TTopicSession::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) { [&, actorId = clientInfo.ReadActorId](ui64 offset, const TString& json){ Send(SelfId(), new NFq::TEvPrivate::TEvDataAfterFilteration(offset, json, actorId)); }, - PureCalcProgramFactory); + PureCalcProgramFactory->GetFactory({ + .EnabledLLVM = source.GetEnabledLLVM() + })); } else { ClientsWithoutPredicate.insert(ev->Sender); } @@ -993,7 +996,7 @@ std::unique_ptr NewTopicSession( ui32 partitionId, NYdb::TDriver driver, std::shared_ptr credentialsProviderFactory, - NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory, + IPureCalcProgramFactory::TPtr pureCalcProgramFactory, const ::NMonitoring::TDynamicCounterPtr& counters, const NYql::IPqGateway::TPtr& pqGateway) { return std::unique_ptr(new TTopicSession(topicPath, endpoint, database, config, rowDispatcherActorId, partitionId, std::move(driver), credentialsProviderFactory, pureCalcProgramFactory, counters, pqGateway)); diff --git a/ydb/core/fq/libs/row_dispatcher/topic_session.h b/ydb/core/fq/libs/row_dispatcher/topic_session.h index 54f8b3510b11..77d49168db84 100644 --- a/ydb/core/fq/libs/row_dispatcher/topic_session.h +++ b/ydb/core/fq/libs/row_dispatcher/topic_session.h @@ -1,5 +1,7 @@ #pragma once +#include "common.h" + #include #include #include @@ -8,7 +10,6 @@ #include #include -#include #include @@ -25,7 +26,7 @@ std::unique_ptr NewTopicSession( ui32 partitionId, NYdb::TDriver driver, std::shared_ptr credentialsProviderFactory, - NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory, + IPureCalcProgramFactory::TPtr pureCalcProgramFactory, const ::NMonitoring::TDynamicCounterPtr& counters, const NYql::IPqGateway::TPtr& pqGateway); diff --git a/ydb/core/fq/libs/row_dispatcher/ut/row_dispatcher_ut.cpp b/ydb/core/fq/libs/row_dispatcher/ut/row_dispatcher_ut.cpp index bafd824fb48b..cc2d2407b8a1 100644 --- a/ydb/core/fq/libs/row_dispatcher/ut/row_dispatcher_ut.cpp +++ b/ydb/core/fq/libs/row_dispatcher/ut/row_dispatcher_ut.cpp @@ -35,7 +35,7 @@ struct TTestActorFactory : public NFq::NRowDispatcher::IActorFactory { ui32 /*partitionId*/, NYdb::TDriver /*driver*/, std::shared_ptr /*credentialsProviderFactory*/, - NYql::NPureCalc::IProgramFactoryPtr /*pureCalcProgramFactory*/, + IPureCalcProgramFactory::TPtr /*pureCalcProgramFactory*/, const ::NMonitoring::TDynamicCounterPtr& /*counters*/, const NYql::IPqGateway::TPtr& /*pqGateway*/) const override { auto actorId = Runtime.AllocateEdgeActor(); diff --git a/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp b/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp index 37d757d88673..085f9f8b9dfe 100644 --- a/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp +++ b/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp @@ -13,7 +13,6 @@ #include #include -#include namespace { @@ -27,7 +26,7 @@ const ui64 GrabTimeoutSec = 4 * TimeoutBeforeStartSessionSec; class TFixture : public NUnitTest::TBaseFixture { public: TFixture() - : PureCalcProgramFactory(NYql::NPureCalc::MakeProgramFactory(NYql::NPureCalc::TProgramFactoryOptions())) + : PureCalcProgramFactory(CreatePureCalcProgramFactory()) , Runtime(true) {} @@ -158,7 +157,7 @@ class TFixture : public NUnitTest::TBaseFixture { return eventHolder->Get()->Record.MessagesSize(); } - NYql::NPureCalc::IProgramFactoryPtr PureCalcProgramFactory; + IPureCalcProgramFactory::TPtr PureCalcProgramFactory; NActors::TTestActorRuntime Runtime; TActorSystemStub ActorSystemStub; NActors::TActorId TopicSession; diff --git a/ydb/core/fq/libs/row_dispatcher/ya.make b/ydb/core/fq/libs/row_dispatcher/ya.make index 45eb3c9e0fdf..206974985683 100644 --- a/ydb/core/fq/libs/row_dispatcher/ya.make +++ b/ydb/core/fq/libs/row_dispatcher/ya.make @@ -2,6 +2,7 @@ LIBRARY() SRCS( actors_factory.cpp + common.cpp coordinator.cpp json_filter.cpp json_parser.cpp diff --git a/ydb/library/yql/providers/pq/proto/dq_io.proto b/ydb/library/yql/providers/pq/proto/dq_io.proto index 565563647a82..457c244124f5 100644 --- a/ydb/library/yql/providers/pq/proto/dq_io.proto +++ b/ydb/library/yql/providers/pq/proto/dq_io.proto @@ -38,6 +38,7 @@ message TDqPqTopicSource { string Predicate = 14; bool SharedReading = 15; string ReconnectPeriod = 16; // disabled by default, example of a parameter: 5m + bool EnabledLLVM = 17; } message TDqPqTopicSink { diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp b/ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp index e1a304ea98b1..9cd28bb424a6 100644 --- a/ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp +++ b/ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp @@ -209,6 +209,13 @@ class TPqDqIntegration: public TDqIntegrationBase { srcDesc.SetClusterType(ToClusterType(clusterDesc->ClusterType)); srcDesc.SetDatabaseId(clusterDesc->DatabaseId); + srcDesc.SetEnabledLLVM(false); + if (const auto& types = State_->Types) { + if (const auto& optLLVM = types->OptLLVM) { + srcDesc.SetEnabledLLVM(!optLLVM->Empty() && *optLLVM != "OFF"); + } + } + bool sharedReading = false; TString format; size_t const settingsCount = topicSource.Settings().Size(); From 43640936c1fae54b17abfb1f7c603633af2d7174 Mon Sep 17 00:00:00 2001 From: Grigoriy Pisarenko Date: Tue, 12 Nov 2024 08:30:28 +0000 Subject: [PATCH 4/8] Removed pragma from filter --- ydb/core/fq/libs/row_dispatcher/json_filter.cpp | 1 - 1 file changed, 1 deletion(-) diff --git a/ydb/core/fq/libs/row_dispatcher/json_filter.cpp b/ydb/core/fq/libs/row_dispatcher/json_filter.cpp index c75428ef9796..66ea7b2f736e 100644 --- a/ydb/core/fq/libs/row_dispatcher/json_filter.cpp +++ b/ydb/core/fq/libs/row_dispatcher/json_filter.cpp @@ -293,7 +293,6 @@ class TJsonFilter::TImpl { private: TString GenerateSql(const TString& whereFilter) { TStringStream str; - str << "PRAGMA config.flags(\"LLVM\", \"ON\");"; str << "$filtered = SELECT * FROM Input " << whereFilter << ";\n"; str << "SELECT " << OffsetFieldName << ", Unwrap(Json::SerializeJson(Yson::From(RemoveMembers(TableRow(), [\"" << OffsetFieldName; From a387c73eff4f0e76eab9b26d06b91fe9a673f332 Mon Sep 17 00:00:00 2001 From: Grigoriy Pisarenko Date: Tue, 12 Nov 2024 08:42:16 +0000 Subject: [PATCH 5/8] Added pragma in filter sql --- .../fq/libs/row_dispatcher/json_filter.cpp | 20 +++++++++++-------- ydb/core/fq/libs/row_dispatcher/json_filter.h | 9 ++++++--- .../fq/libs/row_dispatcher/topic_session.cpp | 6 +++--- .../libs/row_dispatcher/ut/json_filter_ut.cpp | 9 +++++---- 4 files changed, 26 insertions(+), 18 deletions(-) diff --git a/ydb/core/fq/libs/row_dispatcher/json_filter.cpp b/ydb/core/fq/libs/row_dispatcher/json_filter.cpp index 66ea7b2f736e..c510094be3a9 100644 --- a/ydb/core/fq/libs/row_dispatcher/json_filter.cpp +++ b/ydb/core/fq/libs/row_dispatcher/json_filter.cpp @@ -264,14 +264,15 @@ class TJsonFilter::TImpl { const TVector& types, const TString& whereFilter, TCallback callback, - NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory) - : Sql(GenerateSql(whereFilter)) { + IPureCalcProgramFactory::TPtr pureCalcProgramFactory, + const IPureCalcProgramFactory::TSettings& factorySettings) + : Sql(GenerateSql(whereFilter, factorySettings)) { Y_ENSURE(columns.size() == types.size(), "Number of columns and types should by equal"); // Program should be stateless because input values // allocated on another allocator and should be released LOG_ROW_DISPATCHER_DEBUG("Creating program..."); - Program = pureCalcProgramFactory->MakePushStreamProgram( + Program = pureCalcProgramFactory->GetFactory(factorySettings)->MakePushStreamProgram( TFilterInputSpec(MakeInputSchema(columns, types)), TFilterOutputSpec(MakeOutputSchema()), Sql, @@ -291,8 +292,9 @@ class TJsonFilter::TImpl { } private: - TString GenerateSql(const TString& whereFilter) { + TString GenerateSql(const TString& whereFilter, const IPureCalcProgramFactory::TSettings& factorySettings) { TStringStream str; + str << "PRAGMA config.flags(\"LLVM\", \"" << (factorySettings.EnabledLLVM ? "ON" : "OFF") << "\");\n"; str << "$filtered = SELECT * FROM Input " << whereFilter << ";\n"; str << "SELECT " << OffsetFieldName << ", Unwrap(Json::SerializeJson(Yson::From(RemoveMembers(TableRow(), [\"" << OffsetFieldName; @@ -312,8 +314,9 @@ TJsonFilter::TJsonFilter( const TVector& types, const TString& whereFilter, TCallback callback, - NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory) - : Impl(std::make_unique(columns, types, whereFilter, callback, pureCalcProgramFactory)) { + IPureCalcProgramFactory::TPtr pureCalcProgramFactory, + const IPureCalcProgramFactory::TSettings& factorySettings) + : Impl(std::make_unique(columns, types, whereFilter, callback, pureCalcProgramFactory, factorySettings)) { } TJsonFilter::~TJsonFilter() { @@ -332,8 +335,9 @@ std::unique_ptr NewJsonFilter( const TVector& types, const TString& whereFilter, TCallback callback, - NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory) { - return std::unique_ptr(new TJsonFilter(columns, types, whereFilter, callback, pureCalcProgramFactory)); + IPureCalcProgramFactory::TPtr pureCalcProgramFactory, + const IPureCalcProgramFactory::TSettings& factorySettings) { + return std::unique_ptr(new TJsonFilter(columns, types, whereFilter, callback, pureCalcProgramFactory, factorySettings)); } } // namespace NFq diff --git a/ydb/core/fq/libs/row_dispatcher/json_filter.h b/ydb/core/fq/libs/row_dispatcher/json_filter.h index 183f23eebba0..6d1cebf9338c 100644 --- a/ydb/core/fq/libs/row_dispatcher/json_filter.h +++ b/ydb/core/fq/libs/row_dispatcher/json_filter.h @@ -1,7 +1,8 @@ #pragma once +#include "common.h" + #include -#include #include #include @@ -17,7 +18,8 @@ class TJsonFilter { const TVector& types, const TString& whereFilter, TCallback callback, - NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory); + IPureCalcProgramFactory::TPtr pureCalcProgramFactory, + const IPureCalcProgramFactory::TSettings& factorySettings); ~TJsonFilter(); @@ -34,6 +36,7 @@ std::unique_ptr NewJsonFilter( const TVector& types, const TString& whereFilter, TJsonFilter::TCallback callback, - NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory); + IPureCalcProgramFactory::TPtr pureCalcProgramFactory, + const IPureCalcProgramFactory::TSettings& factorySettings); } // namespace NFq diff --git a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp index 53bb83fe7a8d..841ed7cf2d31 100644 --- a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp +++ b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp @@ -753,9 +753,9 @@ void TTopicSession::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) { [&, actorId = clientInfo.ReadActorId](ui64 offset, const TString& json){ Send(SelfId(), new NFq::TEvPrivate::TEvDataAfterFilteration(offset, json, actorId)); }, - PureCalcProgramFactory->GetFactory({ - .EnabledLLVM = source.GetEnabledLLVM() - })); + PureCalcProgramFactory, + {.EnabledLLVM = source.GetEnabledLLVM()} + ); } else { ClientsWithoutPredicate.insert(ev->Sender); } diff --git a/ydb/core/fq/libs/row_dispatcher/ut/json_filter_ut.cpp b/ydb/core/fq/libs/row_dispatcher/ut/json_filter_ut.cpp index d87ffefe6b92..8f975e004463 100644 --- a/ydb/core/fq/libs/row_dispatcher/ut/json_filter_ut.cpp +++ b/ydb/core/fq/libs/row_dispatcher/ut/json_filter_ut.cpp @@ -3,6 +3,7 @@ #include #include +#include #include #include @@ -10,7 +11,6 @@ #include #include -#include #include @@ -23,7 +23,7 @@ class TFixture : public NUnitTest::TBaseFixture { public: TFixture() - : PureCalcProgramFactory(NYql::NPureCalc::MakeProgramFactory(NYql::NPureCalc::TProgramFactoryOptions())) + : PureCalcProgramFactory(CreatePureCalcProgramFactory()) , Runtime(true) , Alloc(__LOCATION__, NKikimr::TAlignedPagePoolCounters(), true, false) {} @@ -65,7 +65,8 @@ class TFixture : public NUnitTest::TBaseFixture { types, whereFilter, callback, - PureCalcProgramFactory); + PureCalcProgramFactory, + {.EnabledLLVM = false}); } const NKikimr::NMiniKQL::TUnboxedValueVector* MakeVector(size_t size, std::function valueCreator) { @@ -100,7 +101,7 @@ class TFixture : public NUnitTest::TBaseFixture { }); } - NYql::NPureCalc::IProgramFactoryPtr PureCalcProgramFactory; + IPureCalcProgramFactory::TPtr PureCalcProgramFactory; NActors::TTestActorRuntime Runtime; TActorSystemStub ActorSystemStub; std::unique_ptr Filter; From cf2b3769e85021b5fe4972a40f903288cbfef5c7 Mon Sep 17 00:00:00 2001 From: Grigoriy Pisarenko Date: Tue, 12 Nov 2024 09:12:42 +0000 Subject: [PATCH 6/8] Added mutex into shared context --- ydb/core/fq/libs/row_dispatcher/common.cpp | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/ydb/core/fq/libs/row_dispatcher/common.cpp b/ydb/core/fq/libs/row_dispatcher/common.cpp index 9452c77db59b..24690f07cfab 100644 --- a/ydb/core/fq/libs/row_dispatcher/common.cpp +++ b/ydb/core/fq/libs/row_dispatcher/common.cpp @@ -1,5 +1,7 @@ #include "common.h" +#include + #include namespace NFq { @@ -9,15 +11,13 @@ namespace { class TPureCalcProgramFactory : public IPureCalcProgramFactory { public: NYql::NPureCalc::IProgramFactoryPtr GetFactory(const TSettings& settings) override { + TGuard guard(FactoriesMutex); + const auto it = ProgramFactories.find(settings); if (it != ProgramFactories.end()) { return it->second; } - return CreateFactory(settings); - } -private: - NYql::NPureCalc::IProgramFactoryPtr CreateFactory(const TSettings& settings) { return ProgramFactories.insert({settings, NYql::NPureCalc::MakeProgramFactory( NYql::NPureCalc::TProgramFactoryOptions() .SetLLVMSettings(settings.EnabledLLVM ? "ON" : "OFF") @@ -25,6 +25,7 @@ class TPureCalcProgramFactory : public IPureCalcProgramFactory { } private: + TMutex FactoriesMutex; std::map ProgramFactories; }; From 04543d690fb677a0eb3a9bc5d431a26e4a438e23 Mon Sep 17 00:00:00 2001 From: Grigoriy Pisarenko Date: Wed, 13 Nov 2024 12:46:07 +0000 Subject: [PATCH 7/8] Fixed issues --- ydb/core/fq/libs/row_dispatcher/common.cpp | 20 +++++++++++-------- ydb/core/fq/libs/row_dispatcher/common.h | 2 +- .../pq/provider/yql_pq_dq_integration.cpp | 1 - 3 files changed, 13 insertions(+), 10 deletions(-) diff --git a/ydb/core/fq/libs/row_dispatcher/common.cpp b/ydb/core/fq/libs/row_dispatcher/common.cpp index 24690f07cfab..50bd7423cb8f 100644 --- a/ydb/core/fq/libs/row_dispatcher/common.cpp +++ b/ydb/core/fq/libs/row_dispatcher/common.cpp @@ -10,22 +10,26 @@ namespace { class TPureCalcProgramFactory : public IPureCalcProgramFactory { public: - NYql::NPureCalc::IProgramFactoryPtr GetFactory(const TSettings& settings) override { - TGuard guard(FactoriesMutex); + TPureCalcProgramFactory() { + CreateFactory({.EnabledLLVM = false}); + CreateFactory({.EnabledLLVM = true}); + } + NYql::NPureCalc::IProgramFactoryPtr GetFactory(const TSettings& settings) const override { const auto it = ProgramFactories.find(settings); - if (it != ProgramFactories.end()) { - return it->second; - } + Y_ENSURE(it != ProgramFactories.end()); + return it->second; + } - return ProgramFactories.insert({settings, NYql::NPureCalc::MakeProgramFactory( +private: + void CreateFactory(const TSettings& settings) { + ProgramFactories.insert({settings, NYql::NPureCalc::MakeProgramFactory( NYql::NPureCalc::TProgramFactoryOptions() .SetLLVMSettings(settings.EnabledLLVM ? "ON" : "OFF") - )}).first->second; + )}); } private: - TMutex FactoriesMutex; std::map ProgramFactories; }; diff --git a/ydb/core/fq/libs/row_dispatcher/common.h b/ydb/core/fq/libs/row_dispatcher/common.h index 8e33787b2767..50f43443b915 100644 --- a/ydb/core/fq/libs/row_dispatcher/common.h +++ b/ydb/core/fq/libs/row_dispatcher/common.h @@ -17,7 +17,7 @@ class IPureCalcProgramFactory : public TThrRefBase { }; public: - virtual NYql::NPureCalc::IProgramFactoryPtr GetFactory(const TSettings& settings) = 0; + virtual NYql::NPureCalc::IProgramFactoryPtr GetFactory(const TSettings& settings) const = 0; }; IPureCalcProgramFactory::TPtr CreatePureCalcProgramFactory(); diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp b/ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp index 9cd28bb424a6..08657e1f8a21 100644 --- a/ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp +++ b/ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp @@ -209,7 +209,6 @@ class TPqDqIntegration: public TDqIntegrationBase { srcDesc.SetClusterType(ToClusterType(clusterDesc->ClusterType)); srcDesc.SetDatabaseId(clusterDesc->DatabaseId); - srcDesc.SetEnabledLLVM(false); if (const auto& types = State_->Types) { if (const auto& optLLVM = types->OptLLVM) { srcDesc.SetEnabledLLVM(!optLLVM->Empty() && *optLLVM != "OFF"); From 3c5c4e63ab433dc7f90eff48b99f04a463c5f5ee Mon Sep 17 00:00:00 2001 From: Grigoriy Pisarenko Date: Wed, 13 Nov 2024 13:43:47 +0000 Subject: [PATCH 8/8] Fixed build error --- ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp b/ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp index 08657e1f8a21..cfa5da4d243e 100644 --- a/ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp +++ b/ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp @@ -211,7 +211,7 @@ class TPqDqIntegration: public TDqIntegrationBase { if (const auto& types = State_->Types) { if (const auto& optLLVM = types->OptLLVM) { - srcDesc.SetEnabledLLVM(!optLLVM->Empty() && *optLLVM != "OFF"); + srcDesc.SetEnabledLLVM(!optLLVM->empty() && *optLLVM != "OFF"); } }