diff --git a/ydb/core/fq/libs/row_dispatcher/actors_factory.cpp b/ydb/core/fq/libs/row_dispatcher/actors_factory.cpp index f78e5f691a84..9508f57729b4 100644 --- a/ydb/core/fq/libs/row_dispatcher/actors_factory.cpp +++ b/ydb/core/fq/libs/row_dispatcher/actors_factory.cpp @@ -2,8 +2,6 @@ #include -#include - namespace NFq::NRowDispatcher { @@ -19,7 +17,7 @@ struct TActorFactory : public IActorFactory { ui32 partitionId, NYdb::TDriver driver, std::shared_ptr credentialsProviderFactory, - NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory, + IPureCalcProgramFactory::TPtr pureCalcProgramFactory, const ::NMonitoring::TDynamicCounterPtr& counters, const NYql::IPqGateway::TPtr& pqGateway) const override { diff --git a/ydb/core/fq/libs/row_dispatcher/actors_factory.h b/ydb/core/fq/libs/row_dispatcher/actors_factory.h index ce3d8be007c5..95c4f65fd633 100644 --- a/ydb/core/fq/libs/row_dispatcher/actors_factory.h +++ b/ydb/core/fq/libs/row_dispatcher/actors_factory.h @@ -1,11 +1,12 @@ #pragma once +#include "common.h" + #include #include #include #include #include -#include namespace NFq::NRowDispatcher { @@ -21,7 +22,7 @@ struct IActorFactory : public TThrRefBase { ui32 partitionId, NYdb::TDriver driver, std::shared_ptr credentialsProviderFactory, - NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory, + IPureCalcProgramFactory::TPtr pureCalcProgramFactory, const ::NMonitoring::TDynamicCounterPtr& counters, const NYql::IPqGateway::TPtr& pqGateway) const = 0; }; diff --git a/ydb/core/fq/libs/row_dispatcher/common.cpp b/ydb/core/fq/libs/row_dispatcher/common.cpp new file mode 100644 index 000000000000..50bd7423cb8f --- /dev/null +++ b/ydb/core/fq/libs/row_dispatcher/common.cpp @@ -0,0 +1,42 @@ +#include "common.h" + +#include + +#include + +namespace NFq { + +namespace { + +class TPureCalcProgramFactory : public IPureCalcProgramFactory { +public: + TPureCalcProgramFactory() { + CreateFactory({.EnabledLLVM = false}); + CreateFactory({.EnabledLLVM = true}); + } + + NYql::NPureCalc::IProgramFactoryPtr GetFactory(const TSettings& settings) const override { + const auto it = ProgramFactories.find(settings); + Y_ENSURE(it != ProgramFactories.end()); + return it->second; + } + +private: + void CreateFactory(const TSettings& settings) { + ProgramFactories.insert({settings, NYql::NPureCalc::MakeProgramFactory( + NYql::NPureCalc::TProgramFactoryOptions() + .SetLLVMSettings(settings.EnabledLLVM ? "ON" : "OFF") + )}); + } + +private: + std::map ProgramFactories; +}; + +} // anonymous namespace + +IPureCalcProgramFactory::TPtr CreatePureCalcProgramFactory() { + return MakeIntrusive(); +} + +} // namespace NFq diff --git a/ydb/core/fq/libs/row_dispatcher/common.h b/ydb/core/fq/libs/row_dispatcher/common.h new file mode 100644 index 000000000000..50f43443b915 --- /dev/null +++ b/ydb/core/fq/libs/row_dispatcher/common.h @@ -0,0 +1,25 @@ +#pragma once + +#include + +#include + +namespace NFq { + +class IPureCalcProgramFactory : public TThrRefBase { +public: + using TPtr = TIntrusivePtr; + + struct TSettings { + bool EnabledLLVM = false; + + std::strong_ordering operator<=>(const TSettings& other) const = default; + }; + +public: + virtual NYql::NPureCalc::IProgramFactoryPtr GetFactory(const TSettings& settings) const = 0; +}; + +IPureCalcProgramFactory::TPtr CreatePureCalcProgramFactory(); + +} // namespace NFq diff --git a/ydb/core/fq/libs/row_dispatcher/json_filter.cpp b/ydb/core/fq/libs/row_dispatcher/json_filter.cpp index 66ea7b2f736e..c510094be3a9 100644 --- a/ydb/core/fq/libs/row_dispatcher/json_filter.cpp +++ b/ydb/core/fq/libs/row_dispatcher/json_filter.cpp @@ -264,14 +264,15 @@ class TJsonFilter::TImpl { const TVector& types, const TString& whereFilter, TCallback callback, - NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory) - : Sql(GenerateSql(whereFilter)) { + IPureCalcProgramFactory::TPtr pureCalcProgramFactory, + const IPureCalcProgramFactory::TSettings& factorySettings) + : Sql(GenerateSql(whereFilter, factorySettings)) { Y_ENSURE(columns.size() == types.size(), "Number of columns and types should by equal"); // Program should be stateless because input values // allocated on another allocator and should be released LOG_ROW_DISPATCHER_DEBUG("Creating program..."); - Program = pureCalcProgramFactory->MakePushStreamProgram( + Program = pureCalcProgramFactory->GetFactory(factorySettings)->MakePushStreamProgram( TFilterInputSpec(MakeInputSchema(columns, types)), TFilterOutputSpec(MakeOutputSchema()), Sql, @@ -291,8 +292,9 @@ class TJsonFilter::TImpl { } private: - TString GenerateSql(const TString& whereFilter) { + TString GenerateSql(const TString& whereFilter, const IPureCalcProgramFactory::TSettings& factorySettings) { TStringStream str; + str << "PRAGMA config.flags(\"LLVM\", \"" << (factorySettings.EnabledLLVM ? "ON" : "OFF") << "\");\n"; str << "$filtered = SELECT * FROM Input " << whereFilter << ";\n"; str << "SELECT " << OffsetFieldName << ", Unwrap(Json::SerializeJson(Yson::From(RemoveMembers(TableRow(), [\"" << OffsetFieldName; @@ -312,8 +314,9 @@ TJsonFilter::TJsonFilter( const TVector& types, const TString& whereFilter, TCallback callback, - NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory) - : Impl(std::make_unique(columns, types, whereFilter, callback, pureCalcProgramFactory)) { + IPureCalcProgramFactory::TPtr pureCalcProgramFactory, + const IPureCalcProgramFactory::TSettings& factorySettings) + : Impl(std::make_unique(columns, types, whereFilter, callback, pureCalcProgramFactory, factorySettings)) { } TJsonFilter::~TJsonFilter() { @@ -332,8 +335,9 @@ std::unique_ptr NewJsonFilter( const TVector& types, const TString& whereFilter, TCallback callback, - NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory) { - return std::unique_ptr(new TJsonFilter(columns, types, whereFilter, callback, pureCalcProgramFactory)); + IPureCalcProgramFactory::TPtr pureCalcProgramFactory, + const IPureCalcProgramFactory::TSettings& factorySettings) { + return std::unique_ptr(new TJsonFilter(columns, types, whereFilter, callback, pureCalcProgramFactory, factorySettings)); } } // namespace NFq diff --git a/ydb/core/fq/libs/row_dispatcher/json_filter.h b/ydb/core/fq/libs/row_dispatcher/json_filter.h index 183f23eebba0..6d1cebf9338c 100644 --- a/ydb/core/fq/libs/row_dispatcher/json_filter.h +++ b/ydb/core/fq/libs/row_dispatcher/json_filter.h @@ -1,7 +1,8 @@ #pragma once +#include "common.h" + #include -#include #include #include @@ -17,7 +18,8 @@ class TJsonFilter { const TVector& types, const TString& whereFilter, TCallback callback, - NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory); + IPureCalcProgramFactory::TPtr pureCalcProgramFactory, + const IPureCalcProgramFactory::TSettings& factorySettings); ~TJsonFilter(); @@ -34,6 +36,7 @@ std::unique_ptr NewJsonFilter( const TVector& types, const TString& whereFilter, TJsonFilter::TCallback callback, - NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory); + IPureCalcProgramFactory::TPtr pureCalcProgramFactory, + const IPureCalcProgramFactory::TSettings& factorySettings); } // namespace NFq diff --git a/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp b/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp index 3749fae41819..f2a4a2d2ab9f 100644 --- a/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp +++ b/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp @@ -1,4 +1,5 @@ #include "row_dispatcher.h" +#include "common.h" #include "coordinator.h" #include @@ -214,7 +215,7 @@ class TRowDispatcher : public TActorBootstrapped { NConfig::TRowDispatcherConfig Config; NKikimr::TYdbCredentialsProviderFactory CredentialsProviderFactory; - NYql::NPureCalc::IProgramFactoryPtr PureCalcProgramFactory; + IPureCalcProgramFactory::TPtr PureCalcProgramFactory; TYqSharedResources::TPtr YqSharedResources; TMaybe CoordinatorActorId; TSet CoordinatorChangedSubscribers; @@ -362,7 +363,7 @@ TRowDispatcher::TRowDispatcher( const NYql::IPqGateway::TPtr& pqGateway) : Config(config) , CredentialsProviderFactory(credentialsProviderFactory) - , PureCalcProgramFactory(NYql::NPureCalc::MakeProgramFactory(NYql::NPureCalc::TProgramFactoryOptions())) + , PureCalcProgramFactory(CreatePureCalcProgramFactory()) , YqSharedResources(yqSharedResources) , CredentialsFactory(credentialsFactory) , LogPrefix("RowDispatcher: ") diff --git a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp index f6acf333eee2..841ed7cf2d31 100644 --- a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp +++ b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp @@ -158,7 +158,7 @@ class TTopicSession : public TActorBootstrapped { ui32 PartitionId; NYdb::TDriver Driver; std::shared_ptr CredentialsProviderFactory; - NYql::NPureCalc::IProgramFactoryPtr PureCalcProgramFactory; + IPureCalcProgramFactory::TPtr PureCalcProgramFactory; NYql::ITopicClient::TPtr TopicClient; std::shared_ptr ReadSession; const i64 BufferSize; @@ -190,7 +190,7 @@ class TTopicSession : public TActorBootstrapped { ui32 partitionId, NYdb::TDriver driver, std::shared_ptr credentialsProviderFactory, - NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory, + IPureCalcProgramFactory::TPtr pureCalcProgramFactory, const ::NMonitoring::TDynamicCounterPtr& counters, const NYql::IPqGateway::TPtr& pqGateway); @@ -281,7 +281,7 @@ TTopicSession::TTopicSession( ui32 partitionId, NYdb::TDriver driver, std::shared_ptr credentialsProviderFactory, - NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory, + IPureCalcProgramFactory::TPtr pureCalcProgramFactory, const ::NMonitoring::TDynamicCounterPtr& counters, const NYql::IPqGateway::TPtr& pqGateway) : TopicPath(topicPath) @@ -737,10 +737,11 @@ void TTopicSession::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) { std::forward_as_tuple(ev)).first->second; UpdateFieldsIds(clientInfo); - TString predicate = clientInfo.Settings.GetSource().GetPredicate(); + const auto& source = clientInfo.Settings.GetSource(); + TString predicate = source.GetPredicate(); // TODO: remove this when the re-parsing is removed from pq read actor - if (predicate.empty() && HasJsonColumns(clientInfo.Settings.GetSource())) { + if (predicate.empty() && HasJsonColumns(source)) { predicate = "WHERE TRUE"; } @@ -752,7 +753,9 @@ void TTopicSession::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) { [&, actorId = clientInfo.ReadActorId](ui64 offset, const TString& json){ Send(SelfId(), new NFq::TEvPrivate::TEvDataAfterFilteration(offset, json, actorId)); }, - PureCalcProgramFactory); + PureCalcProgramFactory, + {.EnabledLLVM = source.GetEnabledLLVM()} + ); } else { ClientsWithoutPredicate.insert(ev->Sender); } @@ -993,7 +996,7 @@ std::unique_ptr NewTopicSession( ui32 partitionId, NYdb::TDriver driver, std::shared_ptr credentialsProviderFactory, - NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory, + IPureCalcProgramFactory::TPtr pureCalcProgramFactory, const ::NMonitoring::TDynamicCounterPtr& counters, const NYql::IPqGateway::TPtr& pqGateway) { return std::unique_ptr(new TTopicSession(topicPath, endpoint, database, config, rowDispatcherActorId, partitionId, std::move(driver), credentialsProviderFactory, pureCalcProgramFactory, counters, pqGateway)); diff --git a/ydb/core/fq/libs/row_dispatcher/topic_session.h b/ydb/core/fq/libs/row_dispatcher/topic_session.h index 54f8b3510b11..77d49168db84 100644 --- a/ydb/core/fq/libs/row_dispatcher/topic_session.h +++ b/ydb/core/fq/libs/row_dispatcher/topic_session.h @@ -1,5 +1,7 @@ #pragma once +#include "common.h" + #include #include #include @@ -8,7 +10,6 @@ #include #include -#include #include @@ -25,7 +26,7 @@ std::unique_ptr NewTopicSession( ui32 partitionId, NYdb::TDriver driver, std::shared_ptr credentialsProviderFactory, - NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory, + IPureCalcProgramFactory::TPtr pureCalcProgramFactory, const ::NMonitoring::TDynamicCounterPtr& counters, const NYql::IPqGateway::TPtr& pqGateway); diff --git a/ydb/core/fq/libs/row_dispatcher/ut/json_filter_ut.cpp b/ydb/core/fq/libs/row_dispatcher/ut/json_filter_ut.cpp index d87ffefe6b92..8f975e004463 100644 --- a/ydb/core/fq/libs/row_dispatcher/ut/json_filter_ut.cpp +++ b/ydb/core/fq/libs/row_dispatcher/ut/json_filter_ut.cpp @@ -3,6 +3,7 @@ #include #include +#include #include #include @@ -10,7 +11,6 @@ #include #include -#include #include @@ -23,7 +23,7 @@ class TFixture : public NUnitTest::TBaseFixture { public: TFixture() - : PureCalcProgramFactory(NYql::NPureCalc::MakeProgramFactory(NYql::NPureCalc::TProgramFactoryOptions())) + : PureCalcProgramFactory(CreatePureCalcProgramFactory()) , Runtime(true) , Alloc(__LOCATION__, NKikimr::TAlignedPagePoolCounters(), true, false) {} @@ -65,7 +65,8 @@ class TFixture : public NUnitTest::TBaseFixture { types, whereFilter, callback, - PureCalcProgramFactory); + PureCalcProgramFactory, + {.EnabledLLVM = false}); } const NKikimr::NMiniKQL::TUnboxedValueVector* MakeVector(size_t size, std::function valueCreator) { @@ -100,7 +101,7 @@ class TFixture : public NUnitTest::TBaseFixture { }); } - NYql::NPureCalc::IProgramFactoryPtr PureCalcProgramFactory; + IPureCalcProgramFactory::TPtr PureCalcProgramFactory; NActors::TTestActorRuntime Runtime; TActorSystemStub ActorSystemStub; std::unique_ptr Filter; diff --git a/ydb/core/fq/libs/row_dispatcher/ut/row_dispatcher_ut.cpp b/ydb/core/fq/libs/row_dispatcher/ut/row_dispatcher_ut.cpp index bafd824fb48b..cc2d2407b8a1 100644 --- a/ydb/core/fq/libs/row_dispatcher/ut/row_dispatcher_ut.cpp +++ b/ydb/core/fq/libs/row_dispatcher/ut/row_dispatcher_ut.cpp @@ -35,7 +35,7 @@ struct TTestActorFactory : public NFq::NRowDispatcher::IActorFactory { ui32 /*partitionId*/, NYdb::TDriver /*driver*/, std::shared_ptr /*credentialsProviderFactory*/, - NYql::NPureCalc::IProgramFactoryPtr /*pureCalcProgramFactory*/, + IPureCalcProgramFactory::TPtr /*pureCalcProgramFactory*/, const ::NMonitoring::TDynamicCounterPtr& /*counters*/, const NYql::IPqGateway::TPtr& /*pqGateway*/) const override { auto actorId = Runtime.AllocateEdgeActor(); diff --git a/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp b/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp index 37d757d88673..085f9f8b9dfe 100644 --- a/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp +++ b/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp @@ -13,7 +13,6 @@ #include #include -#include namespace { @@ -27,7 +26,7 @@ const ui64 GrabTimeoutSec = 4 * TimeoutBeforeStartSessionSec; class TFixture : public NUnitTest::TBaseFixture { public: TFixture() - : PureCalcProgramFactory(NYql::NPureCalc::MakeProgramFactory(NYql::NPureCalc::TProgramFactoryOptions())) + : PureCalcProgramFactory(CreatePureCalcProgramFactory()) , Runtime(true) {} @@ -158,7 +157,7 @@ class TFixture : public NUnitTest::TBaseFixture { return eventHolder->Get()->Record.MessagesSize(); } - NYql::NPureCalc::IProgramFactoryPtr PureCalcProgramFactory; + IPureCalcProgramFactory::TPtr PureCalcProgramFactory; NActors::TTestActorRuntime Runtime; TActorSystemStub ActorSystemStub; NActors::TActorId TopicSession; diff --git a/ydb/core/fq/libs/row_dispatcher/ya.make b/ydb/core/fq/libs/row_dispatcher/ya.make index 45eb3c9e0fdf..206974985683 100644 --- a/ydb/core/fq/libs/row_dispatcher/ya.make +++ b/ydb/core/fq/libs/row_dispatcher/ya.make @@ -2,6 +2,7 @@ LIBRARY() SRCS( actors_factory.cpp + common.cpp coordinator.cpp json_filter.cpp json_parser.cpp diff --git a/ydb/library/yql/providers/pq/proto/dq_io.proto b/ydb/library/yql/providers/pq/proto/dq_io.proto index 565563647a82..457c244124f5 100644 --- a/ydb/library/yql/providers/pq/proto/dq_io.proto +++ b/ydb/library/yql/providers/pq/proto/dq_io.proto @@ -38,6 +38,7 @@ message TDqPqTopicSource { string Predicate = 14; bool SharedReading = 15; string ReconnectPeriod = 16; // disabled by default, example of a parameter: 5m + bool EnabledLLVM = 17; } message TDqPqTopicSink { diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp b/ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp index e1a304ea98b1..cfa5da4d243e 100644 --- a/ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp +++ b/ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp @@ -209,6 +209,12 @@ class TPqDqIntegration: public TDqIntegrationBase { srcDesc.SetClusterType(ToClusterType(clusterDesc->ClusterType)); srcDesc.SetDatabaseId(clusterDesc->DatabaseId); + if (const auto& types = State_->Types) { + if (const auto& optLLVM = types->OptLLVM) { + srcDesc.SetEnabledLLVM(!optLLVM->empty() && *optLLVM != "OFF"); + } + } + bool sharedReading = false; TString format; size_t const settingsCount = topicSource.Settings().Size();