Skip to content

Commit 4f855a7

Browse files
authored
Implement excess key columns drop for BlockMapJoinCore computation node (#9036)
1 parent 9213a37 commit 4f855a7

File tree

4 files changed

+399
-36
lines changed

4 files changed

+399
-36
lines changed

ydb/library/yql/minikql/comp_nodes/mkql_block_map_join.cpp

Lines changed: 73 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,14 @@ class TBlockJoinState : public TBlockState {
2828
public:
2929
TBlockJoinState(TMemoryUsageInfo* memInfo, TComputationContext& ctx,
3030
const TVector<TType*>& inputItems,
31+
const TVector<ui32>& leftIOMap,
3132
const TVector<TType*> outputItems,
3233
NUdf::TUnboxedValue**const fields)
3334
: TBlockState(memInfo, outputItems.size())
3435
, InputWidth_(inputItems.size() - 1)
3536
, OutputWidth_(outputItems.size() - 1)
3637
, Inputs_(inputItems.size())
38+
, LeftIOMap_(leftIOMap)
3739
, InputsDescr_(ToValueDescr(inputItems))
3840
{
3941
const auto& pgBuilder = ctx.Builder->GetPgBuilder();
@@ -54,30 +56,37 @@ class TBlockJoinState : public TBlockState {
5456

5557
void CopyRow() {
5658
// Copy items from the "left" flow.
57-
for (size_t i = 0; i < InputWidth_; i++) {
58-
AddItem(GetItem(i), i);
59+
// Use the mapping from input fields to output ones to
60+
// produce a tight loop to copy row items.
61+
for (size_t i = 0; i < LeftIOMap_.size(); i++) {
62+
AddItem(GetItem(LeftIOMap_[i]), i);
5963
}
6064
OutputRows_++;
6165
}
6266

6367
void MakeRow(const NUdf::TUnboxedValuePod& value) {
68+
size_t builderIndex = 0;
6469
// Copy items from the "left" flow.
65-
for (size_t i = 0; i < InputWidth_; i++) {
66-
AddItem(GetItem(i), i);
70+
// Use the mapping from input fields to output ones to
71+
// produce a tight loop to copy row items.
72+
for (size_t i = 0; i < LeftIOMap_.size(); i++, builderIndex++) {
73+
AddItem(GetItem(LeftIOMap_[i]), i);
6774
}
6875
// Convert and append items from the "right" dict.
76+
// Since the keys are copied to the output only from the
77+
// "left" flow, process all values unconditionally.
6978
if constexpr (RightRequired) {
70-
for (size_t i = InputWidth_, j = 0; i < OutputWidth_; i++, j++) {
71-
AddValue(value.GetElement(j), i);
79+
for (size_t i = 0; builderIndex < OutputWidth_; i++) {
80+
AddValue(value.GetElement(i), builderIndex++);
7281
}
7382
} else {
7483
if (value) {
75-
for (size_t i = InputWidth_, j = 0; i < OutputWidth_; i++, j++) {
76-
AddValue(value.GetElement(j), i);
84+
for (size_t i = 0; builderIndex < OutputWidth_; i++) {
85+
AddValue(value.GetElement(i), builderIndex++);
7786
}
7887
} else {
79-
for (size_t i = InputWidth_; i < OutputWidth_; i++) {
80-
AddValue(value, i);
88+
while (builderIndex < OutputWidth_) {
89+
AddValue(value, builderIndex++);
8190
}
8291
}
8392
}
@@ -164,6 +173,7 @@ class TBlockJoinState : public TBlockState {
164173
size_t InputWidth_;
165174
size_t OutputWidth_;
166175
TUnboxedValueVector Inputs_;
176+
const TVector<ui32> LeftIOMap_;
167177
const std::vector<arrow::ValueDescr> InputsDescr_;
168178
TVector<std::unique_ptr<IBlockReader>> Readers_;
169179
TVector<std::unique_ptr<IBlockItemConverter>> Converters_;
@@ -178,12 +188,13 @@ using TState = TBlockJoinState<RightRequired>;
178188
public:
179189
TBlockWideMapJoinWrapper(TComputationMutables& mutables,
180190
const TVector<TType*>&& resultJoinItems, const TVector<TType*>&& leftFlowItems,
181-
TVector<ui32>&& leftKeyColumns,
191+
const TVector<ui32>&& leftKeyColumns, const TVector<ui32>&& leftIOMap,
182192
IComputationWideFlowNode* flow, IComputationNode* dict)
183193
: TBaseComputation(mutables, flow, EValueRepresentation::Boxed)
184194
, ResultJoinItems_(std::move(resultJoinItems))
185195
, LeftFlowItems_(std::move(leftFlowItems))
186196
, LeftKeyColumns_(std::move(leftKeyColumns))
197+
, LeftIOMap_(leftIOMap)
187198
, Flow_(flow)
188199
, Dict_(dict)
189200
, WideFieldsIndex_(mutables.IncrementWideFieldsIndex(LeftFlowItems_.size()))
@@ -248,7 +259,8 @@ using TState = TBlockJoinState<RightRequired>;
248259
}
249260

250261
void MakeState(TComputationContext& ctx, NUdf::TUnboxedValue& state) const {
251-
state = ctx.HolderFactory.Create<TState>(ctx, LeftFlowItems_, ResultJoinItems_, ctx.WideFields.data() + WideFieldsIndex_);
262+
state = ctx.HolderFactory.Create<TState>(ctx, LeftFlowItems_, LeftIOMap_,
263+
ResultJoinItems_, ctx.WideFields.data() + WideFieldsIndex_);
252264
}
253265

254266
TState& GetState(NUdf::TUnboxedValue& state, TComputationContext& ctx) const {
@@ -267,6 +279,7 @@ using TState = TBlockJoinState<RightRequired>;
267279
const TVector<TType*> ResultJoinItems_;
268280
const TVector<TType*> LeftFlowItems_;
269281
const TVector<ui32> LeftKeyColumns_;
282+
const TVector<ui32> LeftIOMap_;
270283
IComputationWideFlowNode* const Flow_;
271284
IComputationNode* const Dict_;
272285
ui32 WideFieldsIndex_;
@@ -280,12 +293,13 @@ using TState = TBlockJoinState<RightRequired>;
280293
public:
281294
TBlockWideMultiMapJoinWrapper(TComputationMutables& mutables,
282295
const TVector<TType*>&& resultJoinItems, const TVector<TType*>&& leftFlowItems,
283-
TVector<ui32>&& leftKeyColumns,
296+
const TVector<ui32>&& leftKeyColumns, const TVector<ui32>&& leftIOMap,
284297
IComputationWideFlowNode* flow, IComputationNode* dict)
285298
: TBaseComputation(mutables, flow, EValueRepresentation::Boxed, EValueRepresentation::Boxed)
286299
, ResultJoinItems_(std::move(resultJoinItems))
287300
, LeftFlowItems_(std::move(leftFlowItems))
288301
, LeftKeyColumns_(std::move(leftKeyColumns))
302+
, LeftIOMap_(leftIOMap)
289303
, Flow_(flow)
290304
, Dict_(dict)
291305
, WideFieldsIndex_(mutables.IncrementWideFieldsIndex(LeftFlowItems_.size()))
@@ -357,7 +371,8 @@ using TState = TBlockJoinState<RightRequired>;
357371
}
358372

359373
void MakeState(TComputationContext& ctx, NUdf::TUnboxedValue& state) const {
360-
state = ctx.HolderFactory.Create<TState>(ctx, LeftFlowItems_, ResultJoinItems_, ctx.WideFields.data() + WideFieldsIndex_);
374+
state = ctx.HolderFactory.Create<TState>(ctx, LeftFlowItems_, LeftIOMap_,
375+
ResultJoinItems_, ctx.WideFields.data() + WideFieldsIndex_);
361376
}
362377

363378
TState& GetState(NUdf::TUnboxedValue& state, TComputationContext& ctx) const {
@@ -413,6 +428,7 @@ using TState = TBlockJoinState<RightRequired>;
413428
const TVector<TType*> ResultJoinItems_;
414429
const TVector<TType*> LeftFlowItems_;
415430
const TVector<ui32> LeftKeyColumns_;
431+
const TVector<ui32> LeftIOMap_;
416432
IComputationWideFlowNode* const Flow_;
417433
IComputationNode* const Dict_;
418434
ui32 WideFieldsIndex_;
@@ -421,7 +437,7 @@ using TState = TBlockJoinState<RightRequired>;
421437
} // namespace
422438

423439
IComputationNode* WrapBlockMapJoinCore(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
424-
MKQL_ENSURE(callable.GetInputsCount() == 4, "Expected 4 args");
440+
MKQL_ENSURE(callable.GetInputsCount() == 5, "Expected 5 args");
425441

426442
const auto joinType = callable.GetType()->GetReturnType();
427443
MKQL_ENSURE(joinType->IsFlow(), "Expected WideFlow as a resulting stream");
@@ -459,16 +475,42 @@ IComputationNode* WrapBlockMapJoinCore(TCallable& callable, const TComputationNo
459475
Y_ENSURE(joinKind == EJoinKind::Inner || joinKind == EJoinKind::Left ||
460476
joinKind == EJoinKind::LeftSemi || joinKind == EJoinKind::LeftOnly);
461477

462-
const auto tupleLiteral = AS_VALUE(TTupleLiteral, callable.GetInput(3));
478+
const auto keyColumnsLiteral = callable.GetInput(3);
479+
const auto keyColumnsTuple = AS_VALUE(TTupleLiteral, keyColumnsLiteral);
463480
TVector<ui32> leftKeyColumns;
464-
leftKeyColumns.reserve(tupleLiteral->GetValuesCount());
465-
for (ui32 i = 0; i < tupleLiteral->GetValuesCount(); i++) {
466-
const auto item = AS_VALUE(TDataLiteral, tupleLiteral->GetValue(i));
481+
leftKeyColumns.reserve(keyColumnsTuple->GetValuesCount());
482+
for (ui32 i = 0; i < keyColumnsTuple->GetValuesCount(); i++) {
483+
const auto item = AS_VALUE(TDataLiteral, keyColumnsTuple->GetValue(i));
467484
leftKeyColumns.emplace_back(item->AsValue().Get<ui32>());
468485
}
469486
// TODO: Handle multi keys.
470487
Y_ENSURE(leftKeyColumns.size() == 1);
471488

489+
const auto keyDropsLiteral = callable.GetInput(4);
490+
const auto keyDropsTuple = AS_VALUE(TTupleLiteral, keyDropsLiteral);
491+
THashSet<ui32> leftKeyDrops;
492+
leftKeyDrops.reserve(keyDropsTuple->GetValuesCount());
493+
for (ui32 i = 0; i < keyDropsTuple->GetValuesCount(); i++) {
494+
const auto item = AS_VALUE(TDataLiteral, keyDropsTuple->GetValue(i));
495+
leftKeyDrops.emplace(item->AsValue().Get<ui32>());
496+
}
497+
498+
const THashSet<ui32> leftKeySet(leftKeyColumns.cbegin(), leftKeyColumns.cend());
499+
for (const auto& drop : leftKeyDrops) {
500+
MKQL_ENSURE(leftKeySet.contains(drop),
501+
"Only key columns has to be specified in drop column set");
502+
503+
}
504+
505+
TVector<ui32> leftIOMap;
506+
// XXX: Mind the last wide item, containing block length.
507+
for (size_t i = 0; i < leftFlowItems.size() - 1; i++) {
508+
if (leftKeyDrops.contains(i)) {
509+
continue;
510+
}
511+
leftIOMap.push_back(i);
512+
}
513+
472514
const auto flow = LocateNode(ctx.NodeLocator, callable, 0);
473515
const auto dict = LocateNode(ctx.NodeLocator, callable, 1);
474516

@@ -477,28 +519,34 @@ IComputationNode* WrapBlockMapJoinCore(TCallable& callable, const TComputationNo
477519
case EJoinKind::Inner:
478520
if (isMulti) {
479521
return new TBlockWideMultiMapJoinWrapper<true>(ctx.Mutables,
480-
std::move(joinItems), std::move(leftFlowItems), std::move(leftKeyColumns),
522+
std::move(joinItems), std::move(leftFlowItems),
523+
std::move(leftKeyColumns), std::move(leftIOMap),
481524
static_cast<IComputationWideFlowNode*>(flow), dict);
482525
}
483526
return new TBlockWideMapJoinWrapper<false, true>(ctx.Mutables,
484-
std::move(joinItems), std::move(leftFlowItems), std::move(leftKeyColumns),
527+
std::move(joinItems), std::move(leftFlowItems),
528+
std::move(leftKeyColumns), std::move(leftIOMap),
485529
static_cast<IComputationWideFlowNode*>(flow), dict);
486530
case EJoinKind::Left:
487531
if (isMulti) {
488532
return new TBlockWideMultiMapJoinWrapper<false>(ctx.Mutables,
489-
std::move(joinItems), std::move(leftFlowItems), std::move(leftKeyColumns),
533+
std::move(joinItems), std::move(leftFlowItems),
534+
std::move(leftKeyColumns), std::move(leftIOMap),
490535
static_cast<IComputationWideFlowNode*>(flow), dict);
491536
}
492537
return new TBlockWideMapJoinWrapper<false, false>(ctx.Mutables,
493-
std::move(joinItems), std::move(leftFlowItems), std::move(leftKeyColumns),
538+
std::move(joinItems), std::move(leftFlowItems),
539+
std::move(leftKeyColumns), std::move(leftIOMap),
494540
static_cast<IComputationWideFlowNode*>(flow), dict);
495541
case EJoinKind::LeftSemi:
496542
return new TBlockWideMapJoinWrapper<true, true>(ctx.Mutables,
497-
std::move(joinItems), std::move(leftFlowItems), std::move(leftKeyColumns),
543+
std::move(joinItems), std::move(leftFlowItems),
544+
std::move(leftKeyColumns), std::move(leftIOMap),
498545
static_cast<IComputationWideFlowNode*>(flow), dict);
499546
case EJoinKind::LeftOnly:
500547
return new TBlockWideMapJoinWrapper<true, false>(ctx.Mutables,
501-
std::move(joinItems), std::move(leftFlowItems), std::move(leftKeyColumns),
548+
std::move(joinItems), std::move(leftFlowItems),
549+
std::move(leftKeyColumns), std::move(leftIOMap),
502550
static_cast<IComputationWideFlowNode*>(flow), dict);
503551
default:
504552
MKQL_ENSURE(false, "BlockMapJoinCore doesn't support %s join type"

0 commit comments

Comments
 (0)