Skip to content

Commit bd6f576

Browse files
authored
Introduce computation node for BlockMapJoinCore (Left and Inner) (#8030)
1 parent 3c06027 commit bd6f576

File tree

3 files changed

+230
-52
lines changed

3 files changed

+230
-52
lines changed

ydb/library/yql/minikql/comp_nodes/mkql_block_map_join.cpp

Lines changed: 53 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,10 @@ size_t CalcMaxBlockLength(const TVector<TType*>& items) {
2121
}));
2222
}
2323

24-
template <bool RightRequired>
25-
class TBlockWideMapJoinWrapper : public TStatefulWideFlowComputationNode<TBlockWideMapJoinWrapper<RightRequired>>
24+
template <bool WithoutRight, bool RightRequired>
25+
class TBlockWideMapJoinWrapper : public TStatefulWideFlowComputationNode<TBlockWideMapJoinWrapper<WithoutRight, RightRequired>>
2626
{
27-
using TBaseComputation = TStatefulWideFlowComputationNode<TBlockWideMapJoinWrapper<RightRequired>>;
27+
using TBaseComputation = TStatefulWideFlowComputationNode<TBlockWideMapJoinWrapper<WithoutRight, RightRequired>>;
2828
public:
2929
TBlockWideMapJoinWrapper(TComputationMutables& mutables,
3030
const TVector<TType*>&& resultJoinItems, const TVector<TType*>&& leftFlowItems,
@@ -47,8 +47,16 @@ using TBaseComputation = TStatefulWideFlowComputationNode<TBlockWideMapJoinWrapp
4747
do {
4848
while (s.IsNotFull() && s.NextRow()) {
4949
const auto key = MakeKeysTuple(ctx, s, LeftKeyColumns_);
50-
if (key && dict.Contains(key) == RightRequired) {
51-
s.CopyRow();
50+
if constexpr (WithoutRight) {
51+
if (key && dict.Contains(key) == RightRequired) {
52+
s.CopyRow();
53+
}
54+
} else if constexpr (RightRequired) {
55+
if (NUdf::TUnboxedValue lookup; key && (lookup = dict.Lookup(key))) {
56+
s.MakeRow(lookup);
57+
}
58+
} else {
59+
s.MakeRow(dict.Lookup(key));
5260
}
5361
}
5462
if (!s.IsFinished()) {
@@ -199,6 +207,24 @@ using TBaseComputation = TStatefulWideFlowComputationNode<TBlockWideMapJoinWrapp
199207
OutputRows_++;
200208
}
201209

210+
// Appends one joined output row built from the current "left" row and `value`.
//
// Columns [0, InputWidth_) are taken from the current input row via
// GetItem()/AddItem(). Columns [InputWidth_, OutputWidth_) are filled from
// `value`: when `value` is non-empty its elements 0..(OutputWidth_ - InputWidth_ - 1)
// are appended one per column; when `value` is empty, that same empty value is
// passed to AddValue() for every one of those columns.
// OutputRows_ is incremented once per call.
//
// The call sites visible in this file pass the result of dict.Lookup(key),
// so `value` is presumably the right dict payload (or empty on a miss) —
// TODO confirm against the dict payload type.
// NOTE(review): nothing here checks that `value` has at least
// (OutputWidth_ - InputWidth_) elements before calling GetElement(j).
void MakeRow(const NUdf::TUnboxedValuePod& value) {
211+
// Copy items from the "left" flow.
212+
for (size_t i = 0; i < InputWidth_; i++) {
213+
AddItem(GetItem(i), i);
214+
}
215+
// Convert and append items from the "right" dict.
216+
if (value) {
217+
// `i` walks the output columns after the left part, `j` walks the
// elements of `value` starting from zero.
for (size_t i = InputWidth_, j = 0; i < OutputWidth_; i++, j++) {
218+
AddValue(value.GetElement(j), i);
219+
}
220+
} else {
221+
// No right-side value: pass the empty `value` itself to AddValue()
// for every right-side output column.
for (size_t i = InputWidth_; i < OutputWidth_; i++) {
222+
AddValue(value, i);
223+
}
224+
}
225+
// One more row has been produced.
OutputRows_++;
226+
}
227+
202228
void CopyArray(size_t idx, size_t popCount, const ui8* sparseBitmap, size_t bitmapSize) {
203229
const auto& datum = TArrowBlock::From(Inputs_[idx]).GetDatum();
204230
Y_ENSURE(datum.is_array());
@@ -313,11 +339,19 @@ IComputationNode* WrapBlockMapJoinCore(TCallable& callable, const TComputationNo
313339
MKQL_ENSURE(leftFlowComponents.size() > 0, "Expected at least one column");
314340
const TVector<TType*> leftFlowItems(leftFlowComponents.cbegin(), leftFlowComponents.cend());
315341

342+
const auto rightDictNode = callable.GetInput(1);
343+
MKQL_ENSURE(rightDictNode.GetStaticType()->IsDict(),
344+
"Expected Dict as a right join part");
345+
const auto rightDictType = AS_TYPE(TDictType, rightDictNode);
346+
MKQL_ENSURE(rightDictType->GetPayloadType()->IsVoid() ||
347+
rightDictType->GetPayloadType()->IsTuple(),
348+
"Expected Void or Tuple as a right dict item type");
349+
316350
const auto joinKindNode = callable.GetInput(2);
317351
const auto rawKind = AS_VALUE(TDataLiteral, joinKindNode)->AsValue().Get<ui32>();
318352
const auto joinKind = GetJoinKind(rawKind);
319-
// TODO: Handle other join types.
320-
Y_ENSURE(joinKind == EJoinKind::LeftSemi || joinKind == EJoinKind::LeftOnly);
353+
Y_ENSURE(joinKind == EJoinKind::Inner || joinKind == EJoinKind::Left ||
354+
joinKind == EJoinKind::LeftSemi || joinKind == EJoinKind::LeftOnly);
321355

322356
const auto tupleLiteral = AS_VALUE(TTupleLiteral, callable.GetInput(3));
323357
TVector<ui32> leftKeyColumns;
@@ -333,13 +367,21 @@ IComputationNode* WrapBlockMapJoinCore(TCallable& callable, const TComputationNo
333367
const auto dict = LocateNode(ctx.NodeLocator, callable, 1);
334368

335369
switch (joinKind) {
370+
case EJoinKind::Inner:
371+
return new TBlockWideMapJoinWrapper<false, true>(ctx.Mutables,
372+
std::move(joinItems), std::move(leftFlowItems), std::move(leftKeyColumns),
373+
static_cast<IComputationWideFlowNode*>(flow), dict);
374+
case EJoinKind::Left:
375+
return new TBlockWideMapJoinWrapper<false, false>(ctx.Mutables,
376+
std::move(joinItems), std::move(leftFlowItems), std::move(leftKeyColumns),
377+
static_cast<IComputationWideFlowNode*>(flow), dict);
336378
case EJoinKind::LeftSemi:
337-
return new TBlockWideMapJoinWrapper<true>(ctx.Mutables, std::move(joinItems),
338-
std::move(leftFlowItems), std::move(leftKeyColumns),
379+
return new TBlockWideMapJoinWrapper<true, true>(ctx.Mutables,
380+
std::move(joinItems), std::move(leftFlowItems), std::move(leftKeyColumns),
339381
static_cast<IComputationWideFlowNode*>(flow), dict);
340382
case EJoinKind::LeftOnly:
341-
return new TBlockWideMapJoinWrapper<false>(ctx.Mutables, std::move(joinItems),
342-
std::move(leftFlowItems), std::move(leftKeyColumns),
383+
return new TBlockWideMapJoinWrapper<true, false>(ctx.Mutables,
384+
std::move(joinItems), std::move(leftFlowItems), std::move(leftKeyColumns),
343385
static_cast<IComputationWideFlowNode*>(flow), dict);
344386
default:
345387
Y_ABORT();

0 commit comments

Comments
 (0)