Skip to content

Commit c38def0

Browse files
committed
YQ RD optimized purecalc memory usage (ydb-platform#11394)
1 parent 202eb87 commit c38def0

File tree

10 files changed

+58
-19
lines changed

10 files changed

+58
-19
lines changed

ydb/core/fq/libs/row_dispatcher/actors_factory.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
#include <ydb/core/fq/libs/row_dispatcher/topic_session.h>
44

5+
#include <ydb/library/yql/public/purecalc/common/interface.h>
6+
57
namespace NFq::NRowDispatcher {
68

79

@@ -17,6 +19,7 @@ struct TActorFactory : public IActorFactory {
1719
ui32 partitionId,
1820
NYdb::TDriver driver,
1921
std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory,
22+
NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory,
2023
const ::NMonitoring::TDynamicCounterPtr& counters,
2124
const NYql::IPqGateway::TPtr& pqGateway) const override {
2225

@@ -29,6 +32,7 @@ struct TActorFactory : public IActorFactory {
2932
partitionId,
3033
std::move(driver),
3134
credentialsProviderFactory,
35+
pureCalcProgramFactory,
3236
counters,
3337
pqGateway
3438
);

ydb/core/fq/libs/row_dispatcher/actors_factory.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
#include <ydb/library/actors/core/actor.h>
66
#include <ydb/public/sdk/cpp/client/ydb_driver/driver.h>
77
#include <ydb/library/yql/providers/pq/provider/yql_pq_gateway.h>
8+
#include <ydb/library/yql/public/purecalc/common/fwd.h>
89

910
namespace NFq::NRowDispatcher {
1011

@@ -20,6 +21,7 @@ struct IActorFactory : public TThrRefBase {
2021
ui32 partitionId,
2122
NYdb::TDriver driver,
2223
std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory,
24+
NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory,
2325
const ::NMonitoring::TDynamicCounterPtr& counters,
2426
const NYql::IPqGateway::TPtr& pqGateway) const = 0;
2527
};

ydb/core/fq/libs/row_dispatcher/json_filter.cpp

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -263,15 +263,15 @@ class TJsonFilter::TImpl {
263263
TImpl(const TVector<TString>& columns,
264264
const TVector<TString>& types,
265265
const TString& whereFilter,
266-
TCallback callback)
266+
TCallback callback,
267+
NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory)
267268
: Sql(GenerateSql(whereFilter)) {
268269
Y_ENSURE(columns.size() == types.size(), "Number of columns and types should by equal");
269-
auto factory = NYql::NPureCalc::MakeProgramFactory(NYql::NPureCalc::TProgramFactoryOptions());
270270

271271
// Program should be stateless because input values
272272
// allocated on another allocator and should be released
273273
LOG_ROW_DISPATCHER_DEBUG("Creating program...");
274-
Program = factory->MakePushStreamProgram(
274+
Program = pureCalcProgramFactory->MakePushStreamProgram(
275275
TFilterInputSpec(MakeInputSchema(columns, types)),
276276
TFilterOutputSpec(MakeOutputSchema()),
277277
Sql,
@@ -311,8 +311,9 @@ TJsonFilter::TJsonFilter(
311311
const TVector<TString>& columns,
312312
const TVector<TString>& types,
313313
const TString& whereFilter,
314-
TCallback callback)
315-
: Impl(std::make_unique<TJsonFilter::TImpl>(columns, types, whereFilter, callback)) {
314+
TCallback callback,
315+
NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory)
316+
: Impl(std::make_unique<TJsonFilter::TImpl>(columns, types, whereFilter, callback, pureCalcProgramFactory)) {
316317
}
317318

318319
TJsonFilter::~TJsonFilter() {
@@ -330,8 +331,9 @@ std::unique_ptr<TJsonFilter> NewJsonFilter(
330331
const TVector<TString>& columns,
331332
const TVector<TString>& types,
332333
const TString& whereFilter,
333-
TCallback callback) {
334-
return std::unique_ptr<TJsonFilter>(new TJsonFilter(columns, types, whereFilter, callback));
334+
TCallback callback,
335+
NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory) {
336+
return std::unique_ptr<TJsonFilter>(new TJsonFilter(columns, types, whereFilter, callback, pureCalcProgramFactory));
335337
}
336338

337339
} // namespace NFq

ydb/core/fq/libs/row_dispatcher/json_filter.h

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#pragma once
22

33
#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h>
4+
#include <ydb/library/yql/public/purecalc/common/fwd.h>
45

56
namespace NFq {
67

@@ -13,7 +14,8 @@ class TJsonFilter {
1314
const TVector<TString>& columns,
1415
const TVector<TString>& types,
1516
const TString& whereFilter,
16-
TCallback callback);
17+
TCallback callback,
18+
NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory);
1719

1820
~TJsonFilter();
1921

@@ -29,6 +31,7 @@ std::unique_ptr<TJsonFilter> NewJsonFilter(
2931
const TVector<TString>& columns,
3032
const TVector<TString>& types,
3133
const TString& whereFilter,
32-
TJsonFilter::TCallback callback);
34+
TJsonFilter::TCallback callback,
35+
NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory);
3336

3437
} // namespace NFq

ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
#include <ydb/library/actors/core/interconnect.h>
88
#include <ydb/library/yql/dq/actors/common/retry_queue.h>
99
#include <ydb/library/yql/providers/dq/counters/counters.h>
10+
#include <ydb/library/yql/public/purecalc/common/interface.h>
1011

1112
#include <ydb/core/fq/libs/actors/logging/log.h>
1213
#include <ydb/core/fq/libs/events/events.h>
@@ -119,6 +120,7 @@ class TRowDispatcher : public TActorBootstrapped<TRowDispatcher> {
119120

120121
NConfig::TRowDispatcherConfig Config;
121122
NKikimr::TYdbCredentialsProviderFactory CredentialsProviderFactory;
123+
NYql::NPureCalc::IProgramFactoryPtr PureCalcProgramFactory;
122124
TYqSharedResources::TPtr YqSharedResources;
123125
TMaybe<TActorId> CoordinatorActorId;
124126
TSet<TActorId> CoordinatorChangedSubscribers;
@@ -264,6 +266,7 @@ TRowDispatcher::TRowDispatcher(
264266
const NYql::IPqGateway::TPtr& pqGateway)
265267
: Config(config)
266268
, CredentialsProviderFactory(credentialsProviderFactory)
269+
, PureCalcProgramFactory(NYql::NPureCalc::MakeProgramFactory(NYql::NPureCalc::TProgramFactoryOptions()))
267270
, YqSharedResources(yqSharedResources)
268271
, CredentialsFactory(credentialsFactory)
269272
, LogPrefix("RowDispatcher: ")
@@ -436,6 +439,7 @@ void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) {
436439
CredentialsFactory,
437440
ev->Get()->Record.GetToken(),
438441
source.GetAddBearerToToken()),
442+
PureCalcProgramFactory,
439443
Counters,
440444
PqGateway
441445
);

ydb/core/fq/libs/row_dispatcher/topic_session.cpp

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,7 @@ class TTopicSession : public TActorBootstrapped<TTopicSession> {
149149
ui32 PartitionId;
150150
NYdb::TDriver Driver;
151151
std::shared_ptr<NYdb::ICredentialsProviderFactory> CredentialsProviderFactory;
152+
NYql::NPureCalc::IProgramFactoryPtr PureCalcProgramFactory;
152153
NYql::ITopicClient::TPtr TopicClient;
153154
std::shared_ptr<NYdb::NTopic::IReadSession> ReadSession;
154155
const i64 BufferSize;
@@ -179,6 +180,7 @@ class TTopicSession : public TActorBootstrapped<TTopicSession> {
179180
ui32 partitionId,
180181
NYdb::TDriver driver,
181182
std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory,
183+
NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory,
182184
const ::NMonitoring::TDynamicCounterPtr& counters,
183185
const NYql::IPqGateway::TPtr& pqGateway);
184186

@@ -268,6 +270,7 @@ TTopicSession::TTopicSession(
268270
ui32 partitionId,
269271
NYdb::TDriver driver,
270272
std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory,
273+
NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory,
271274
const ::NMonitoring::TDynamicCounterPtr& counters,
272275
const NYql::IPqGateway::TPtr& pqGateway)
273276
: TopicPath(topicPath)
@@ -277,6 +280,7 @@ TTopicSession::TTopicSession(
277280
, PartitionId(partitionId)
278281
, Driver(std::move(driver))
279282
, CredentialsProviderFactory(credentialsProviderFactory)
283+
, PureCalcProgramFactory(pureCalcProgramFactory)
280284
, BufferSize(16_MB)
281285
, LogPrefix("TopicSession")
282286
, Config(config)
@@ -734,7 +738,8 @@ void TTopicSession::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) {
734738
predicate,
735739
[&, actorId = clientInfo.ReadActorId](ui64 offset, const TString& json){
736740
Send(SelfId(), new NFq::TEvPrivate::TEvDataAfterFilteration(offset, json, actorId));
737-
});
741+
},
742+
PureCalcProgramFactory);
738743
} else {
739744
ClientsWithoutPredicate.insert(ev->Sender);
740745
}
@@ -959,9 +964,10 @@ std::unique_ptr<NActors::IActor> NewTopicSession(
959964
ui32 partitionId,
960965
NYdb::TDriver driver,
961966
std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory,
967+
NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory,
962968
const ::NMonitoring::TDynamicCounterPtr& counters,
963969
const NYql::IPqGateway::TPtr& pqGateway) {
964-
return std::unique_ptr<NActors::IActor>(new TTopicSession(topicPath, endpoint, database, config, rowDispatcherActorId, partitionId, std::move(driver), credentialsProviderFactory, counters, pqGateway));
970+
return std::unique_ptr<NActors::IActor>(new TTopicSession(topicPath, endpoint, database, config, rowDispatcherActorId, partitionId, std::move(driver), credentialsProviderFactory, pureCalcProgramFactory, counters, pqGateway));
965971
}
966972

967973
} // namespace NFq

ydb/core/fq/libs/row_dispatcher/topic_session.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
#include <ydb/library/yql/providers/pq/proto/dq_io.pb.h>
1010
#include <ydb/library/yql/providers/pq/provider/yql_pq_gateway.h>
11+
#include <ydb/library/yql/public/purecalc/common/fwd.h>
1112

1213
#include <ydb/library/actors/core/actor.h>
1314

@@ -24,6 +25,7 @@ std::unique_ptr<NActors::IActor> NewTopicSession(
2425
ui32 partitionId,
2526
NYdb::TDriver driver,
2627
std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory,
28+
NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory,
2729
const ::NMonitoring::TDynamicCounterPtr& counters,
2830
const NYql::IPqGateway::TPtr& pqGateway);
2931

ydb/core/fq/libs/row_dispatcher/ut/json_filter_ut.cpp

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
#include <ydb/core/testlib/actor_helpers.h>
1111

1212
#include <ydb/library/yql/minikql/mkql_string_util.h>
13+
#include <ydb/library/yql/public/purecalc/common/interface.h>
1314

1415
#include <library/cpp/testing/unittest/registar.h>
1516

@@ -22,16 +23,24 @@ class TFixture : public NUnitTest::TBaseFixture {
2223

2324
public:
2425
TFixture()
25-
: Runtime(true)
26+
: PureCalcProgramFactory(NYql::NPureCalc::MakeProgramFactory(NYql::NPureCalc::TProgramFactoryOptions()))
27+
, Runtime(true)
2628
, Alloc(__LOCATION__, NKikimr::TAlignedPagePoolCounters(), true, false)
2729
{}
2830

31+
static void SegmentationFaultHandler(int) {
32+
Cerr << "segmentation fault call stack:" << Endl;
33+
FormatBackTrace(&Cerr);
34+
abort();
35+
}
36+
2937
void SetUp(NUnitTest::TTestContext&) override {
38+
NKikimr::EnableYDBBacktraceFormat();
39+
signal(SIGSEGV, &SegmentationFaultHandler);
40+
3041
TAutoPtr<TAppPrepare> app = new TAppPrepare();
3142
Runtime.Initialize(app->Unwrap());
3243
Runtime.SetLogPriority(NKikimrServices::FQ_ROW_DISPATCHER, NLog::PRI_DEBUG);
33-
34-
NKikimr::EnableYDBBacktraceFormat();
3544
}
3645

3746
void TearDown(NUnitTest::TTestContext& /* context */) override {
@@ -55,7 +64,8 @@ class TFixture : public NUnitTest::TBaseFixture {
5564
columns,
5665
types,
5766
whereFilter,
58-
callback);
67+
callback,
68+
PureCalcProgramFactory);
5969
}
6070

6171
const NKikimr::NMiniKQL::TUnboxedValueVector* MakeVector(size_t size, std::function<NYql::NUdf::TUnboxedValuePod(size_t)> valueCreator) {
@@ -90,8 +100,9 @@ class TFixture : public NUnitTest::TBaseFixture {
90100
});
91101
}
92102

93-
TActorSystemStub actorSystemStub;
103+
NYql::NPureCalc::IProgramFactoryPtr PureCalcProgramFactory;
94104
NActors::TTestActorRuntime Runtime;
105+
TActorSystemStub ActorSystemStub;
95106
std::unique_ptr<NFq::TJsonFilter> Filter;
96107

97108
NKikimr::NMiniKQL::TScopedAlloc Alloc;

ydb/core/fq/libs/row_dispatcher/ut/row_dispatcher_ut.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ struct TTestActorFactory : public NFq::NRowDispatcher::IActorFactory {
3535
ui32 /*partitionId*/,
3636
NYdb::TDriver /*driver*/,
3737
std::shared_ptr<NYdb::ICredentialsProviderFactory> /*credentialsProviderFactory*/,
38+
NYql::NPureCalc::IProgramFactoryPtr /*pureCalcProgramFactory*/,
3839
const ::NMonitoring::TDynamicCounterPtr& /*counters*/,
3940
const NYql::IPqGateway::TPtr& /*pqGateway*/) const override {
4041
auto actorId = Runtime.AllocateEdgeActor();

ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
#include <ydb/tests/fq/pq_async_io/ut_helpers.h>
1414

1515
#include <ydb/library/yql/providers/pq/gateway/native/yql_pq_gateway.h>
16+
#include <ydb/library/yql/public/purecalc/common/interface.h>
1617

1718
namespace {
1819

@@ -24,10 +25,11 @@ const ui64 TimeoutBeforeStartSessionSec = 3;
2425
const ui64 GrabTimeoutSec = 4 * TimeoutBeforeStartSessionSec;
2526

2627
class TFixture : public NUnitTest::TBaseFixture {
27-
2828
public:
2929
TFixture()
30-
: Runtime(true) {}
30+
: PureCalcProgramFactory(NYql::NPureCalc::MakeProgramFactory(NYql::NPureCalc::TProgramFactoryOptions()))
31+
, Runtime(true)
32+
{}
3133

3234
void SetUp(NUnitTest::TTestContext&) override {
3335
TAutoPtr<TAppPrepare> app = new TAppPrepare();
@@ -68,6 +70,7 @@ class TFixture : public NUnitTest::TBaseFixture {
6870
0,
6971
Driver,
7072
CredentialsProviderFactory,
73+
PureCalcProgramFactory,
7174
MakeIntrusive<NMonitoring::TDynamicCounters>(),
7275
CreatePqNativeGateway(pqServices)
7376
).release());
@@ -155,8 +158,9 @@ class TFixture : public NUnitTest::TBaseFixture {
155158
return eventHolder->Get()->Record.MessagesSize();
156159
}
157160

158-
TActorSystemStub actorSystemStub;
161+
NYql::NPureCalc::IProgramFactoryPtr PureCalcProgramFactory;
159162
NActors::TTestActorRuntime Runtime;
163+
TActorSystemStub ActorSystemStub;
160164
NActors::TActorId TopicSession;
161165
NActors::TActorId RowDispatcherActorId;
162166
NYdb::TDriver Driver = NYdb::TDriver(NYdb::TDriverConfig().SetLog(CreateLogBackend("cerr")));

0 commit comments

Comments
 (0)