Skip to content

Commit 5bf1ff7

Browse files
[GraceJoin] Skip reading input when output is guaranteed to be empty (#20197)
2 parents ad04714 + b672154 commit 5bf1ff7

File tree

3 files changed

+231
-4
lines changed

3 files changed

+231
-4
lines changed

yql/essentials/minikql/comp_nodes/mkql_grace_join.cpp

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -630,6 +630,14 @@ class TGraceJoinSpillingSupportState : public TComputationValue<TGraceJoinSpilli
630630
}
631631

632632
private:
633+
bool CanSkipRightOnLeftFinished() const {
634+
return !IsSelfJoin_ && LeftPacker->TuplesPacked == 0 && GraceJoin::ShouldSkipRightIfLeftEmpty(JoinKind);
635+
}
636+
637+
bool CanSkipLeftOnRightFinished() const {
638+
return !IsSelfJoin_ && RightPacker->TuplesPacked == 0 && GraceJoin::ShouldSkipLeftIfRightEmpty(JoinKind);
639+
}
640+
633641
EOperatingMode GetMode() const {
634642
return Mode;
635643
}
@@ -755,6 +763,12 @@ class TGraceJoinSpillingSupportState : public TComputationValue<TGraceJoinSpilli
755763
}
756764
}
757765

766+
if (resultLeft == EFetchResult::Finish && CanSkipRightOnLeftFinished() ||
767+
resultRight == EFetchResult::Finish && CanSkipLeftOnRightFinished()) {
768+
IsEarlyExitDueToEmptyInput = true;
769+
return EFetchResult::Finish;
770+
}
771+
758772
if (resultLeft == EFetchResult::Yield || resultRight == EFetchResult::Yield) {
759773
return EFetchResult::Yield;
760774
}
@@ -844,6 +858,11 @@ class TGraceJoinSpillingSupportState : public TComputationValue<TGraceJoinSpilli
844858
}
845859

846860
auto isYield = FetchAndPackData(ctx, output);
861+
if (IsEarlyExitDueToEmptyInput) {
862+
*HaveMoreLeftRows = false;
863+
*HaveMoreRightRows = false;
864+
return EFetchResult::Finish;
865+
}
847866
if (isYield == EFetchResult::One)
848867
return isYield;
849868
if (IsSpillingAllowed && ctx.SpillerFactory && IsSwitchToSpillingModeCondition()) {
@@ -1044,6 +1063,7 @@ EFetchResult ProcessSpilledData(TComputationContext&, NUdf::TUnboxedValue*const*
10441063
const bool IsSpillingAllowed;
10451064

10461065
bool IsSpillingFinalized = false;
1066+
bool IsEarlyExitDueToEmptyInput = false;
10471067

10481068
NYql::NUdf::TCounter CounterOutputRows_;
10491069
ui32 SpilledBucketsJoinOrderCurrentIndex = 0;

yql/essentials/minikql/comp_nodes/mkql_grace_join_imp.h

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,50 @@ namespace NKikimr {
1010
namespace NMiniKQL {
1111
namespace GraceJoin {
1212

13+
// Determines whether the right input can be safely skipped when the left input is empty.
14+
// For certain join kinds, the result will always be empty if the left side is empty,
15+
// regardless of the content of the right side. In such cases, it's safe to avoid reading the right input entirely.
16+
constexpr bool ShouldSkipRightIfLeftEmpty(EJoinKind kind) {
17+
switch (kind) {
18+
case EJoinKind::Inner:
19+
case EJoinKind::LeftOnly:
20+
case EJoinKind::LeftSemi:
21+
case EJoinKind::RightSemi:
22+
return true;
23+
case EJoinKind::RightOnly:
24+
case EJoinKind::Left:
25+
case EJoinKind::Right:
26+
case EJoinKind::Exclusion:
27+
case EJoinKind::Full:
28+
case EJoinKind::SemiMask:
29+
case EJoinKind::SemiSide:
30+
case EJoinKind::Cross:
31+
return false;
32+
}
33+
}
34+
35+
// Determines whether the left input can be safely skipped when the right input is empty.
36+
// For certain join kinds, the result will always be empty if the right side is empty,
37+
// regardless of the content of the left side. In such cases, it's safe to avoid reading the left input entirely.
38+
constexpr bool ShouldSkipLeftIfRightEmpty(EJoinKind kind) {
39+
switch (kind) {
40+
case EJoinKind::Inner:
41+
case EJoinKind::RightOnly:
42+
case EJoinKind::RightSemi:
43+
case EJoinKind::LeftSemi:
44+
return true;
45+
case EJoinKind::Min:
46+
case EJoinKind::Left:
47+
case EJoinKind::Right:
48+
case EJoinKind::Exclusion:
49+
case EJoinKind::Full:
50+
case EJoinKind::SemiMask:
51+
case EJoinKind::SemiSide:
52+
case EJoinKind::Cross:
53+
return false;
54+
}
55+
}
56+
1357
class TTableBucketSpiller;
1458
#define GRACEJOIN_DEBUG DEBUG
1559
#define GRACEJOIN_TRACE TRACE

yql/essentials/minikql/comp_nodes/ut/mkql_grace_join_ut.cpp

Lines changed: 167 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,10 @@
11
#include "mkql_computation_node_ut.h"
22
#include <yql/essentials/minikql/mkql_runtime_version.h>
33
#include <yql/essentials/minikql/comp_nodes/mkql_grace_join_imp.h>
4+
#include <yql/essentials/minikql/mkql_string_util.h>
45

56
#include <yql/essentials/minikql/computation/mock_spiller_factory_ut.h>
67

7-
#include <chrono>
8-
#include <iostream>
98
#include <cstring>
109
#include <vector>
1110
#include <cassert>
@@ -17,8 +16,6 @@
1716
#include <util/stream/null.h>
1817
#include <util/system/mem_info.h>
1918

20-
#include <cstdint>
21-
2219
namespace NKikimr {
2320
namespace NMiniKQL {
2421

@@ -2633,7 +2630,173 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) {
26332630

26342631
}
26352632

2633+
constexpr std::string_view LeftStreamName = "LeftTestStream";
2634+
constexpr std::string_view RightStreamName = "RightTestStream";
2635+
2636+
struct TTestStreamParams {
2637+
ui64 MaxAllowedNumberOfFetches;
2638+
ui64 StreamSize;
2639+
};
2640+
2641+
class TTestStreamWrapper: public TMutableComputationNode<TTestStreamWrapper> {
2642+
using TBaseComputation = TMutableComputationNode<TTestStreamWrapper>;
2643+
public:
2644+
class TStreamValue : public TComputationValue<TStreamValue> {
2645+
public:
2646+
using TBase = TComputationValue<TStreamValue>;
2647+
2648+
TStreamValue(TMemoryUsageInfo* memInfo, TComputationContext& compCtx, TTestStreamParams& params)
2649+
: TBase(memInfo), CompCtx(compCtx), Params(params)
2650+
{}
2651+
private:
2652+
NUdf::EFetchStatus Fetch(NUdf::TUnboxedValue& result) override {
2653+
++TotalFetches;
2654+
2655+
UNIT_ASSERT_LE(TotalFetches, Params.MaxAllowedNumberOfFetches);
2656+
2657+
if (TotalFetches > Params.StreamSize) {
2658+
return NUdf::EFetchStatus::Finish;
2659+
}
2660+
2661+
NUdf::TUnboxedValue* items = nullptr;
2662+
result = CompCtx.HolderFactory.CreateDirectArrayHolder(2, items);
2663+
items[0] = NUdf::TUnboxedValuePod(TotalFetches);
2664+
items[1] = MakeString(ToString(TotalFetches) * 5);
2665+
2666+
return NUdf::EFetchStatus::Ok;
2667+
}
26362668

2669+
private:
2670+
TComputationContext& CompCtx;
2671+
TTestStreamParams& Params;
2672+
ui64 TotalFetches = 0;
2673+
};
2674+
2675+
TTestStreamWrapper(TComputationMutables& mutables, TTestStreamParams& params)
2676+
: TBaseComputation(mutables)
2677+
, Params(params)
2678+
{}
2679+
2680+
NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const {
2681+
return ctx.HolderFactory.Create<TStreamValue>(ctx, Params);
2682+
}
2683+
private:
2684+
void RegisterDependencies() const final {}
2685+
2686+
TTestStreamParams& Params;
2687+
};
2688+
2689+
IComputationNode* WrapTestStream(const TComputationNodeFactoryContext& ctx, TTestStreamParams& params) {
2690+
return new TTestStreamWrapper(ctx.Mutables, params);
2691+
}
2692+
2693+
TComputationNodeFactory GetNodeFactory(TTestStreamParams& leftParams, TTestStreamParams& rightParams) {
2694+
return [&leftParams, &rightParams](TCallable& callable, const TComputationNodeFactoryContext& ctx) -> IComputationNode* {
2695+
if (callable.GetType()->GetName() == LeftStreamName) {
2696+
return WrapTestStream(ctx, leftParams);
2697+
} else if (callable.GetType()->GetName() == RightStreamName) {
2698+
return WrapTestStream(ctx, rightParams);
2699+
}
2700+
return GetBuiltinFactory()(callable, ctx);
2701+
};
2702+
}
2703+
2704+
TRuntimeNode MakeStream(TSetup<false>& setup, bool isRight) {
2705+
TProgramBuilder& pb = *setup.PgmBuilder;
2706+
2707+
TCallableBuilder callableBuilder(*setup.Env, isRight ? RightStreamName : LeftStreamName,
2708+
pb.NewStreamType(
2709+
pb.NewTupleType({
2710+
pb.NewDataType(NUdf::TDataType<ui32>::Id),
2711+
pb.NewDataType(NUdf::TDataType<char*>::Id)
2712+
})
2713+
));
2714+
2715+
return TRuntimeNode(callableBuilder.Build(), false);
2716+
}
2717+
2718+
Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinEmptyInputTest) {
2719+
2720+
void RunGraceJoinEmptyCaseTest(EJoinKind joinKind, bool emptyLeft, bool emptyRight) {
2721+
const ui64 streamSize = 5;
2722+
2723+
ui64 leftStreamSize = streamSize;
2724+
ui64 rightStreamSize = streamSize;
2725+
ui64 maxExpectedFetchesFromLeftStream = leftStreamSize + 1;
2726+
ui64 maxExpectedFetchesFromRightStream = rightStreamSize + 1;
2727+
2728+
if (emptyLeft) {
2729+
leftStreamSize = 0;
2730+
if (GraceJoin::ShouldSkipRightIfLeftEmpty(joinKind)) {
2731+
maxExpectedFetchesFromRightStream = 1;
2732+
}
2733+
}
2734+
2735+
if (emptyRight) {
2736+
rightStreamSize = 0;
2737+
if (GraceJoin::ShouldSkipLeftIfRightEmpty(joinKind)) {
2738+
maxExpectedFetchesFromLeftStream = 1;
2739+
}
2740+
}
2741+
2742+
TTestStreamParams leftParams(maxExpectedFetchesFromLeftStream, leftStreamSize);
2743+
TTestStreamParams rightParams(maxExpectedFetchesFromRightStream, rightStreamSize);
2744+
TSetup<false> setup(GetNodeFactory(leftParams, rightParams));
2745+
TProgramBuilder& pb = *setup.PgmBuilder;
2746+
2747+
const auto leftStream = MakeStream(setup, false);
2748+
const auto rightStream = MakeStream(setup, true);
2749+
2750+
const auto resultType = pb.NewFlowType(pb.NewMultiType({
2751+
pb.NewDataType(NUdf::TDataType<char*>::Id),
2752+
pb.NewDataType(NUdf::TDataType<char*>::Id)
2753+
}));
2754+
2755+
const auto joinFlow = pb.GraceJoin(
2756+
pb.ExpandMap(pb.ToFlow(leftStream), [&](TRuntimeNode item) -> TRuntimeNode::TList {
2757+
return {pb.Nth(item, 0U), pb.Nth(item, 1U)};
2758+
}),
2759+
pb.ExpandMap(pb.ToFlow(rightStream), [&](TRuntimeNode item) -> TRuntimeNode::TList {
2760+
return {pb.Nth(item, 0U), pb.Nth(item, 1U)};
2761+
}),
2762+
joinKind,
2763+
{0U}, {0U},
2764+
{1U, 0U}, {1U, 1U},
2765+
resultType);
2766+
2767+
const auto pgmReturn = pb.Collect(pb.NarrowMap(joinFlow, [&](TRuntimeNode::TList items) -> TRuntimeNode {
2768+
return pb.NewTuple(items);
2769+
}));
2770+
2771+
const auto graph = setup.BuildGraph(pgmReturn);
2772+
const auto iterator = graph->GetValue().GetListIterator();
2773+
2774+
NUdf::TUnboxedValue tuple;
2775+
while (iterator.Next(tuple)) {
2776+
// Consume results if any
2777+
}
2778+
}
2779+
2780+
#define ADD_JOIN_TESTS_FOR_KIND(Kind) \
2781+
Y_UNIT_TEST(Kind##_EmptyLeft) { \
2782+
RunGraceJoinEmptyCaseTest(EJoinKind::Kind, true, false); \
2783+
} \
2784+
Y_UNIT_TEST(Kind##_EmptyRight) { \
2785+
RunGraceJoinEmptyCaseTest(EJoinKind::Kind, false, true); \
2786+
} \
2787+
2788+
ADD_JOIN_TESTS_FOR_KIND(Inner)
2789+
ADD_JOIN_TESTS_FOR_KIND(Left)
2790+
ADD_JOIN_TESTS_FOR_KIND(LeftOnly)
2791+
ADD_JOIN_TESTS_FOR_KIND(LeftSemi)
2792+
ADD_JOIN_TESTS_FOR_KIND(Right)
2793+
ADD_JOIN_TESTS_FOR_KIND(RightOnly)
2794+
ADD_JOIN_TESTS_FOR_KIND(RightSemi)
2795+
ADD_JOIN_TESTS_FOR_KIND(Full)
2796+
ADD_JOIN_TESTS_FOR_KIND(Exclusion)
2797+
2798+
#undef ADD_JOIN_TESTS_FOR_KIND
2799+
}
26372800
}
26382801

26392802
}

0 commit comments

Comments
 (0)