Skip to content

Commit 1799143

Browse files
committed
[GraceJoin] Skip reading input when output is guaranteed to be empty
Adds early termination optimization for `GraceJoin`: if one side is empty and the join kind guarantees an empty result, the other side is no longer read. Introduces two helpers: * `ShouldSkipRightIfLeftEmpty` * `ShouldSkipLeftIfRightEmpty` Also adds unit tests covering all join kinds and empty input combinations. Initial issue: <#19797> commit_hash:2683031df5b68c293536eacda05c8e79fc452d11
1 parent 99a63ea commit 1799143

File tree

3 files changed

+231
-4
lines changed

3 files changed

+231
-4
lines changed

yql/essentials/minikql/comp_nodes/mkql_grace_join.cpp

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -641,6 +641,14 @@ class TGraceJoinSpillingSupportState : public TComputationValue<TGraceJoinSpilli
641641
}
642642

643643
private:
644+
bool CanSkipRightOnLeftFinished() const {
645+
return !IsSelfJoin_ && LeftPacker->TuplesPacked == 0 && GraceJoin::ShouldSkipRightIfLeftEmpty(JoinKind);
646+
}
647+
648+
bool CanSkipLeftOnRightFinished() const {
649+
return !IsSelfJoin_ && RightPacker->TuplesPacked == 0 && GraceJoin::ShouldSkipLeftIfRightEmpty(JoinKind);
650+
}
651+
644652
EOperatingMode GetMode() const {
645653
return Mode;
646654
}
@@ -769,6 +777,12 @@ class TGraceJoinSpillingSupportState : public TComputationValue<TGraceJoinSpilli
769777
}
770778
}
771779

780+
if (resultLeft == EFetchResult::Finish && CanSkipRightOnLeftFinished() ||
781+
resultRight == EFetchResult::Finish && CanSkipLeftOnRightFinished()) {
782+
IsEarlyExitDueToEmptyInput = true;
783+
return EFetchResult::Finish;
784+
}
785+
772786
if (resultLeft == EFetchResult::Yield || resultRight == EFetchResult::Yield) {
773787
return EFetchResult::Yield;
774788
}
@@ -863,6 +877,11 @@ class TGraceJoinSpillingSupportState : public TComputationValue<TGraceJoinSpilli
863877
}
864878

865879
auto isYield = FetchAndPackData(ctx, output);
880+
if (IsEarlyExitDueToEmptyInput) {
881+
*HaveMoreLeftRows = false;
882+
*HaveMoreRightRows = false;
883+
return EFetchResult::Finish;
884+
}
866885
if (isYield == EFetchResult::One)
867886
return isYield;
868887
if (IsSpillingAllowed && ctx.SpillerFactory && IsSwitchToSpillingModeCondition()) {
@@ -1062,6 +1081,7 @@ EFetchResult ProcessSpilledData(TComputationContext&, NUdf::TUnboxedValue*const*
10621081
const bool IsSpillingAllowed;
10631082

10641083
bool IsSpillingFinalized = false;
1084+
bool IsEarlyExitDueToEmptyInput = false;
10651085

10661086
NYql::NUdf::TCounter CounterOutputRows_;
10671087
ui32 SpilledBucketsJoinOrderCurrentIndex = 0;

yql/essentials/minikql/comp_nodes/mkql_grace_join_imp.h

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,50 @@ namespace NKikimr {
1010
namespace NMiniKQL {
1111
namespace GraceJoin {
1212

13+
// Determines whether the right input can be safely skipped when the left input is empty.
14+
// For certain join kinds, the result will always be empty if the left side is empty,
15+
// regardless of the content of the right side. In such cases, it's safe to avoid reading the right input entirely.
16+
constexpr bool ShouldSkipRightIfLeftEmpty(EJoinKind kind) {
17+
switch (kind) {
18+
case EJoinKind::Inner:
19+
case EJoinKind::LeftOnly:
20+
case EJoinKind::LeftSemi:
21+
case EJoinKind::RightSemi:
22+
return true;
23+
case EJoinKind::RightOnly:
24+
case EJoinKind::Left:
25+
case EJoinKind::Right:
26+
case EJoinKind::Exclusion:
27+
case EJoinKind::Full:
28+
case EJoinKind::SemiMask:
29+
case EJoinKind::SemiSide:
30+
case EJoinKind::Cross:
31+
return false;
32+
}
33+
}
34+
35+
// Determines whether the left input can be safely skipped when the right input is empty.
36+
// For certain join kinds, the result will always be empty if the right side is empty,
37+
// regardless of the content of the left side. In such cases, it's safe to avoid reading the left input entirely.
38+
constexpr bool ShouldSkipLeftIfRightEmpty(EJoinKind kind) {
39+
switch (kind) {
40+
case EJoinKind::Inner:
41+
case EJoinKind::RightOnly:
42+
case EJoinKind::RightSemi:
43+
case EJoinKind::LeftSemi:
44+
return true;
45+
case EJoinKind::Min:
46+
case EJoinKind::Left:
47+
case EJoinKind::Right:
48+
case EJoinKind::Exclusion:
49+
case EJoinKind::Full:
50+
case EJoinKind::SemiMask:
51+
case EJoinKind::SemiSide:
52+
case EJoinKind::Cross:
53+
return false;
54+
}
55+
}
56+
1357
class TTableBucketSpiller;
1458
#define GRACEJOIN_DEBUG NUdf::ELogLevel::Debug
1559
#define GRACEJOIN_TRACE NUdf::ELogLevel::Trace

yql/essentials/minikql/comp_nodes/ut/mkql_grace_join_ut.cpp

Lines changed: 167 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,10 @@
11
#include "mkql_computation_node_ut.h"
22
#include <yql/essentials/minikql/mkql_runtime_version.h>
33
#include <yql/essentials/minikql/comp_nodes/mkql_grace_join_imp.h>
4+
#include <yql/essentials/minikql/mkql_string_util.h>
45

56
#include <yql/essentials/minikql/computation/mock_spiller_factory_ut.h>
67

7-
#include <chrono>
8-
#include <iostream>
98
#include <cstring>
109
#include <vector>
1110
#include <cassert>
@@ -17,8 +16,6 @@
1716
#include <util/stream/null.h>
1817
#include <util/system/mem_info.h>
1918

20-
#include <cstdint>
21-
2219
namespace NKikimr {
2320
namespace NMiniKQL {
2421

@@ -2631,7 +2628,173 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) {
26312628

26322629
}
26332630

2631+
constexpr std::string_view LeftStreamName = "LeftTestStream";
2632+
constexpr std::string_view RightStreamName = "RightTestStream";
2633+
2634+
struct TTestStreamParams {
2635+
ui64 MaxAllowedNumberOfFetches;
2636+
ui64 StreamSize;
2637+
};
2638+
2639+
class TTestStreamWrapper: public TMutableComputationNode<TTestStreamWrapper> {
2640+
using TBaseComputation = TMutableComputationNode<TTestStreamWrapper>;
2641+
public:
2642+
class TStreamValue : public TComputationValue<TStreamValue> {
2643+
public:
2644+
using TBase = TComputationValue<TStreamValue>;
2645+
2646+
TStreamValue(TMemoryUsageInfo* memInfo, TComputationContext& compCtx, TTestStreamParams& params)
2647+
: TBase(memInfo), CompCtx(compCtx), Params(params)
2648+
{}
2649+
private:
2650+
NUdf::EFetchStatus Fetch(NUdf::TUnboxedValue& result) override {
2651+
++TotalFetches;
2652+
2653+
UNIT_ASSERT_LE(TotalFetches, Params.MaxAllowedNumberOfFetches);
2654+
2655+
if (TotalFetches > Params.StreamSize) {
2656+
return NUdf::EFetchStatus::Finish;
2657+
}
2658+
2659+
NUdf::TUnboxedValue* items = nullptr;
2660+
result = CompCtx.HolderFactory.CreateDirectArrayHolder(2, items);
2661+
items[0] = NUdf::TUnboxedValuePod(TotalFetches);
2662+
items[1] = MakeString(ToString(TotalFetches) * 5);
2663+
2664+
return NUdf::EFetchStatus::Ok;
2665+
}
26342666

2667+
private:
2668+
TComputationContext& CompCtx;
2669+
TTestStreamParams& Params;
2670+
ui64 TotalFetches = 0;
2671+
};
2672+
2673+
TTestStreamWrapper(TComputationMutables& mutables, TTestStreamParams& params)
2674+
: TBaseComputation(mutables)
2675+
, Params(params)
2676+
{}
2677+
2678+
NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const {
2679+
return ctx.HolderFactory.Create<TStreamValue>(ctx, Params);
2680+
}
2681+
private:
2682+
void RegisterDependencies() const final {}
2683+
2684+
TTestStreamParams& Params;
2685+
};
2686+
2687+
IComputationNode* WrapTestStream(const TComputationNodeFactoryContext& ctx, TTestStreamParams& params) {
2688+
return new TTestStreamWrapper(ctx.Mutables, params);
2689+
}
2690+
2691+
TComputationNodeFactory GetNodeFactory(TTestStreamParams& leftParams, TTestStreamParams& rightParams) {
2692+
return [&leftParams, &rightParams](TCallable& callable, const TComputationNodeFactoryContext& ctx) -> IComputationNode* {
2693+
if (callable.GetType()->GetName() == LeftStreamName) {
2694+
return WrapTestStream(ctx, leftParams);
2695+
} else if (callable.GetType()->GetName() == RightStreamName) {
2696+
return WrapTestStream(ctx, rightParams);
2697+
}
2698+
return GetBuiltinFactory()(callable, ctx);
2699+
};
2700+
}
2701+
2702+
TRuntimeNode MakeStream(TSetup<false>& setup, bool isRight) {
2703+
TProgramBuilder& pb = *setup.PgmBuilder;
2704+
2705+
TCallableBuilder callableBuilder(*setup.Env, isRight ? RightStreamName : LeftStreamName,
2706+
pb.NewStreamType(
2707+
pb.NewTupleType({
2708+
pb.NewDataType(NUdf::TDataType<ui32>::Id),
2709+
pb.NewDataType(NUdf::TDataType<char*>::Id)
2710+
})
2711+
));
2712+
2713+
return TRuntimeNode(callableBuilder.Build(), false);
2714+
}
2715+
2716+
Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinEmptyInputTest) {
2717+
2718+
void RunGraceJoinEmptyCaseTest(EJoinKind joinKind, bool emptyLeft, bool emptyRight) {
2719+
const ui64 streamSize = 5;
2720+
2721+
ui64 leftStreamSize = streamSize;
2722+
ui64 rightStreamSize = streamSize;
2723+
ui64 maxExpectedFetchesFromLeftStream = leftStreamSize + 1;
2724+
ui64 maxExpectedFetchesFromRightStream = rightStreamSize + 1;
2725+
2726+
if (emptyLeft) {
2727+
leftStreamSize = 0;
2728+
if (GraceJoin::ShouldSkipRightIfLeftEmpty(joinKind)) {
2729+
maxExpectedFetchesFromRightStream = 1;
2730+
}
2731+
}
2732+
2733+
if (emptyRight) {
2734+
rightStreamSize = 0;
2735+
if (GraceJoin::ShouldSkipLeftIfRightEmpty(joinKind)) {
2736+
maxExpectedFetchesFromLeftStream = 1;
2737+
}
2738+
}
2739+
2740+
TTestStreamParams leftParams(maxExpectedFetchesFromLeftStream, leftStreamSize);
2741+
TTestStreamParams rightParams(maxExpectedFetchesFromRightStream, rightStreamSize);
2742+
TSetup<false> setup(GetNodeFactory(leftParams, rightParams));
2743+
TProgramBuilder& pb = *setup.PgmBuilder;
2744+
2745+
const auto leftStream = MakeStream(setup, false);
2746+
const auto rightStream = MakeStream(setup, true);
2747+
2748+
const auto resultType = pb.NewFlowType(pb.NewMultiType({
2749+
pb.NewDataType(NUdf::TDataType<char*>::Id),
2750+
pb.NewDataType(NUdf::TDataType<char*>::Id)
2751+
}));
2752+
2753+
const auto joinFlow = pb.GraceJoin(
2754+
pb.ExpandMap(pb.ToFlow(leftStream), [&](TRuntimeNode item) -> TRuntimeNode::TList {
2755+
return {pb.Nth(item, 0U), pb.Nth(item, 1U)};
2756+
}),
2757+
pb.ExpandMap(pb.ToFlow(rightStream), [&](TRuntimeNode item) -> TRuntimeNode::TList {
2758+
return {pb.Nth(item, 0U), pb.Nth(item, 1U)};
2759+
}),
2760+
joinKind,
2761+
{0U}, {0U},
2762+
{1U, 0U}, {1U, 1U},
2763+
resultType);
2764+
2765+
const auto pgmReturn = pb.Collect(pb.NarrowMap(joinFlow, [&](TRuntimeNode::TList items) -> TRuntimeNode {
2766+
return pb.NewTuple(items);
2767+
}));
2768+
2769+
const auto graph = setup.BuildGraph(pgmReturn);
2770+
const auto iterator = graph->GetValue().GetListIterator();
2771+
2772+
NUdf::TUnboxedValue tuple;
2773+
while (iterator.Next(tuple)) {
2774+
// Consume results if any
2775+
}
2776+
}
2777+
2778+
#define ADD_JOIN_TESTS_FOR_KIND(Kind) \
2779+
Y_UNIT_TEST(Kind##_EmptyLeft) { \
2780+
RunGraceJoinEmptyCaseTest(EJoinKind::Kind, true, false); \
2781+
} \
2782+
Y_UNIT_TEST(Kind##_EmptyRight) { \
2783+
RunGraceJoinEmptyCaseTest(EJoinKind::Kind, false, true); \
2784+
} \
2785+
2786+
ADD_JOIN_TESTS_FOR_KIND(Inner)
2787+
ADD_JOIN_TESTS_FOR_KIND(Left)
2788+
ADD_JOIN_TESTS_FOR_KIND(LeftOnly)
2789+
ADD_JOIN_TESTS_FOR_KIND(LeftSemi)
2790+
ADD_JOIN_TESTS_FOR_KIND(Right)
2791+
ADD_JOIN_TESTS_FOR_KIND(RightOnly)
2792+
ADD_JOIN_TESTS_FOR_KIND(RightSemi)
2793+
ADD_JOIN_TESTS_FOR_KIND(Full)
2794+
ADD_JOIN_TESTS_FOR_KIND(Exclusion)
2795+
2796+
#undef ADD_JOIN_TESTS_FOR_KIND
2797+
}
26352798
}
26362799

26372800
}

0 commit comments

Comments
 (0)