Skip to content

Commit 6af3402

Browse files
committed
Added scoped alloc param into purecalc
1 parent 7e95d38 commit 6af3402

File tree

7 files changed

+56
-21
lines changed

7 files changed

+56
-21
lines changed

ydb/library/yql/public/purecalc/common/interface.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,11 @@ TProgramFactoryOptions& TProgramFactoryOptions::SetUseWorkerPool(bool useWorkerP
119119
return *this;
120120
}
121121

122+
TProgramFactoryOptions& TProgramFactoryOptions::SetScopedAlloc(std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> scopedAlloc) {
123+
ScopedAlloc = std::move(scopedAlloc);
124+
return *this;
125+
}
126+
122127
void NYql::NPureCalc::ConfigureLogging(const TLoggingOptions& options) {
123128
InitLogging(options);
124129
}

ydb/library/yql/public/purecalc/common/interface.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -268,6 +268,9 @@ namespace NYql {
268268
/// Reuse allocated workers
269269
bool UseWorkerPool;
270270

271+
/// Explicit scoped alloc, may be used for several workers, by default worker uses own scoped alloc
272+
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> ScopedAlloc;
273+
271274
public:
272275
TProgramFactoryOptions();
273276

@@ -375,6 +378,13 @@ namespace NYql {
375378
* @return reference to self, to allow method chaining.
376379
*/
377380
TProgramFactoryOptions& SetUseWorkerPool(bool useWorkerPool);
381+
382+
/**
383+
* Set explicit scoped alloc for reusing in several workers.
384+
*
385+
* @return reference to self, to allow method chaining.
386+
*/
387+
TProgramFactoryOptions& SetScopedAlloc(std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> scopedAlloc);
378388
};
379389

380390
////////////////////////////////////////////////////////////////////////////////////////////////////

ydb/library/yql/public/purecalc/common/program_factory.cpp

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,8 @@ IPullStreamWorkerFactoryPtr TProgramFactory::MakePullStreamWorkerFactory(
9191
Options_.NativeYtTypeFlags,
9292
Options_.DeterministicTimeProviderSeed,
9393
Options_.UseSystemColumns,
94-
Options_.UseWorkerPool
94+
Options_.UseWorkerPool,
95+
Options_.ScopedAlloc
9596
));
9697
}
9798

@@ -120,7 +121,8 @@ IPullListWorkerFactoryPtr TProgramFactory::MakePullListWorkerFactory(
120121
Options_.NativeYtTypeFlags,
121122
Options_.DeterministicTimeProviderSeed,
122123
Options_.UseSystemColumns,
123-
Options_.UseWorkerPool
124+
Options_.UseWorkerPool,
125+
Options_.ScopedAlloc
124126
));
125127
}
126128

@@ -153,6 +155,7 @@ IPushStreamWorkerFactoryPtr TProgramFactory::MakePushStreamWorkerFactory(
153155
Options_.NativeYtTypeFlags,
154156
Options_.DeterministicTimeProviderSeed,
155157
Options_.UseSystemColumns,
156-
Options_.UseWorkerPool
158+
Options_.UseWorkerPool,
159+
Options_.ScopedAlloc
157160
));
158161
}

ydb/library/yql/public/purecalc/common/worker.cpp

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -44,10 +44,15 @@ TWorkerGraph::TWorkerGraph(
4444
const TString& LLVMSettings,
4545
NKikimr::NUdf::ICountersProvider* countersProvider,
4646
ui64 nativeYtTypeFlags,
47-
TMaybe<ui64> deterministicTimeProviderSeed
47+
TMaybe<ui64> deterministicTimeProviderSeed,
48+
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> scopedAlloc
4849
)
49-
: ScopedAlloc_(__LOCATION__, NKikimr::TAlignedPagePoolCounters(), funcRegistry.SupportsSizedAllocators())
50-
, Env_(ScopedAlloc_)
50+
: ScopedAlloc_(scopedAlloc ?
51+
std::move(scopedAlloc) :
52+
std::make_shared<NKikimr::NMiniKQL::TScopedAlloc>(__LOCATION__, NKikimr::TAlignedPagePoolCounters(), funcRegistry.SupportsSizedAllocators(), false)
53+
)
54+
, ScopedAllocGuard_(*ScopedAlloc_)
55+
, Env_(*ScopedAlloc_)
5156
, FuncRegistry_(funcRegistry)
5257
, RandomProvider_(CreateDefaultRandomProvider())
5358
, TimeProvider_(deterministicTimeProviderSeed ?
@@ -148,7 +153,7 @@ TWorkerGraph::TWorkerGraph(
148153
};
149154

150155
NKikimr::NMiniKQL::TComputationPatternOpts computationPatternOpts(
151-
ScopedAlloc_.Ref(),
156+
ScopedAlloc_->Ref(),
152157
Env_,
153158
nodeFactory,
154159
&funcRegistry,
@@ -170,14 +175,14 @@ TWorkerGraph::TWorkerGraph(
170175

171176
ComputationGraph_->Prepare();
172177

173-
// Scoped alloc acquires itself on construction. We need to release it before returning control to user.
174-
// Note that scoped alloc releases itself on destruction so it is no problem if the above code throws.
175-
ScopedAlloc_.Release();
178+
// Scoped alloc acquires when ScopedAllocGuard_ was created. We need to release it before returning control to user.
179+
// Note that scoped alloc releases on destruction ScopedAllocGuard_ so it is no problem if the above code throws.
180+
ScopedAlloc_->Release();
176181
}
177182

178183
TWorkerGraph::~TWorkerGraph() {
179184
// Remember, we've released scoped alloc in constructor? Now, we need to acquire it back before destroying.
180-
ScopedAlloc_.Acquire();
185+
ScopedAlloc_->Acquire();
181186
}
182187

183188
template <typename TBase>
@@ -196,12 +201,14 @@ TWorker<TBase>::TWorker(
196201
const TString& LLVMSettings,
197202
NKikimr::NUdf::ICountersProvider* countersProvider,
198203
ui64 nativeYtTypeFlags,
199-
TMaybe<ui64> deterministicTimeProviderSeed
204+
TMaybe<ui64> deterministicTimeProviderSeed,
205+
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> scopedAlloc
200206
)
201207
: WorkerFactory_(std::move(factory))
202208
, Graph_(exprRoot, exprCtx, serializedProgram, funcRegistry, userData,
203209
inputTypes, originalInputTypes, rawInputTypes, outputType, rawOutputType,
204-
LLVMSettings, countersProvider, nativeYtTypeFlags, deterministicTimeProviderSeed)
210+
LLVMSettings, countersProvider, nativeYtTypeFlags, deterministicTimeProviderSeed,
211+
std::move(scopedAlloc))
205212
{
206213
}
207214

@@ -296,7 +303,7 @@ NYT::TNode TWorker<TBase>::MakeFullOutputSchema() const {
296303

297304
template <typename TBase>
298305
inline NKikimr::NMiniKQL::TScopedAlloc& TWorker<TBase>::GetScopedAlloc() {
299-
return Graph_.ScopedAlloc_;
306+
return *Graph_.ScopedAlloc_;
300307
}
301308

302309
template <typename TBase>

ydb/library/yql/public/purecalc/common/worker.h

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,12 +30,14 @@ namespace NYql {
3030
const TString& LLVMSettings,
3131
NKikimr::NUdf::ICountersProvider* countersProvider,
3232
ui64 nativeYtTypeFlags,
33-
TMaybe<ui64> deterministicTimeProviderSeed
33+
TMaybe<ui64> deterministicTimeProviderSeed,
34+
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> scopedAlloc
3435
);
3536

3637
~TWorkerGraph();
3738

38-
NKikimr::NMiniKQL::TScopedAlloc ScopedAlloc_;
39+
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> ScopedAlloc_;
40+
TGuard<NKikimr::NMiniKQL::TScopedAlloc> ScopedAllocGuard_;
3941
NKikimr::NMiniKQL::TTypeEnvironment Env_;
4042
const NKikimr::NMiniKQL::IFunctionRegistry& FuncRegistry_;
4143
TIntrusivePtr<IRandomProvider> RandomProvider_;
@@ -80,7 +82,8 @@ namespace NYql {
8082
const TString& LLVMSettings,
8183
NKikimr::NUdf::ICountersProvider* countersProvider,
8284
ui64 nativeYtTypeFlags,
83-
TMaybe<ui64> deterministicTimeProviderSeed
85+
TMaybe<ui64> deterministicTimeProviderSeed,
86+
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> scopedAlloc
8487
);
8588

8689
public:

ydb/library/yql/public/purecalc/common/worker_factory.cpp

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ TWorkerFactory<TBase>::TWorkerFactory(TWorkerFactoryOptions options, EProcessorM
4848
, DeterministicTimeProviderSeed_(options.DeterministicTimeProviderSeed_)
4949
, UseSystemColumns_(options.UseSystemColumns)
5050
, UseWorkerPool_(options.UseWorkerPool)
51+
, ScopedAlloc_(std::move(options.ScopedAlloc))
5152
{
5253
// Prepare input struct types and extract all column names from inputs
5354

@@ -261,10 +262,11 @@ TExprNode::TPtr TWorkerFactory<TBase>::Compile(
261262
LLVMSettings_,
262263
CountersProvider_,
263264
NativeYtTypeFlags_,
264-
DeterministicTimeProviderSeed_
265+
DeterministicTimeProviderSeed_,
266+
ScopedAlloc_
265267
);
266268

267-
with_lock (graph.ScopedAlloc_) {
269+
with_lock (*graph.ScopedAlloc_) {
268270
const auto value = graph.ComputationGraph_->GetValue();
269271
NCommon::WriteYsonValue(writer, value, const_cast<NKikimr::NMiniKQL::TType*>(graph.OutputType_), nullptr);
270272
}
@@ -510,7 +512,8 @@ void TWorkerFactory<TBase>::ReturnWorker(IWorker* worker) {
510512
LLVMSettings_, \
511513
CountersProvider_, \
512514
NativeYtTypeFlags_, \
513-
DeterministicTimeProviderSeed_ \
515+
DeterministicTimeProviderSeed_, \
516+
ScopedAlloc_ \
514517
)); \
515518
}
516519

ydb/library/yql/public/purecalc/common/worker_factory.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ namespace NYql {
3232
TMaybe<ui64> DeterministicTimeProviderSeed_;
3333
bool UseSystemColumns;
3434
bool UseWorkerPool;
35+
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> ScopedAlloc;
3536

3637
TWorkerFactoryOptions(
3738
IProgramFactoryPtr Factory,
@@ -51,7 +52,8 @@ namespace NYql {
5152
ui64 nativeYtTypeFlags,
5253
TMaybe<ui64> deterministicTimeProviderSeed,
5354
bool useSystemColumns,
54-
bool useWorkerPool
55+
bool useWorkerPool,
56+
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> scopedAlloc
5557
)
5658
: Factory(std::move(Factory))
5759
, InputSpec(InputSpec)
@@ -71,6 +73,7 @@ namespace NYql {
7173
, DeterministicTimeProviderSeed_(deterministicTimeProviderSeed)
7274
, UseSystemColumns(useSystemColumns)
7375
, UseWorkerPool(useWorkerPool)
76+
, ScopedAlloc(std::move(scopedAlloc))
7477
{
7578
}
7679
};
@@ -102,6 +105,7 @@ namespace NYql {
102105
bool UseSystemColumns_;
103106
bool UseWorkerPool_;
104107
TVector<THolder<IWorker>> WorkerPool_;
108+
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> ScopedAlloc_;
105109

106110
public:
107111
TWorkerFactory(TWorkerFactoryOptions, EProcessorMode);

0 commit comments

Comments
 (0)