Skip to content

Commit a9e2b92

Browse files
authored
Fix usage single UDF object from many threads in DQ. (#7436)
1 parent 91c61a7 commit a9e2b92

File tree

11 files changed

+179
-114
lines changed

11 files changed

+179
-114
lines changed

ydb/core/tx/datashard/datashard_kqp_lookup_table.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ class TKqpLookupRowsWrapper : public TStatelessFlowComputationNode<TKqpLookupRow
121121
switch (keysValues.Fetch(key)) {
122122
case NUdf::EFetchStatus::Ok: {
123123
TVector<TCell> keyCells(ParseResult.KeyIndices.size());
124-
FillKeyTupleValue(key, ParseResult.KeyIndices, ParseResult.KeyTypes, keyCells, *ctx.TypeEnv);
124+
FillKeyTupleValue(key, ParseResult.KeyIndices, ParseResult.KeyTypes, keyCells, ctx.TypeEnv);
125125

126126
NUdf::TUnboxedValue result;
127127
TKqpTableStats stats;
@@ -203,10 +203,10 @@ class TKqpLookupTableWrapper : public TStatelessFlowComputationNode<TKqpLookupTa
203203
MKQL_ENSURE_S(tableInfo);
204204

205205
TVector<TCell> fromCells(tableInfo->KeyColumns.size());
206-
FillKeyTupleValue(key, ParseResult.KeyIndices, ParseResult.KeyTypes, fromCells, *ctx.TypeEnv);
206+
FillKeyTupleValue(key, ParseResult.KeyIndices, ParseResult.KeyTypes, fromCells, ctx.TypeEnv);
207207

208208
TVector<TCell> toCells(ParseResult.KeyIndices.size());
209-
FillKeyTupleValue(key, ParseResult.KeyIndices, ParseResult.KeyTypes, toCells, *ctx.TypeEnv);
209+
FillKeyTupleValue(key, ParseResult.KeyIndices, ParseResult.KeyTypes, toCells, ctx.TypeEnv);
210210

211211
auto range = TTableRange(fromCells, true, toCells, true);
212212

ydb/core/tx/datashard/datashard_kqp_read_table.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -269,10 +269,10 @@ class TKqpWideReadTableWrapper : public TKqpWideReadTableWrapperBase<IsReverse>
269269
EFetchResult ReadValue(TComputationContext& ctx, NUdf::TUnboxedValue* const* output) const final {
270270
if (!this->Iterator) {
271271
TVector<TCell> fromCells;
272-
BuildKeyTupleCells(ParseResult.FromTuple->GetType(), FromNode->GetValue(ctx), fromCells, *ctx.TypeEnv);
272+
BuildKeyTupleCells(ParseResult.FromTuple->GetType(), FromNode->GetValue(ctx), fromCells, ctx.TypeEnv);
273273

274274
TVector<TCell> toCells;
275-
BuildKeyTupleCells(ParseResult.ToTuple->GetType(), ToNode->GetValue(ctx), toCells, *ctx.TypeEnv);
275+
BuildKeyTupleCells(ParseResult.ToTuple->GetType(), ToNode->GetValue(ctx), toCells, ctx.TypeEnv);
276276

277277
auto range = TTableRange(fromCells, ParseResult.FromInclusive, toCells, ParseResult.ToInclusive);
278278

@@ -328,7 +328,7 @@ class TKqpWideReadTableRangesWrapper : public TKqpWideReadTableWrapperBase<IsRev
328328
if (!RangeId) {
329329
const auto localTid = this->ComputeCtx.GetLocalTableId(ParseResult.TableId);
330330
const auto* tableInfo = this->ComputeCtx.Database->GetScheme().GetTableInfo(localTid);
331-
Ranges = CreateTableRanges<IsReverse>(ParseResult, RangesNode, *ctx.TypeEnv, ctx, tableInfo->KeyColumns.size());
331+
Ranges = CreateTableRanges<IsReverse>(ParseResult, RangesNode, ctx.TypeEnv, ctx, tableInfo->KeyColumns.size());
332332
RangeId = 0;
333333

334334
if (ItemsLimit) {

ydb/core/tx/program/registry.cpp

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,18 +7,18 @@
77

88
namespace NKikimr::NOlap {
99

10-
::NTls::TValue<NMiniKQL::IBuiltinFunctionRegistry::TPtr> Registry;
10+
::NTls::TValue<TIntrusivePtr<NMiniKQL::IMutableFunctionRegistry>> Registry;
1111

1212
bool TKernelsRegistry::Parse(const TString& serialized) {
1313
Y_ABORT_UNLESS(!!serialized);
1414
if (!Registry.Get()) {
15-
Registry = NMiniKQL::CreateBuiltinRegistry();
15+
auto registry = NMiniKQL::CreateFunctionRegistry(NMiniKQL::CreateBuiltinRegistry())->Clone();
16+
NMiniKQL::FillStaticModules(*registry.Get());
17+
Registry = std::move(registry);
1618
}
17-
auto copy = Registry.Get();
18-
auto functionRegistry = NMiniKQL::CreateFunctionRegistry(std::move(copy))->Clone();
19-
NMiniKQL::FillStaticModules(*functionRegistry);
19+
2020
auto nodeFactory = NMiniKQL::GetBuiltinFactory();
21-
auto kernels = NYql::LoadKernels(serialized, *functionRegistry, nodeFactory);
21+
auto kernels = NYql::LoadKernels(serialized, *Registry.Get(), nodeFactory);
2222
Kernels.swap(kernels);
2323
for (const auto& kernel : Kernels) {
2424
arrow::compute::Arity arity(kernel->signature->in_types().size(), kernel->signature->is_varargs());

ydb/library/yql/dq/runtime/dq_tasks_runner.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -457,7 +457,7 @@ class TDqTaskRunner : public IDqTaskRunner {
457457
auto opts = CreatePatternOpts(task, Alloc(), TypeEnv());
458458

459459
AllocatedHolder->ProgramParsed.CompGraph = AllocatedHolder->ProgramParsed.GetPattern()->Clone(
460-
opts.ToComputationOptions(*Context.RandomProvider, *Context.TimeProvider, &TypeEnv()));
460+
opts.ToComputationOptions(*Context.RandomProvider, *Context.TimeProvider));
461461

462462
TBindTerminator term(AllocatedHolder->ProgramParsed.CompGraph->GetTerminator());
463463

ydb/library/yql/minikql/comp_nodes/mkql_scalar_apply.cpp

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,15 +44,17 @@ class TScalarApplyWrapper : public TMutableComputationNode<TScalarApplyWrapper>
4444
struct TKernelState : public arrow::compute::KernelState {
4545
TKernelState(const TVector<TType*>& argsTypes, TType* returnType, const TComputationContext& originalContext)
4646
: Alloc(__LOCATION__)
47+
, TypeEnv(Alloc)
4748
, MemInfo("ScalarApply")
48-
, HolderFactory(Alloc.Ref(), MemInfo)
49+
, FunctionRegistry(originalContext.HolderFactory.GetFunctionRegistry()->Clone())
50+
, HolderFactory(Alloc.Ref(), MemInfo, FunctionRegistry.Get())
4951
, ValueBuilder(HolderFactory, NUdf::EValidatePolicy::Exception)
5052
, PgBuilder(NYql::CreatePgBuilder())
5153
, Accessors(argsTypes, returnType, *PgBuilder)
5254
, RandomProvider(CreateDefaultRandomProvider())
5355
, TimeProvider(CreateDefaultTimeProvider())
5456
, Ctx(HolderFactory, &ValueBuilder, TComputationOptsFull(
55-
nullptr, Alloc.Ref(), *RandomProvider, *TimeProvider, NUdf::EValidatePolicy::Exception, nullptr),
57+
nullptr, Alloc.Ref(), TypeEnv, *RandomProvider, *TimeProvider, NUdf::EValidatePolicy::Exception, originalContext.SecureParamsProvider, originalContext.CountersProvider),
5658
originalContext.Mutables, *NYql::NUdf::GetYqlMemoryPool())
5759
{
5860
Alloc.Ref().EnableArrowTracking = false;
@@ -65,7 +67,9 @@ class TScalarApplyWrapper : public TMutableComputationNode<TScalarApplyWrapper>
6567
}
6668

6769
TScopedAlloc Alloc;
70+
TTypeEnvironment TypeEnv;
6871
TMemoryUsageInfo MemInfo;
72+
const IFunctionRegistry::TPtr FunctionRegistry;
6973
THolderFactory HolderFactory;
7074
TDefaultValueBuilder ValueBuilder;
7175
std::unique_ptr<NUdf::IPgBuilder> PgBuilder;

0 commit comments

Comments
 (0)