Skip to content

Commit eb10f2d

Browse files
committed
Passed enabled LLVM setting from query to RD
1 parent bfd3027 commit eb10f2d

File tree

12 files changed

+94
-23
lines changed

12 files changed

+94
-23
lines changed

ydb/core/fq/libs/row_dispatcher/actors_factory.cpp

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,6 @@
22

33
#include <ydb/core/fq/libs/row_dispatcher/topic_session.h>
44

5-
#include <ydb/library/yql/public/purecalc/common/interface.h>
6-
75
namespace NFq::NRowDispatcher {
86

97

@@ -19,7 +17,7 @@ struct TActorFactory : public IActorFactory {
1917
ui32 partitionId,
2018
NYdb::TDriver driver,
2119
std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory,
22-
NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory,
20+
IPureCalcProgramFactory::TPtr pureCalcProgramFactory,
2321
const ::NMonitoring::TDynamicCounterPtr& counters,
2422
const NYql::IPqGateway::TPtr& pqGateway) const override {
2523

ydb/core/fq/libs/row_dispatcher/actors_factory.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
#pragma once
22

3+
#include "common.h"
4+
35
#include <ydb/core/fq/libs/config/protos/row_dispatcher.pb.h>
46
#include <util/generic/ptr.h>
57
#include <ydb/library/actors/core/actor.h>
68
#include <ydb/public/sdk/cpp/client/ydb_driver/driver.h>
79
#include <ydb/library/yql/providers/pq/provider/yql_pq_gateway.h>
8-
#include <ydb/library/yql/public/purecalc/common/fwd.h>
910

1011
namespace NFq::NRowDispatcher {
1112

@@ -21,7 +22,7 @@ struct IActorFactory : public TThrRefBase {
2122
ui32 partitionId,
2223
NYdb::TDriver driver,
2324
std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory,
24-
NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory,
25+
IPureCalcProgramFactory::TPtr pureCalcProgramFactory,
2526
const ::NMonitoring::TDynamicCounterPtr& counters,
2627
const NYql::IPqGateway::TPtr& pqGateway) const = 0;
2728
};
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
#include "common.h"
2+
3+
#include <ydb/library/yql/public/purecalc/common/interface.h>
4+
5+
namespace NFq {
6+
7+
namespace {
8+
9+
class TPureCalcProgramFactory : public IPureCalcProgramFactory {
10+
public:
11+
NYql::NPureCalc::IProgramFactoryPtr GetFactory(const TSettings& settings) override {
12+
const auto it = ProgramFactories.find(settings);
13+
if (it != ProgramFactories.end()) {
14+
return it->second;
15+
}
16+
return CreateFactory(settings);
17+
}
18+
19+
private:
20+
NYql::NPureCalc::IProgramFactoryPtr CreateFactory(const TSettings& settings) {
21+
return ProgramFactories.insert({settings, NYql::NPureCalc::MakeProgramFactory(
22+
NYql::NPureCalc::TProgramFactoryOptions()
23+
.SetLLVMSettings(settings.EnabledLLVM ? "ON" : "OFF")
24+
)}).first->second;
25+
}
26+
27+
private:
28+
std::map<TSettings, NYql::NPureCalc::IProgramFactoryPtr> ProgramFactories;
29+
};
30+
31+
} // anonymous namespace
32+
33+
IPureCalcProgramFactory::TPtr CreatePureCalcProgramFactory() {
34+
return MakeIntrusive<TPureCalcProgramFactory>();
35+
}
36+
37+
} // namespace NFq
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
#pragma once
2+
3+
#include <util/generic/ptr.h>
4+
5+
#include <ydb/library/yql/public/purecalc/common/fwd.h>
6+
7+
namespace NFq {
8+
9+
class IPureCalcProgramFactory : public TThrRefBase {
10+
public:
11+
using TPtr = TIntrusivePtr<IPureCalcProgramFactory>;
12+
13+
struct TSettings {
14+
bool EnabledLLVM = false;
15+
16+
std::strong_ordering operator<=>(const TSettings& other) const = default;
17+
};
18+
19+
public:
20+
virtual NYql::NPureCalc::IProgramFactoryPtr GetFactory(const TSettings& settings) = 0;
21+
};
22+
23+
IPureCalcProgramFactory::TPtr CreatePureCalcProgramFactory();
24+
25+
} // namespace NFq

ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
#include "row_dispatcher.h"
2+
#include "common.h"
23
#include "coordinator.h"
34

45
#include <ydb/library/actors/core/actorid.h>
@@ -214,7 +215,7 @@ class TRowDispatcher : public TActorBootstrapped<TRowDispatcher> {
214215

215216
NConfig::TRowDispatcherConfig Config;
216217
NKikimr::TYdbCredentialsProviderFactory CredentialsProviderFactory;
217-
NYql::NPureCalc::IProgramFactoryPtr PureCalcProgramFactory;
218+
IPureCalcProgramFactory::TPtr PureCalcProgramFactory;
218219
TYqSharedResources::TPtr YqSharedResources;
219220
TMaybe<TActorId> CoordinatorActorId;
220221
TSet<TActorId> CoordinatorChangedSubscribers;
@@ -362,10 +363,7 @@ TRowDispatcher::TRowDispatcher(
362363
const NYql::IPqGateway::TPtr& pqGateway)
363364
: Config(config)
364365
, CredentialsProviderFactory(credentialsProviderFactory)
365-
, PureCalcProgramFactory(NYql::NPureCalc::MakeProgramFactory(
366-
NYql::NPureCalc::TProgramFactoryOptions()
367-
.SetLLVMSettings("ON")
368-
))
366+
, PureCalcProgramFactory(CreatePureCalcProgramFactory())
369367
, YqSharedResources(yqSharedResources)
370368
, CredentialsFactory(credentialsFactory)
371369
, LogPrefix("RowDispatcher: ")

ydb/core/fq/libs/row_dispatcher/topic_session.cpp

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ class TTopicSession : public TActorBootstrapped<TTopicSession> {
158158
ui32 PartitionId;
159159
NYdb::TDriver Driver;
160160
std::shared_ptr<NYdb::ICredentialsProviderFactory> CredentialsProviderFactory;
161-
NYql::NPureCalc::IProgramFactoryPtr PureCalcProgramFactory;
161+
IPureCalcProgramFactory::TPtr PureCalcProgramFactory;
162162
NYql::ITopicClient::TPtr TopicClient;
163163
std::shared_ptr<NYdb::NTopic::IReadSession> ReadSession;
164164
const i64 BufferSize;
@@ -190,7 +190,7 @@ class TTopicSession : public TActorBootstrapped<TTopicSession> {
190190
ui32 partitionId,
191191
NYdb::TDriver driver,
192192
std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory,
193-
NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory,
193+
IPureCalcProgramFactory::TPtr pureCalcProgramFactory,
194194
const ::NMonitoring::TDynamicCounterPtr& counters,
195195
const NYql::IPqGateway::TPtr& pqGateway);
196196

@@ -281,7 +281,7 @@ TTopicSession::TTopicSession(
281281
ui32 partitionId,
282282
NYdb::TDriver driver,
283283
std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory,
284-
NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory,
284+
IPureCalcProgramFactory::TPtr pureCalcProgramFactory,
285285
const ::NMonitoring::TDynamicCounterPtr& counters,
286286
const NYql::IPqGateway::TPtr& pqGateway)
287287
: TopicPath(topicPath)
@@ -737,10 +737,11 @@ void TTopicSession::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) {
737737
std::forward_as_tuple(ev)).first->second;
738738
UpdateFieldsIds(clientInfo);
739739

740-
TString predicate = clientInfo.Settings.GetSource().GetPredicate();
740+
const auto& source = clientInfo.Settings.GetSource();
741+
TString predicate = source.GetPredicate();
741742

742743
// TODO: remove this when the re-parsing is removed from pq read actor
743-
if (predicate.empty() && HasJsonColumns(clientInfo.Settings.GetSource())) {
744+
if (predicate.empty() && HasJsonColumns(source)) {
744745
predicate = "WHERE TRUE";
745746
}
746747

@@ -752,7 +753,9 @@ void TTopicSession::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) {
752753
[&, actorId = clientInfo.ReadActorId](ui64 offset, const TString& json){
753754
Send(SelfId(), new NFq::TEvPrivate::TEvDataAfterFilteration(offset, json, actorId));
754755
},
755-
PureCalcProgramFactory);
756+
PureCalcProgramFactory->GetFactory({
757+
.EnabledLLVM = source.GetEnabledLLVM()
758+
}));
756759
} else {
757760
ClientsWithoutPredicate.insert(ev->Sender);
758761
}
@@ -993,7 +996,7 @@ std::unique_ptr<NActors::IActor> NewTopicSession(
993996
ui32 partitionId,
994997
NYdb::TDriver driver,
995998
std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory,
996-
NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory,
999+
IPureCalcProgramFactory::TPtr pureCalcProgramFactory,
9971000
const ::NMonitoring::TDynamicCounterPtr& counters,
9981001
const NYql::IPqGateway::TPtr& pqGateway) {
9991002
return std::unique_ptr<NActors::IActor>(new TTopicSession(topicPath, endpoint, database, config, rowDispatcherActorId, partitionId, std::move(driver), credentialsProviderFactory, pureCalcProgramFactory, counters, pqGateway));

ydb/core/fq/libs/row_dispatcher/topic_session.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
#pragma once
22

3+
#include "common.h"
4+
35
#include <ydb/core/fq/libs/config/protos/row_dispatcher.pb.h>
46
#include <ydb/core/fq/libs/config/protos/common.pb.h>
57
#include <ydb/core/fq/libs/shared_resources/shared_resources.h>
@@ -8,7 +10,6 @@
810

911
#include <ydb/library/yql/providers/pq/proto/dq_io.pb.h>
1012
#include <ydb/library/yql/providers/pq/provider/yql_pq_gateway.h>
11-
#include <ydb/library/yql/public/purecalc/common/fwd.h>
1213

1314
#include <ydb/library/actors/core/actor.h>
1415

@@ -25,7 +26,7 @@ std::unique_ptr<NActors::IActor> NewTopicSession(
2526
ui32 partitionId,
2627
NYdb::TDriver driver,
2728
std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory,
28-
NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory,
29+
IPureCalcProgramFactory::TPtr pureCalcProgramFactory,
2930
const ::NMonitoring::TDynamicCounterPtr& counters,
3031
const NYql::IPqGateway::TPtr& pqGateway);
3132

ydb/core/fq/libs/row_dispatcher/ut/row_dispatcher_ut.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ struct TTestActorFactory : public NFq::NRowDispatcher::IActorFactory {
3535
ui32 /*partitionId*/,
3636
NYdb::TDriver /*driver*/,
3737
std::shared_ptr<NYdb::ICredentialsProviderFactory> /*credentialsProviderFactory*/,
38-
NYql::NPureCalc::IProgramFactoryPtr /*pureCalcProgramFactory*/,
38+
IPureCalcProgramFactory::TPtr /*pureCalcProgramFactory*/,
3939
const ::NMonitoring::TDynamicCounterPtr& /*counters*/,
4040
const NYql::IPqGateway::TPtr& /*pqGateway*/) const override {
4141
auto actorId = Runtime.AllocateEdgeActor();

ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
#include <ydb/tests/fq/pq_async_io/ut_helpers.h>
1414

1515
#include <ydb/library/yql/providers/pq/gateway/native/yql_pq_gateway.h>
16-
#include <ydb/library/yql/public/purecalc/common/interface.h>
1716

1817
namespace {
1918

@@ -27,7 +26,7 @@ const ui64 GrabTimeoutSec = 4 * TimeoutBeforeStartSessionSec;
2726
class TFixture : public NUnitTest::TBaseFixture {
2827
public:
2928
TFixture()
30-
: PureCalcProgramFactory(NYql::NPureCalc::MakeProgramFactory(NYql::NPureCalc::TProgramFactoryOptions()))
29+
: PureCalcProgramFactory(CreatePureCalcProgramFactory())
3130
, Runtime(true)
3231
{}
3332

@@ -158,7 +157,7 @@ class TFixture : public NUnitTest::TBaseFixture {
158157
return eventHolder->Get()->Record.MessagesSize();
159158
}
160159

161-
NYql::NPureCalc::IProgramFactoryPtr PureCalcProgramFactory;
160+
IPureCalcProgramFactory::TPtr PureCalcProgramFactory;
162161
NActors::TTestActorRuntime Runtime;
163162
TActorSystemStub ActorSystemStub;
164163
NActors::TActorId TopicSession;

ydb/core/fq/libs/row_dispatcher/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ LIBRARY()
22

33
SRCS(
44
actors_factory.cpp
5+
common.cpp
56
coordinator.cpp
67
json_filter.cpp
78
json_parser.cpp

0 commit comments

Comments
 (0)