From b6ac10ccb9700d2dfd43cab74f4dbacbff4802ad Mon Sep 17 00:00:00 2001 From: Pisarenko Grigoriy Date: Wed, 13 Nov 2024 11:06:43 +0300 Subject: [PATCH 1/3] YQ-3841 RD add column types validation (#11487) --- .../fq/libs/row_dispatcher/topic_session.cpp | 38 ++++++++++++++----- .../row_dispatcher/ut/topic_session_ut.cpp | 22 +++++++++++ 2 files changed, 51 insertions(+), 9 deletions(-) diff --git a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp index c72b6bcb32c3..c4082a302ffe 100644 --- a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp +++ b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp @@ -142,6 +142,11 @@ class TTopicSession : public TActorBootstrapped { TParserInputType InputType; }; + struct TFieldDescription { + ui64 IndexInParserSchema = 0; + TString Type; + }; + bool InflightReconnect = false; TDuration ReconnectPeriod; const TString TopicPath; @@ -168,7 +173,7 @@ class TTopicSession : public TActorBootstrapped { const ::NMonitoring::TDynamicCounterPtr Counters; TTopicSessionMetrics Metrics; TParserSchema ParserSchema; - THashMap FieldsIndexes; + THashMap FieldsIndexes; NYql::IPqGateway::TPtr PqGateway; TMaybe ConsumerName; @@ -683,14 +688,16 @@ void TTopicSession::SendData(TClientsInfo& info) { } void TTopicSession::UpdateFieldsIds(TClientsInfo& info) { - for (auto name : info.Settings.GetSource().GetColumns()) { + const auto& source = info.Settings.GetSource(); + for (size_t i = 0; i < source.ColumnsSize(); ++i) { + const auto& name = source.GetColumns().Get(i); auto it = FieldsIndexes.find(name); if (it == FieldsIndexes.end()) { auto nextIndex = FieldsIndexes.size(); info.FieldsIds.push_back(nextIndex); - FieldsIndexes[name] = nextIndex; + FieldsIndexes[name] = {nextIndex, source.GetColumnTypes().Get(i)}; } else { - info.FieldsIds.push_back(it->second); + info.FieldsIds.push_back(it->second.IndexInParserSchema); } } } @@ -816,7 +823,7 @@ void TTopicSession::UpdateParserSchema(const TParserInputType& inputType) { ui64 offset = 0; for (const auto& [name, type]: inputType) { Y_ENSURE(FieldsIndexes.contains(name)); - ui64 index = FieldsIndexes[name]; + ui64 index = FieldsIndexes[name].IndexInParserSchema; ParserSchema.FieldsMap[index] = offset++; } ParserSchema.InputType = inputType; @@ -944,13 +951,26 @@ bool TTopicSession::CheckNewClient(NFq::TEvRowDispatcher::TEvStartSession::TPtr& SendSessionError(ev->Sender, "Internal error: such a client already exists"); return false; } - if (!Config.GetWithoutConsumer() - && ConsumerName - && ConsumerName != ev->Get()->Record.GetSource().GetConsumerName()) { - LOG_ROW_DISPATCHER_INFO("Different consumer, expected " << ConsumerName << ", actual " << ev->Get()->Record.GetSource().GetConsumerName() << ", send error"); + + const auto& source = ev->Get()->Record.GetSource(); + if (!Config.GetWithoutConsumer() && ConsumerName && ConsumerName != source.GetConsumerName()) { + LOG_ROW_DISPATCHER_INFO("Different consumer, expected " << ConsumerName << ", actual " << source.GetConsumerName() << ", send error"); SendSessionError(ev->Sender, TStringBuilder() << "Use the same consumer in all queries via RD (current consumer " << ConsumerName << ")"); return false; } + + Y_ENSURE(source.ColumnsSize() == source.ColumnTypesSize()); + for (size_t i = 0; i < source.ColumnsSize(); ++i) { + const auto& name = source.GetColumns().Get(i); + const auto& type = source.GetColumnTypes().Get(i); + const auto it = FieldsIndexes.find(name); + if (it != FieldsIndexes.end() && it->second.Type != type) { + LOG_ROW_DISPATCHER_INFO("Different column `" << name << "` type, expected " << it->second.Type << ", actual " << type << ", send error"); + SendSessionError(ev->Sender, TStringBuilder() << "Use the same column type in all queries via RD, current type for column `" << name << "` is " << it->second.Type << " (requested type is " << type <<")"); + return false; + } + } + return true; } diff --git a/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp b/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp index cd1e60932716..37d757d88673 100644 --- a/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp +++ b/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp @@ -435,6 +435,28 @@ Y_UNIT_TEST_SUITE(TopicSessionTests) { StopSession(ReadActorId1, source1); StopSession(ReadActorId2, source2); } + + Y_UNIT_TEST_F(TwoSessionsWithDifferentColumnTypes, TFixture) { + const TString topicName = "dif_types"; + PQCreateStream(topicName); + Init(topicName); + + auto source1 = BuildSource(topicName); + source1.AddColumns("field1"); + source1.AddColumnTypes("[OptionalType; [DataType; String]]"); + StartSession(ReadActorId1, source1); + + TString json1 = "{\"dt\":101,\"field1\":null,\"value\":\"value1\"}"; + PQWrite({ json1 }, topicName); + ExpectNewDataArrived({ReadActorId1}); + ExpectMessageBatch(ReadActorId1, { json1 }); + + auto source2 = BuildSource(topicName); + source2.AddColumns("field1"); + source2.AddColumnTypes("[DataType; String]"); + StartSession(ReadActorId2, source2); + ExpectSessionError(ReadActorId2, "Use the same column type in all queries via RD, current type for column `field1` is [OptionalType; [DataType; String]] (requested type is [DataType; String])"); + } } } From 5fe3c6f0c1b2724bbaf06e2e006247ef0bd946dc Mon Sep 17 00:00:00 2001 From: Pisarenko Grigoriy Date: Wed, 13 Nov 2024 10:52:59 +0300 Subject: [PATCH 2/3] YQ-3848 RD support mod pushdown (#11543) --- ydb/library/yql/providers/common/pushdown/collection.cpp | 3 +++ ydb/library/yql/providers/common/pushdown/settings.h | 3 ++- .../generic/connector/api/service/protos/connector.proto | 3 ++- .../generic/provider/yql_generic_predicate_pushdown.cpp | 8 ++++++++ .../yql/providers/pq/provider/yql_pq_logical_opt.cpp | 2 +- ydb/tests/fq/yds/test_row_dispatcher.py | 2 ++ 6 files changed, 18 insertions(+), 3 deletions(-) diff --git a/ydb/library/yql/providers/common/pushdown/collection.cpp b/ydb/library/yql/providers/common/pushdown/collection.cpp index d18b8ae2edea..79bb8b6fe97c 100644 --- a/ydb/library/yql/providers/common/pushdown/collection.cpp +++ b/ydb/library/yql/providers/common/pushdown/collection.cpp @@ -360,6 +360,9 @@ bool CheckExpressionNodeForPushdown(const TExprBase& node, const TExprNode* lamb } else if (const auto op = node.Maybe(); op && settings.IsEnabled(TSettings::EFeatureFlag::UnaryOperators)) { return CheckExpressionNodeForPushdown(op.Cast().Arg(), lambdaArg, settings); } else if (const auto op = node.Maybe(); op && settings.IsEnabled(TSettings::EFeatureFlag::ArithmeticalExpressions)) { + if (!settings.IsEnabled(TSettings::EFeatureFlag::DivisionExpressions) && (op.Maybe() || op.Maybe())) { + return false; + } return CheckExpressionNodeForPushdown(op.Cast().Left(), lambdaArg, settings) && CheckExpressionNodeForPushdown(op.Cast().Right(), lambdaArg, settings); } return false; diff --git a/ydb/library/yql/providers/common/pushdown/settings.h b/ydb/library/yql/providers/common/pushdown/settings.h index acae35b517b4..fd3c2dd08038 100644 --- a/ydb/library/yql/providers/common/pushdown/settings.h +++ b/ydb/library/yql/providers/common/pushdown/settings.h @@ -28,6 +28,7 @@ struct TSettings { JustPassthroughOperators = 1 << 18, // if + coalesce + just InOperator = 1 << 19, // IN() IsDistinctOperator = 1 << 20, // IS NOT DISTINCT FROM / IS DISTINCT FROM + DivisionExpressions = 1 << 21, // %, / -- NOTE: division by zero is not handled and also pushdown // Option which enables partial pushdown for sequence of OR // For example next predicate: @@ -35,7 +36,7 @@ struct TSettings { // May be partially pushdowned as: // $A OR $C // In case of unsupported / complicated expressions $B and $D - SplitOrOperator = 1 << 21 + SplitOrOperator = 1 << 22 }; explicit TSettings(NLog::EComponent logComponent) diff --git a/ydb/library/yql/providers/generic/connector/api/service/protos/connector.proto b/ydb/library/yql/providers/generic/connector/api/service/protos/connector.proto index 4afabef69d1a..35b5fc74cbae 100644 --- a/ydb/library/yql/providers/generic/connector/api/service/protos/connector.proto +++ b/ydb/library/yql/providers/generic/connector/api/service/protos/connector.proto @@ -327,10 +327,11 @@ message TExpression { MUL = 1; // left_value * right_value ADD = 2; // left_value + right_value SUB = 3; // left_value - right_value + DIV = 7; // left_value / right_value + MOD = 8; // left_value % right_value BIT_AND = 4; // left_value & right_value BIT_OR = 5; // left_value | right_value BIT_XOR = 6; // left_value ^ right_value - // TODO: support `/` and `%` } EOperation operation = 1; TExpression left_value = 2; diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_predicate_pushdown.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_predicate_pushdown.cpp index 2b0d72c27494..d4fc86702f1f 100644 --- a/ydb/library/yql/providers/generic/provider/yql_generic_predicate_pushdown.cpp +++ b/ydb/library/yql/providers/generic/provider/yql_generic_predicate_pushdown.cpp @@ -86,6 +86,8 @@ namespace NYql { MATCH_ARITHMETICAL(Sub, SUB); MATCH_ARITHMETICAL(Add, ADD); MATCH_ARITHMETICAL(Mul, MUL); + MATCH_ARITHMETICAL(Div, DIV); + MATCH_ARITHMETICAL(Mod, MOD); if (auto maybeNull = expression.Maybe()) { proto->mutable_null(); @@ -342,6 +344,12 @@ namespace NYql { case TExpression_TArithmeticalExpression::SUB: operation = " - "; break; + case TExpression_TArithmeticalExpression::DIV: + operation = " / "; + break; + case TExpression_TArithmeticalExpression::MOD: + operation = " % "; + break; case TExpression_TArithmeticalExpression::BIT_AND: operation = " & "; break; diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_logical_opt.cpp b/ydb/library/yql/providers/pq/provider/yql_pq_logical_opt.cpp index 818f6647cb13..b333380f4a7a 100644 --- a/ydb/library/yql/providers/pq/provider/yql_pq_logical_opt.cpp +++ b/ydb/library/yql/providers/pq/provider/yql_pq_logical_opt.cpp @@ -34,7 +34,7 @@ namespace { // Operator features EFlag::ExpressionAsPredicate | EFlag::ArithmeticalExpressions | EFlag::ImplicitConversionToInt64 | EFlag::StringTypes | EFlag::LikeOperator | EFlag::DoNotCheckCompareArgumentsTypes | EFlag::InOperator | - EFlag::IsDistinctOperator | EFlag::JustPassthroughOperators | + EFlag::IsDistinctOperator | EFlag::JustPassthroughOperators | DivisionExpressions | // Split features EFlag::SplitOrOperator diff --git a/ydb/tests/fq/yds/test_row_dispatcher.py b/ydb/tests/fq/yds/test_row_dispatcher.py index 0cbbfc2845ed..ad870ee03398 100644 --- a/ydb/tests/fq/yds/test_row_dispatcher.py +++ b/ydb/tests/fq/yds/test_row_dispatcher.py @@ -350,6 +350,8 @@ def test_filters_optional_field(self, kikimr, client): self.run_and_check(kikimr, client, sql + filter, data, expected, 'predicate: WHERE `flag`') filter = 'time * (field2 - field1) != 0' self.run_and_check(kikimr, client, sql + filter, data, expected, 'predicate: WHERE (`time` * (`field2` - `field1`)) <> 0') + filter = '(field1 % field2) / 5 = 1' + self.run_and_check(kikimr, client, sql + filter, data, expected, 'predicate: WHERE ((`field1` % `field2`) / 5) = 1') filter = ' event IS NOT DISTINCT FROM "event2"' self.run_and_check(kikimr, client, sql + filter, data, expected, 'predicate: WHERE `event` IS NOT DISTINCT FROM \\"event2\\"') filter = ' event IS DISTINCT FROM "event1"' From 5fe3ee75dae375b82edda533a8bbf1bf63a32c1d Mon Sep 17 00:00:00 2001 From: Pisarenko Grigoriy Date: Wed, 13 Nov 2024 19:12:51 +0300 Subject: [PATCH 3/3] YQ RD enable LLVM in purecalc filters (#11442) --- .../fq/libs/row_dispatcher/actors_factory.cpp | 4 +- .../fq/libs/row_dispatcher/actors_factory.h | 5 ++- ydb/core/fq/libs/row_dispatcher/common.cpp | 42 +++++++++++++++++++ ydb/core/fq/libs/row_dispatcher/common.h | 25 +++++++++++ .../fq/libs/row_dispatcher/json_filter.cpp | 20 +++++---- ydb/core/fq/libs/row_dispatcher/json_filter.h | 9 ++-- .../fq/libs/row_dispatcher/row_dispatcher.cpp | 5 ++- .../fq/libs/row_dispatcher/topic_session.cpp | 17 ++++---- .../fq/libs/row_dispatcher/topic_session.h | 5 ++- .../libs/row_dispatcher/ut/json_filter_ut.cpp | 9 ++-- .../row_dispatcher/ut/row_dispatcher_ut.cpp | 2 +- .../row_dispatcher/ut/topic_session_ut.cpp | 5 +-- ydb/core/fq/libs/row_dispatcher/ya.make | 1 + .../yql/providers/pq/proto/dq_io.proto | 1 + .../pq/provider/yql_pq_dq_integration.cpp | 6 +++ 15 files changed, 121 insertions(+), 35 deletions(-) create mode 100644 ydb/core/fq/libs/row_dispatcher/common.cpp create mode 100644 ydb/core/fq/libs/row_dispatcher/common.h diff --git a/ydb/core/fq/libs/row_dispatcher/actors_factory.cpp b/ydb/core/fq/libs/row_dispatcher/actors_factory.cpp index f78e5f691a84..9508f57729b4 100644 --- a/ydb/core/fq/libs/row_dispatcher/actors_factory.cpp +++ b/ydb/core/fq/libs/row_dispatcher/actors_factory.cpp @@ -2,8 +2,6 @@ #include -#include - namespace NFq::NRowDispatcher { @@ -19,7 +17,7 @@ struct TActorFactory : public IActorFactory { ui32 partitionId, NYdb::TDriver driver, std::shared_ptr credentialsProviderFactory, - NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory, + IPureCalcProgramFactory::TPtr pureCalcProgramFactory, const ::NMonitoring::TDynamicCounterPtr& counters, const NYql::IPqGateway::TPtr& pqGateway) const override { diff --git a/ydb/core/fq/libs/row_dispatcher/actors_factory.h b/ydb/core/fq/libs/row_dispatcher/actors_factory.h index ce3d8be007c5..95c4f65fd633 100644 --- a/ydb/core/fq/libs/row_dispatcher/actors_factory.h +++ b/ydb/core/fq/libs/row_dispatcher/actors_factory.h @@ -1,11 +1,12 @@ #pragma once +#include "common.h" + #include #include #include #include #include -#include namespace NFq::NRowDispatcher { @@ -21,7 +22,7 @@ struct IActorFactory : public TThrRefBase { ui32 partitionId, NYdb::TDriver driver, std::shared_ptr credentialsProviderFactory, - NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory, + IPureCalcProgramFactory::TPtr pureCalcProgramFactory, const ::NMonitoring::TDynamicCounterPtr& counters, const NYql::IPqGateway::TPtr& pqGateway) const = 0; }; diff --git a/ydb/core/fq/libs/row_dispatcher/common.cpp b/ydb/core/fq/libs/row_dispatcher/common.cpp new file mode 100644 index 000000000000..50bd7423cb8f --- /dev/null +++ b/ydb/core/fq/libs/row_dispatcher/common.cpp @@ -0,0 +1,42 @@ +#include "common.h" + +#include + +#include + +namespace NFq { + +namespace { + +class TPureCalcProgramFactory : public IPureCalcProgramFactory { +public: + TPureCalcProgramFactory() { + CreateFactory({.EnabledLLVM = false}); + CreateFactory({.EnabledLLVM = true}); + } + + NYql::NPureCalc::IProgramFactoryPtr GetFactory(const TSettings& settings) const override { + const auto it = ProgramFactories.find(settings); + Y_ENSURE(it != ProgramFactories.end()); + return it->second; + } + +private: + void CreateFactory(const TSettings& settings) { + ProgramFactories.insert({settings, NYql::NPureCalc::MakeProgramFactory( + NYql::NPureCalc::TProgramFactoryOptions() + .SetLLVMSettings(settings.EnabledLLVM ? "ON" : "OFF") + )}); + } + +private: + std::map ProgramFactories; +}; + +} // anonymous namespace + +IPureCalcProgramFactory::TPtr CreatePureCalcProgramFactory() { + return MakeIntrusive(); +} + +} // namespace NFq diff --git a/ydb/core/fq/libs/row_dispatcher/common.h b/ydb/core/fq/libs/row_dispatcher/common.h new file mode 100644 index 000000000000..50f43443b915 --- /dev/null +++ b/ydb/core/fq/libs/row_dispatcher/common.h @@ -0,0 +1,25 @@ +#pragma once + +#include + +#include + +namespace NFq { + +class IPureCalcProgramFactory : public TThrRefBase { +public: + using TPtr = TIntrusivePtr; + + struct TSettings { + bool EnabledLLVM = false; + + std::strong_ordering operator<=>(const TSettings& other) const = default; + }; + +public: + virtual NYql::NPureCalc::IProgramFactoryPtr GetFactory(const TSettings& settings) const = 0; +}; + +IPureCalcProgramFactory::TPtr CreatePureCalcProgramFactory(); + +} // namespace NFq diff --git a/ydb/core/fq/libs/row_dispatcher/json_filter.cpp b/ydb/core/fq/libs/row_dispatcher/json_filter.cpp index 202369744f1d..e88609a98751 100644 --- a/ydb/core/fq/libs/row_dispatcher/json_filter.cpp +++ b/ydb/core/fq/libs/row_dispatcher/json_filter.cpp @@ -264,14 +264,15 @@ class TJsonFilter::TImpl { const TVector& types, const TString& whereFilter, TCallback callback, - NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory) - : Sql(GenerateSql(whereFilter)) { + IPureCalcProgramFactory::TPtr pureCalcProgramFactory, + const IPureCalcProgramFactory::TSettings& factorySettings) + : Sql(GenerateSql(whereFilter, factorySettings)) { Y_ENSURE(columns.size() == types.size(), "Number of columns and types should by equal"); // Program should be stateless because input values // allocated on another allocator and should be released LOG_ROW_DISPATCHER_DEBUG("Creating program..."); - Program = pureCalcProgramFactory->MakePushStreamProgram( + Program = pureCalcProgramFactory->GetFactory(factorySettings)->MakePushStreamProgram( TFilterInputSpec(MakeInputSchema(columns, types)), TFilterOutputSpec(MakeOutputSchema()), Sql, @@ -291,8 +292,9 @@ class TJsonFilter::TImpl { } private: - TString GenerateSql(const TString& whereFilter) { + TString GenerateSql(const TString& whereFilter, const IPureCalcProgramFactory::TSettings& factorySettings) { TStringStream str; + str << "PRAGMA config.flags(\"LLVM\", \"" << (factorySettings.EnabledLLVM ? "ON" : "OFF") << "\");\n"; str << "$filtered = SELECT * FROM Input " << whereFilter << ";\n"; str << "SELECT " << OffsetFieldName << ", Unwrap(Json::SerializeJson(Yson::From(RemoveMembers(TableRow(), [\"" << OffsetFieldName; @@ -312,8 +314,9 @@ TJsonFilter::TJsonFilter( const TVector& types, const TString& whereFilter, TCallback callback, - NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory) - : Impl(std::make_unique(columns, types, whereFilter, callback, pureCalcProgramFactory)) { + IPureCalcProgramFactory::TPtr pureCalcProgramFactory, + const IPureCalcProgramFactory::TSettings& factorySettings) + : Impl(std::make_unique(columns, types, whereFilter, callback, pureCalcProgramFactory, factorySettings)) { } TJsonFilter::~TJsonFilter() { @@ -332,8 +335,9 @@ std::unique_ptr NewJsonFilter( const TVector& types, const TString& whereFilter, TCallback callback, - NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory) { - return std::unique_ptr(new TJsonFilter(columns, types, whereFilter, callback, pureCalcProgramFactory)); + IPureCalcProgramFactory::TPtr pureCalcProgramFactory, + const IPureCalcProgramFactory::TSettings& factorySettings) { + return std::unique_ptr(new TJsonFilter(columns, types, whereFilter, callback, pureCalcProgramFactory, factorySettings)); } } // namespace NFq diff --git a/ydb/core/fq/libs/row_dispatcher/json_filter.h b/ydb/core/fq/libs/row_dispatcher/json_filter.h index 09401c6a9b86..b47bc984b95f 100644 --- a/ydb/core/fq/libs/row_dispatcher/json_filter.h +++ b/ydb/core/fq/libs/row_dispatcher/json_filter.h @@ -1,7 +1,8 @@ #pragma once +#include "common.h" + #include -#include namespace NFq { @@ -15,7 +16,8 @@ class TJsonFilter { const TVector& types, const TString& whereFilter, TCallback callback, - NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory); + IPureCalcProgramFactory::TPtr pureCalcProgramFactory, + const IPureCalcProgramFactory::TSettings& factorySettings); ~TJsonFilter(); @@ -32,6 +34,7 @@ std::unique_ptr NewJsonFilter( const TVector& types, const TString& whereFilter, TJsonFilter::TCallback callback, - NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory); + IPureCalcProgramFactory::TPtr pureCalcProgramFactory, + const IPureCalcProgramFactory::TSettings& factorySettings); } // namespace NFq diff --git a/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp b/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp index 90b7f889774e..fdd6ab174bba 100644 --- a/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp +++ b/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp @@ -1,4 +1,5 @@ #include "row_dispatcher.h" +#include "common.h" #include "coordinator.h" #include @@ -214,7 +215,7 @@ class TRowDispatcher : public TActorBootstrapped { NConfig::TRowDispatcherConfig Config; NKikimr::TYdbCredentialsProviderFactory CredentialsProviderFactory; - NYql::NPureCalc::IProgramFactoryPtr PureCalcProgramFactory; + IPureCalcProgramFactory::TPtr PureCalcProgramFactory; TYqSharedResources::TPtr YqSharedResources; TMaybe CoordinatorActorId; TSet CoordinatorChangedSubscribers; @@ -362,7 +363,7 @@ TRowDispatcher::TRowDispatcher( const NYql::IPqGateway::TPtr& pqGateway) : Config(config) , CredentialsProviderFactory(credentialsProviderFactory) - , PureCalcProgramFactory(NYql::NPureCalc::MakeProgramFactory(NYql::NPureCalc::TProgramFactoryOptions())) + , PureCalcProgramFactory(CreatePureCalcProgramFactory()) , YqSharedResources(yqSharedResources) , CredentialsFactory(credentialsFactory) , LogPrefix("RowDispatcher: ") diff --git a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp index c4082a302ffe..69902a11150a 100644 --- a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp +++ b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp @@ -156,7 +156,7 @@ class TTopicSession : public TActorBootstrapped { ui32 PartitionId; NYdb::TDriver Driver; std::shared_ptr CredentialsProviderFactory; - NYql::NPureCalc::IProgramFactoryPtr PureCalcProgramFactory; + IPureCalcProgramFactory::TPtr PureCalcProgramFactory; NYql::ITopicClient::TPtr TopicClient; std::shared_ptr ReadSession; const i64 BufferSize; @@ -187,7 +187,7 @@ class TTopicSession : public TActorBootstrapped { ui32 partitionId, NYdb::TDriver driver, std::shared_ptr credentialsProviderFactory, - NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory, + IPureCalcProgramFactory::TPtr pureCalcProgramFactory, const ::NMonitoring::TDynamicCounterPtr& counters, const NYql::IPqGateway::TPtr& pqGateway); @@ -278,7 +278,7 @@ TTopicSession::TTopicSession( ui32 partitionId, NYdb::TDriver driver, std::shared_ptr credentialsProviderFactory, - NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory, + IPureCalcProgramFactory::TPtr pureCalcProgramFactory, const ::NMonitoring::TDynamicCounterPtr& counters, const NYql::IPqGateway::TPtr& pqGateway) : TopicPath(topicPath) @@ -734,10 +734,11 @@ void TTopicSession::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) { std::forward_as_tuple(ev)).first->second; UpdateFieldsIds(clientInfo); - TString predicate = clientInfo.Settings.GetSource().GetPredicate(); + const auto& source = clientInfo.Settings.GetSource(); + TString predicate = source.GetPredicate(); // TODO: remove this when the re-parsing is removed from pq read actor - if (predicate.empty() && HasJsonColumns(clientInfo.Settings.GetSource())) { + if (predicate.empty() && HasJsonColumns(source)) { predicate = "WHERE TRUE"; } @@ -749,7 +750,9 @@ void TTopicSession::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) { [&, actorId = clientInfo.ReadActorId](ui64 offset, const TString& json){ Send(SelfId(), new NFq::TEvPrivate::TEvDataAfterFilteration(offset, json, actorId)); }, - PureCalcProgramFactory); + PureCalcProgramFactory, + {.EnabledLLVM = source.GetEnabledLLVM()} + ); } else { ClientsWithoutPredicate.insert(ev->Sender); } @@ -987,7 +990,7 @@ std::unique_ptr NewTopicSession( ui32 partitionId, NYdb::TDriver driver, std::shared_ptr credentialsProviderFactory, - NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory, + IPureCalcProgramFactory::TPtr pureCalcProgramFactory, const ::NMonitoring::TDynamicCounterPtr& counters, const NYql::IPqGateway::TPtr& pqGateway) { return std::unique_ptr(new TTopicSession(topicPath, endpoint, database, config, rowDispatcherActorId, partitionId, std::move(driver), credentialsProviderFactory, pureCalcProgramFactory, counters, pqGateway)); diff --git a/ydb/core/fq/libs/row_dispatcher/topic_session.h b/ydb/core/fq/libs/row_dispatcher/topic_session.h index 54f8b3510b11..77d49168db84 100644 --- a/ydb/core/fq/libs/row_dispatcher/topic_session.h +++ b/ydb/core/fq/libs/row_dispatcher/topic_session.h @@ -1,5 +1,7 @@ #pragma once +#include "common.h" + #include #include #include @@ -8,7 +10,6 @@ #include #include -#include #include @@ -25,7 +26,7 @@ std::unique_ptr NewTopicSession( ui32 partitionId, NYdb::TDriver driver, std::shared_ptr credentialsProviderFactory, - NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory, + IPureCalcProgramFactory::TPtr pureCalcProgramFactory, const ::NMonitoring::TDynamicCounterPtr& counters, const NYql::IPqGateway::TPtr& pqGateway); diff --git a/ydb/core/fq/libs/row_dispatcher/ut/json_filter_ut.cpp b/ydb/core/fq/libs/row_dispatcher/ut/json_filter_ut.cpp index d17ec36cf673..f6977c6ae846 100644 --- a/ydb/core/fq/libs/row_dispatcher/ut/json_filter_ut.cpp +++ b/ydb/core/fq/libs/row_dispatcher/ut/json_filter_ut.cpp @@ -3,6 +3,7 @@ #include #include +#include #include #include @@ -10,7 +11,6 @@ #include #include -#include #include @@ -23,7 +23,7 @@ class TFixture : public NUnitTest::TBaseFixture { public: TFixture() - : PureCalcProgramFactory(NYql::NPureCalc::MakeProgramFactory(NYql::NPureCalc::TProgramFactoryOptions())) + : PureCalcProgramFactory(CreatePureCalcProgramFactory()) , Runtime(true) , Alloc(__LOCATION__, NKikimr::TAlignedPagePoolCounters(), true, false) {} @@ -65,7 +65,8 @@ class TFixture : public NUnitTest::TBaseFixture { types, whereFilter, callback, - PureCalcProgramFactory); + PureCalcProgramFactory, + {.EnabledLLVM = false}); } const NKikimr::NMiniKQL::TUnboxedValueVector* MakeVector(size_t size, std::function valueCreator) { @@ -100,7 +101,7 @@ class TFixture : public NUnitTest::TBaseFixture { }); } - NYql::NPureCalc::IProgramFactoryPtr PureCalcProgramFactory; + IPureCalcProgramFactory::TPtr PureCalcProgramFactory; NActors::TTestActorRuntime Runtime; TActorSystemStub ActorSystemStub; std::unique_ptr Filter; diff --git a/ydb/core/fq/libs/row_dispatcher/ut/row_dispatcher_ut.cpp b/ydb/core/fq/libs/row_dispatcher/ut/row_dispatcher_ut.cpp index bafd824fb48b..cc2d2407b8a1 100644 --- a/ydb/core/fq/libs/row_dispatcher/ut/row_dispatcher_ut.cpp +++ b/ydb/core/fq/libs/row_dispatcher/ut/row_dispatcher_ut.cpp @@ -35,7 +35,7 @@ struct TTestActorFactory : public NFq::NRowDispatcher::IActorFactory { ui32 /*partitionId*/, NYdb::TDriver /*driver*/, std::shared_ptr /*credentialsProviderFactory*/, - NYql::NPureCalc::IProgramFactoryPtr /*pureCalcProgramFactory*/, + IPureCalcProgramFactory::TPtr /*pureCalcProgramFactory*/, const ::NMonitoring::TDynamicCounterPtr& /*counters*/, const NYql::IPqGateway::TPtr& /*pqGateway*/) const override { auto actorId = Runtime.AllocateEdgeActor(); diff --git a/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp b/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp index 37d757d88673..085f9f8b9dfe 100644 --- a/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp +++ b/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp @@ -13,7 +13,6 @@ #include #include -#include namespace { @@ -27,7 +26,7 @@ const ui64 GrabTimeoutSec = 4 * TimeoutBeforeStartSessionSec; class TFixture : public NUnitTest::TBaseFixture { public: TFixture() - : PureCalcProgramFactory(NYql::NPureCalc::MakeProgramFactory(NYql::NPureCalc::TProgramFactoryOptions())) + : PureCalcProgramFactory(CreatePureCalcProgramFactory()) , Runtime(true) {} @@ -158,7 +157,7 @@ class TFixture : public NUnitTest::TBaseFixture { return eventHolder->Get()->Record.MessagesSize(); } - NYql::NPureCalc::IProgramFactoryPtr PureCalcProgramFactory; + IPureCalcProgramFactory::TPtr PureCalcProgramFactory; NActors::TTestActorRuntime Runtime; TActorSystemStub ActorSystemStub; NActors::TActorId TopicSession; diff --git a/ydb/core/fq/libs/row_dispatcher/ya.make b/ydb/core/fq/libs/row_dispatcher/ya.make index 58572ef517c6..51de45888f6d 100644 --- a/ydb/core/fq/libs/row_dispatcher/ya.make +++ b/ydb/core/fq/libs/row_dispatcher/ya.make @@ -2,6 +2,7 @@ LIBRARY() SRCS( actors_factory.cpp + common.cpp coordinator.cpp json_filter.cpp json_parser.cpp diff --git a/ydb/library/yql/providers/pq/proto/dq_io.proto b/ydb/library/yql/providers/pq/proto/dq_io.proto index 565563647a82..457c244124f5 100644 --- a/ydb/library/yql/providers/pq/proto/dq_io.proto +++ b/ydb/library/yql/providers/pq/proto/dq_io.proto @@ -38,6 +38,7 @@ message TDqPqTopicSource { string Predicate = 14; bool SharedReading = 15; string ReconnectPeriod = 16; // disabled by default, example of a parameter: 5m + bool EnabledLLVM = 17; } message TDqPqTopicSink { diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp b/ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp index 99a4bd187ee2..1631f3c85493 100644 --- a/ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp +++ b/ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp @@ -209,6 +209,12 @@ class TPqDqIntegration: public TDqIntegrationBase { srcDesc.SetClusterType(ToClusterType(clusterDesc->ClusterType)); srcDesc.SetDatabaseId(clusterDesc->DatabaseId); + if (const auto& types = State_->Types) { + if (const auto& optLLVM = types->OptLLVM) { + srcDesc.SetEnabledLLVM(!optLLVM->empty() && *optLLVM != "OFF"); + } + } + bool sharedReading = false; TString format; size_t const settingsCount = topicSource.Settings().Size();