Skip to content

Commit a387c73

Browse files
committed
Added pragma in filter sql
1 parent 4364093 commit a387c73

File tree

4 files changed

+26
-18
lines changed

4 files changed

+26
-18
lines changed

ydb/core/fq/libs/row_dispatcher/json_filter.cpp

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -264,14 +264,15 @@ class TJsonFilter::TImpl {
264264
const TVector<TString>& types,
265265
const TString& whereFilter,
266266
TCallback callback,
267-
NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory)
268-
: Sql(GenerateSql(whereFilter)) {
267+
IPureCalcProgramFactory::TPtr pureCalcProgramFactory,
268+
const IPureCalcProgramFactory::TSettings& factorySettings)
269+
: Sql(GenerateSql(whereFilter, factorySettings)) {
269270
Y_ENSURE(columns.size() == types.size(), "Number of columns and types should by equal");
270271

271272
// Program should be stateless because input values
272273
// allocated on another allocator and should be released
273274
LOG_ROW_DISPATCHER_DEBUG("Creating program...");
274-
Program = pureCalcProgramFactory->MakePushStreamProgram(
275+
Program = pureCalcProgramFactory->GetFactory(factorySettings)->MakePushStreamProgram(
275276
TFilterInputSpec(MakeInputSchema(columns, types)),
276277
TFilterOutputSpec(MakeOutputSchema()),
277278
Sql,
@@ -291,8 +292,9 @@ class TJsonFilter::TImpl {
291292
}
292293

293294
private:
294-
TString GenerateSql(const TString& whereFilter) {
295+
TString GenerateSql(const TString& whereFilter, const IPureCalcProgramFactory::TSettings& factorySettings) {
295296
TStringStream str;
297+
str << "PRAGMA config.flags(\"LLVM\", \"" << (factorySettings.EnabledLLVM ? "ON" : "OFF") << "\");\n";
296298
str << "$filtered = SELECT * FROM Input " << whereFilter << ";\n";
297299

298300
str << "SELECT " << OffsetFieldName << ", Unwrap(Json::SerializeJson(Yson::From(RemoveMembers(TableRow(), [\"" << OffsetFieldName;
@@ -312,8 +314,9 @@ TJsonFilter::TJsonFilter(
312314
const TVector<TString>& types,
313315
const TString& whereFilter,
314316
TCallback callback,
315-
NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory)
316-
: Impl(std::make_unique<TJsonFilter::TImpl>(columns, types, whereFilter, callback, pureCalcProgramFactory)) {
317+
IPureCalcProgramFactory::TPtr pureCalcProgramFactory,
318+
const IPureCalcProgramFactory::TSettings& factorySettings)
319+
: Impl(std::make_unique<TJsonFilter::TImpl>(columns, types, whereFilter, callback, pureCalcProgramFactory, factorySettings)) {
317320
}
318321

319322
TJsonFilter::~TJsonFilter() {
@@ -332,8 +335,9 @@ std::unique_ptr<TJsonFilter> NewJsonFilter(
332335
const TVector<TString>& types,
333336
const TString& whereFilter,
334337
TCallback callback,
335-
NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory) {
336-
return std::unique_ptr<TJsonFilter>(new TJsonFilter(columns, types, whereFilter, callback, pureCalcProgramFactory));
338+
IPureCalcProgramFactory::TPtr pureCalcProgramFactory,
339+
const IPureCalcProgramFactory::TSettings& factorySettings) {
340+
return std::unique_ptr<TJsonFilter>(new TJsonFilter(columns, types, whereFilter, callback, pureCalcProgramFactory, factorySettings));
337341
}
338342

339343
} // namespace NFq

ydb/core/fq/libs/row_dispatcher/json_filter.h

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
#pragma once
22

3+
#include "common.h"
4+
35
#include <yql/essentials/minikql/computation/mkql_computation_node_holders.h>
4-
#include <ydb/library/yql/public/purecalc/common/fwd.h>
56
#include <yql/essentials/public/udf/udf_data_type.h>
67
#include <yql/essentials/public/udf/udf_value.h>
78

@@ -17,7 +18,8 @@ class TJsonFilter {
1718
const TVector<TString>& types,
1819
const TString& whereFilter,
1920
TCallback callback,
20-
NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory);
21+
IPureCalcProgramFactory::TPtr pureCalcProgramFactory,
22+
const IPureCalcProgramFactory::TSettings& factorySettings);
2123

2224
~TJsonFilter();
2325

@@ -34,6 +36,7 @@ std::unique_ptr<TJsonFilter> NewJsonFilter(
3436
const TVector<TString>& types,
3537
const TString& whereFilter,
3638
TJsonFilter::TCallback callback,
37-
NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory);
39+
IPureCalcProgramFactory::TPtr pureCalcProgramFactory,
40+
const IPureCalcProgramFactory::TSettings& factorySettings);
3841

3942
} // namespace NFq

ydb/core/fq/libs/row_dispatcher/topic_session.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -753,9 +753,9 @@ void TTopicSession::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) {
753753
[&, actorId = clientInfo.ReadActorId](ui64 offset, const TString& json){
754754
Send(SelfId(), new NFq::TEvPrivate::TEvDataAfterFilteration(offset, json, actorId));
755755
},
756-
PureCalcProgramFactory->GetFactory({
757-
.EnabledLLVM = source.GetEnabledLLVM()
758-
}));
756+
PureCalcProgramFactory,
757+
{.EnabledLLVM = source.GetEnabledLLVM()}
758+
);
759759
} else {
760760
ClientsWithoutPredicate.insert(ev->Sender);
761761
}

ydb/core/fq/libs/row_dispatcher/ut/json_filter_ut.cpp

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,14 @@
33
#include <ydb/core/fq/libs/ydb/ydb.h>
44
#include <ydb/core/fq/libs/events/events.h>
55

6+
#include <ydb/core/fq/libs/row_dispatcher/common.h>
67
#include <ydb/core/fq/libs/row_dispatcher/json_filter.h>
78

89
#include <ydb/core/testlib/actors/test_runtime.h>
910
#include <ydb/core/testlib/basics/helpers.h>
1011
#include <ydb/core/testlib/actor_helpers.h>
1112

1213
#include <yql/essentials/minikql/mkql_string_util.h>
13-
#include <ydb/library/yql/public/purecalc/common/interface.h>
1414

1515
#include <library/cpp/testing/unittest/registar.h>
1616

@@ -23,7 +23,7 @@ class TFixture : public NUnitTest::TBaseFixture {
2323

2424
public:
2525
TFixture()
26-
: PureCalcProgramFactory(NYql::NPureCalc::MakeProgramFactory(NYql::NPureCalc::TProgramFactoryOptions()))
26+
: PureCalcProgramFactory(CreatePureCalcProgramFactory())
2727
, Runtime(true)
2828
, Alloc(__LOCATION__, NKikimr::TAlignedPagePoolCounters(), true, false)
2929
{}
@@ -65,7 +65,8 @@ class TFixture : public NUnitTest::TBaseFixture {
6565
types,
6666
whereFilter,
6767
callback,
68-
PureCalcProgramFactory);
68+
PureCalcProgramFactory,
69+
{.EnabledLLVM = false});
6970
}
7071

7172
const NKikimr::NMiniKQL::TUnboxedValueVector* MakeVector(size_t size, std::function<NYql::NUdf::TUnboxedValuePod(size_t)> valueCreator) {
@@ -100,7 +101,7 @@ class TFixture : public NUnitTest::TBaseFixture {
100101
});
101102
}
102103

103-
NYql::NPureCalc::IProgramFactoryPtr PureCalcProgramFactory;
104+
IPureCalcProgramFactory::TPtr PureCalcProgramFactory;
104105
NActors::TTestActorRuntime Runtime;
105106
TActorSystemStub ActorSystemStub;
106107
std::unique_ptr<NFq::TJsonFilter> Filter;

0 commit comments

Comments
 (0)