Skip to content

Commit 79360ac

Browse files
authored
Add multikey support for BlockMapJoinCore computation node (#9191)
1 parent 7dd3efc commit 79360ac

File tree

2 files changed

+419
-56
lines changed

2 files changed

+419
-56
lines changed

ydb/library/yql/minikql/comp_nodes/mkql_block_map_join.cpp

Lines changed: 83 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -180,10 +180,10 @@ class TBlockJoinState : public TBlockState {
180180
TVector<std::unique_ptr<IArrayBuilder>> Builders_;
181181
};
182182

183-
template <bool WithoutRight, bool RightRequired>
184-
class TBlockWideMapJoinWrapper : public TStatefulWideFlowComputationNode<TBlockWideMapJoinWrapper<WithoutRight, RightRequired>>
183+
template <bool WithoutRight, bool RightRequired, bool IsTuple>
184+
class TBlockWideMapJoinWrapper : public TStatefulWideFlowComputationNode<TBlockWideMapJoinWrapper<WithoutRight, RightRequired, IsTuple>>
185185
{
186-
using TBaseComputation = TStatefulWideFlowComputationNode<TBlockWideMapJoinWrapper<WithoutRight, RightRequired>>;
186+
using TBaseComputation = TStatefulWideFlowComputationNode<TBlockWideMapJoinWrapper<WithoutRight, RightRequired, IsTuple>>;
187187
using TState = TBlockJoinState<RightRequired>;
188188
public:
189189
TBlockWideMapJoinWrapper(TComputationMutables& mutables,
@@ -198,6 +198,7 @@ using TState = TBlockJoinState<RightRequired>;
198198
, Flow_(flow)
199199
, Dict_(dict)
200200
, WideFieldsIndex_(mutables.IncrementWideFieldsIndex(LeftFlowItems_.size()))
201+
, KeyTuple_(mutables)
201202
{}
202203

203204
EFetchResult DoCalculate(NUdf::TUnboxedValue& state, TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const {
@@ -207,7 +208,7 @@ using TState = TBlockJoinState<RightRequired>;
207208

208209
while (!blockState.HasBlocks()) {
209210
while (blockState.IsNotFull() && blockState.NextRow()) {
210-
const auto key = MakeKeysTuple(ctx, blockState, LeftKeyColumns_);
211+
const auto key = MakeKeysTuple(ctx, blockState);
211212
if constexpr (WithoutRight) {
212213
if (key && dict.Contains(key) == RightRequired) {
213214
blockState.CopyRow();
@@ -270,10 +271,18 @@ using TState = TBlockJoinState<RightRequired>;
270271
return *static_cast<TState*>(state.AsBoxed().Get());
271272
}
272273

273-
NUdf::TUnboxedValue MakeKeysTuple(const TComputationContext& ctx, const TState& state, const TVector<ui32>& keyColumns) const {
274-
// TODO: Handle complex key.
274+
NUdf::TUnboxedValue MakeKeysTuple(TComputationContext& ctx, const TState& state) const {
275275
// TODO: Handle converters.
276-
return state.GetValue(ctx.HolderFactory, keyColumns.front());
276+
if constexpr (!IsTuple) {
277+
return state.GetValue(ctx.HolderFactory, LeftKeyColumns_.front());
278+
}
279+
280+
NUdf::TUnboxedValue* items = nullptr;
281+
const auto keys = KeyTuple_.NewArray(ctx, LeftKeyColumns_.size(), items);
282+
for (size_t i = 0; i < LeftKeyColumns_.size(); i++) {
283+
items[i] = state.GetValue(ctx.HolderFactory, LeftKeyColumns_[i]);
284+
}
285+
return keys;
277286
}
278287

279288
const TVector<TType*> ResultJoinItems_;
@@ -283,12 +292,13 @@ using TState = TBlockJoinState<RightRequired>;
283292
IComputationWideFlowNode* const Flow_;
284293
IComputationNode* const Dict_;
285294
ui32 WideFieldsIndex_;
295+
const TContainerCacheOnContext KeyTuple_;
286296
};
287297

288-
template<bool RightRequired>
289-
class TBlockWideMultiMapJoinWrapper : public TPairStateWideFlowComputationNode<TBlockWideMultiMapJoinWrapper<RightRequired>>
298+
template<bool RightRequired, bool IsTuple>
299+
class TBlockWideMultiMapJoinWrapper : public TPairStateWideFlowComputationNode<TBlockWideMultiMapJoinWrapper<RightRequired, IsTuple>>
290300
{
291-
using TBaseComputation = TPairStateWideFlowComputationNode<TBlockWideMultiMapJoinWrapper<RightRequired>>;
301+
using TBaseComputation = TPairStateWideFlowComputationNode<TBlockWideMultiMapJoinWrapper<RightRequired, IsTuple>>;
292302
using TState = TBlockJoinState<RightRequired>;
293303
public:
294304
TBlockWideMultiMapJoinWrapper(TComputationMutables& mutables,
@@ -303,6 +313,7 @@ using TState = TBlockJoinState<RightRequired>;
303313
, Flow_(flow)
304314
, Dict_(dict)
305315
, WideFieldsIndex_(mutables.IncrementWideFieldsIndex(LeftFlowItems_.size()))
316+
, KeyTuple_(mutables)
306317
{}
307318

308319
EFetchResult DoCalculate(NUdf::TUnboxedValue& state, NUdf::TUnboxedValue& iterator, TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const {
@@ -320,7 +331,7 @@ using TState = TBlockJoinState<RightRequired>;
320331
}
321332
}
322333
if (blockState.IsNotFull() && blockState.NextRow()) {
323-
const auto key = MakeKeysTuple(ctx, blockState, LeftKeyColumns_);
334+
const auto key = MakeKeysTuple(ctx, blockState);
324335
// Lookup the item in the right dict. If the lookup succeeds,
325336
// reset the iterator and proceed the execution from the
326337
// beginning of the outer loop. Otherwise, the iterState is
@@ -419,10 +430,18 @@ using TState = TBlockJoinState<RightRequired>;
419430
return *static_cast<TIterator*>(iterator.AsBoxed().Get());
420431
}
421432

422-
NUdf::TUnboxedValue MakeKeysTuple(const TComputationContext& ctx, const TState& state, const TVector<ui32>& keyColumns) const {
423-
// TODO: Handle complex key.
433+
NUdf::TUnboxedValue MakeKeysTuple(TComputationContext& ctx, const TState& state) const {
424434
// TODO: Handle converters.
425-
return state.GetValue(ctx.HolderFactory, keyColumns.front());
435+
if constexpr (!IsTuple) {
436+
return state.GetValue(ctx.HolderFactory, LeftKeyColumns_.front());
437+
}
438+
439+
NUdf::TUnboxedValue* items = nullptr;
440+
const auto keys = KeyTuple_.NewArray(ctx, LeftKeyColumns_.size(), items);
441+
for (size_t i = 0; i < LeftKeyColumns_.size(); i++) {
442+
items[i] = state.GetValue(ctx.HolderFactory, LeftKeyColumns_[i]);
443+
}
444+
return keys;
426445
}
427446

428447
const TVector<TType*> ResultJoinItems_;
@@ -432,6 +451,7 @@ using TState = TBlockJoinState<RightRequired>;
432451
IComputationWideFlowNode* const Flow_;
433452
IComputationNode* const Dict_;
434453
ui32 WideFieldsIndex_;
454+
const TContainerCacheOnContext KeyTuple_;
435455
};
436456

437457
} // namespace
@@ -483,8 +503,7 @@ IComputationNode* WrapBlockMapJoinCore(TCallable& callable, const TComputationNo
483503
const auto item = AS_VALUE(TDataLiteral, keyColumnsTuple->GetValue(i));
484504
leftKeyColumns.emplace_back(item->AsValue().Get<ui32>());
485505
}
486-
// TODO: Handle multi keys.
487-
Y_ENSURE(leftKeyColumns.size() == 1);
506+
const bool isTupleKey = leftKeyColumns.size() > 1;
488507

489508
const auto keyDropsLiteral = callable.GetInput(4);
490509
const auto keyDropsTuple = AS_VALUE(TTupleLiteral, keyDropsLiteral);
@@ -514,44 +533,54 @@ IComputationNode* WrapBlockMapJoinCore(TCallable& callable, const TComputationNo
514533
const auto flow = LocateNode(ctx.NodeLocator, callable, 0);
515534
const auto dict = LocateNode(ctx.NodeLocator, callable, 1);
516535

517-
switch (joinKind) {
518-
static const auto joinNames = GetEnumNames<EJoinKind>();
519-
case EJoinKind::Inner:
520-
if (isMulti) {
521-
return new TBlockWideMultiMapJoinWrapper<true>(ctx.Mutables,
522-
std::move(joinItems), std::move(leftFlowItems),
523-
std::move(leftKeyColumns), std::move(leftIOMap),
524-
static_cast<IComputationWideFlowNode*>(flow), dict);
525-
}
526-
return new TBlockWideMapJoinWrapper<false, true>(ctx.Mutables,
527-
std::move(joinItems), std::move(leftFlowItems),
528-
std::move(leftKeyColumns), std::move(leftIOMap),
529-
static_cast<IComputationWideFlowNode*>(flow), dict);
530-
case EJoinKind::Left:
531-
if (isMulti) {
532-
return new TBlockWideMultiMapJoinWrapper<false>(ctx.Mutables,
533-
std::move(joinItems), std::move(leftFlowItems),
534-
std::move(leftKeyColumns), std::move(leftIOMap),
535-
static_cast<IComputationWideFlowNode*>(flow), dict);
536-
}
537-
return new TBlockWideMapJoinWrapper<false, false>(ctx.Mutables,
538-
std::move(joinItems), std::move(leftFlowItems),
539-
std::move(leftKeyColumns), std::move(leftIOMap),
540-
static_cast<IComputationWideFlowNode*>(flow), dict);
541-
case EJoinKind::LeftSemi:
542-
return new TBlockWideMapJoinWrapper<true, true>(ctx.Mutables,
543-
std::move(joinItems), std::move(leftFlowItems),
544-
std::move(leftKeyColumns), std::move(leftIOMap),
545-
static_cast<IComputationWideFlowNode*>(flow), dict);
546-
case EJoinKind::LeftOnly:
547-
return new TBlockWideMapJoinWrapper<true, false>(ctx.Mutables,
548-
std::move(joinItems), std::move(leftFlowItems),
549-
std::move(leftKeyColumns), std::move(leftIOMap),
550-
static_cast<IComputationWideFlowNode*>(flow), dict);
551-
default:
552-
MKQL_ENSURE(false, "BlockMapJoinCore doesn't support %s join type"
553-
<< joinNames.at(joinKind));
554-
}
536+
#define DISPATCH_JOIN(IS_TUPLE) do { \
537+
switch (joinKind) { \
538+
case EJoinKind::Inner: \
539+
if (isMulti) { \
540+
return new TBlockWideMultiMapJoinWrapper<true, IS_TUPLE>(ctx.Mutables, \
541+
std::move(joinItems), std::move(leftFlowItems), \
542+
std::move(leftKeyColumns), std::move(leftIOMap), \
543+
static_cast<IComputationWideFlowNode*>(flow), dict); \
544+
} \
545+
return new TBlockWideMapJoinWrapper<false, true, IS_TUPLE>(ctx.Mutables, \
546+
std::move(joinItems), std::move(leftFlowItems), \
547+
std::move(leftKeyColumns), std::move(leftIOMap), \
548+
static_cast<IComputationWideFlowNode*>(flow), dict); \
549+
case EJoinKind::Left: \
550+
if (isMulti) { \
551+
return new TBlockWideMultiMapJoinWrapper<false, IS_TUPLE>(ctx.Mutables, \
552+
std::move(joinItems), std::move(leftFlowItems), \
553+
std::move(leftKeyColumns), std::move(leftIOMap), \
554+
static_cast<IComputationWideFlowNode*>(flow), dict); \
555+
} \
556+
return new TBlockWideMapJoinWrapper<false, false, IS_TUPLE>(ctx.Mutables, \
557+
std::move(joinItems), std::move(leftFlowItems), \
558+
std::move(leftKeyColumns), std::move(leftIOMap), \
559+
static_cast<IComputationWideFlowNode*>(flow), dict); \
560+
case EJoinKind::LeftSemi: \
561+
return new TBlockWideMapJoinWrapper<true, true, IS_TUPLE>(ctx.Mutables, \
562+
std::move(joinItems), std::move(leftFlowItems), \
563+
std::move(leftKeyColumns), std::move(leftIOMap), \
564+
static_cast<IComputationWideFlowNode*>(flow), dict); \
565+
case EJoinKind::LeftOnly: \
566+
return new TBlockWideMapJoinWrapper<true, false, IS_TUPLE>(ctx.Mutables, \
567+
std::move(joinItems), std::move(leftFlowItems), \
568+
std::move(leftKeyColumns), std::move(leftIOMap), \
569+
static_cast<IComputationWideFlowNode*>(flow), dict); \
570+
default: \
571+
/* TODO: Display the human-readable join kind name. */ \
572+
MKQL_ENSURE(false, "BlockMapJoinCore doesn't support join type #" \
573+
<< static_cast<ui32>(joinKind)); \
574+
} \
575+
} while(0)
576+
577+
if (isTupleKey) {
578+
DISPATCH_JOIN(true);
579+
} else {
580+
DISPATCH_JOIN(false);
581+
}
582+
583+
#undef DISPATCH_JOIN
555584
}
556585

557586
} // namespace NMiniKQL

0 commit comments

Comments
 (0)