Skip to content

Commit f60c468

Browse files
authored
YQ-3617: fix GROUP BY HOP + AS_TABLE (#9370) (#10250)
1 parent 7b766c9 commit f60c468

File tree

15 files changed

+935
-578
lines changed

15 files changed

+935
-578
lines changed

ydb/library/yql/core/common_opt/yql_co_simple1.cpp

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
#include <ydb/library/yql/core/yql_atom_enums.h>
66
#include <ydb/library/yql/core/yql_expr_type_annotation.h>
77
#include <ydb/library/yql/core/yql_join.h>
8+
#include <ydb/library/yql/core/yql_opt_hopping.h>
89
#include <ydb/library/yql/core/yql_opt_utils.h>
910
#include <ydb/library/yql/core/yql_opt_window.h>
1011
#include <ydb/library/yql/core/yql_type_helpers.h>
@@ -3228,6 +3229,99 @@ TExprNode::TPtr RemoveDeadPayloadColumns(const TCoAggregate& aggr, TExprContext&
32283229
return aggr.Ptr();
32293230
}
32303231

3232+
// Rewrites a hopping-window Aggregate into an explicit
//   PartitionsByKeys(input, key, Bool 'true, time, handler)
// expression whose list handler runs MultiHoppingCore over Iterator(partition),
// maps every produced row through keysDescription.BuildUnpickleLambda() and
// wraps the result into ForwardList.
//
// Returns nullptr when NHopping::ExtractHopTraits() yields no traits; otherwise
// returns the root of the newly built expression. Every new node is created at
// the position of the original aggregate.
//
// NOTE(review): the NHopping helpers are declared in yql_opt_hopping.h, which is
// not visible here; comments about what they do are assumptions to be confirmed.
TExprNode::TPtr RewriteAsHoppingWindowFullOutput(const TCoAggregate& aggregate, TExprContext& ctx) {
3233+
const auto pos = aggregate.Pos();
3234+
3235+
// NOTE(review): presumably rejects DISTINCT aggregations - confirm against the helper.
NHopping::EnsureNotDistinct(aggregate);
3236+
3237+
// NOTE(review): the meaning of the trailing `false` flag is not visible from this file.
const auto maybeHopTraits = NHopping::ExtractHopTraits(aggregate, ctx, false);
3238+
if (!maybeHopTraits) {
3239+
return nullptr;
3240+
}
3241+
const auto hopTraits = *maybeHopTraits;
3242+
3243+
// Row (struct) type of the aggregate input; used to describe the grouping keys.
const auto aggregateInputType = GetSeqItemType(*aggregate.Ptr()->Head().GetTypeAnn()).Cast<TStructExprType>();
3244+
NHopping::TKeysDescription keysDescription(*aggregateInputType, aggregate.Keys(), hopTraits.Column);
3245+
3246+
// The same key lambda is used both to partition the input and as the
// MultiHoppingCore key extractor; the same time lambda is used both as the
// partition sort key and as the MultiHoppingCore time extractor.
const auto keyLambda = keysDescription.GetKeySelector(ctx, pos, aggregateInputType);
3247+
const auto timeExtractorLambda = NHopping::BuildTimeExtractor(hopTraits.Traits, ctx);
3248+
const auto initLambda = NHopping::BuildInitHopLambda(aggregate, ctx);
3249+
const auto updateLambda = NHopping::BuildUpdateHopLambda(aggregate, ctx);
3250+
const auto saveLambda = NHopping::BuildSaveHopLambda(aggregate, ctx);
3251+
const auto loadLambda = NHopping::BuildLoadHopLambda(aggregate, ctx);
3252+
const auto mergeLambda = NHopping::BuildMergeHopLambda(aggregate, ctx);
3253+
const auto finishLambda = NHopping::BuildFinishHopLambda(aggregate, keysDescription.GetActualGroupKeys(), hopTraits.Column, ctx);
3254+
3255+
// Argument of the PartitionsByKeys list handler (one partition of the input).
const auto streamArg = Build<TCoArgument>(ctx, pos).Name("stream").Done();
3256+
// MultiHoppingCore builder with everything except its Input filled in;
// the Input is attached below, inside the list handler.
auto multiHoppingCoreBuilder = Build<TCoMultiHoppingCore>(ctx, pos)
3257+
.KeyExtractor(keyLambda)
3258+
.TimeExtractor(timeExtractorLambda)
3259+
.Hop(hopTraits.Traits.Hop())
3260+
.Interval(hopTraits.Traits.Interval())
3261+
.Delay(hopTraits.Traits.Delay())
3262+
.DataWatermarks(hopTraits.Traits.DataWatermarks())
3263+
.InitHandler(initLambda)
3264+
.UpdateHandler(updateLambda)
3265+
.MergeHandler(mergeLambda)
3266+
.FinishHandler(finishLambda)
3267+
.SaveHandler(saveLambda)
3268+
.LoadHandler(loadLambda)
3269+
// The watermark mode atom is built from ToString(false).
.template WatermarkMode<TCoAtom>().Build(ToString(false));
3270+
3271+
return Build<TCoPartitionsByKeys>(ctx, pos)
3272+
.Input(aggregate.Input())
3273+
.KeySelectorLambda(keyLambda)
3274+
// Sort direction is the Bool literal "true" (presumably ascending - confirm).
.SortDirections<TCoBool>()
3275+
.Literal()
3276+
.Value("true")
3277+
.Build()
3278+
.Build()
3279+
.SortKeySelectorLambda(timeExtractorLambda)
3280+
.ListHandlerLambda()
3281+
.Args(streamArg)
3282+
.template Body<TCoForwardList>()
3283+
.Stream(Build<TCoMap>(ctx, pos)
3284+
// Complete the MultiHoppingCore node: its input is Iterator(stream).
.Input(multiHoppingCoreBuilder
3285+
.template Input<TCoIterator>()
3286+
.List(streamArg)
3287+
.Build()
3288+
.Done())
3289+
.Lambda(keysDescription.BuildUnpickleLambda(ctx, pos, *aggregateInputType))
3290+
.Done())
3291+
.Build()
3292+
.Build()
3293+
.Done()
3294+
.Ptr();
3295+
}
3296+
3297+
// Optimizer entry point for hopping-window aggregation.
//
// Returns nullptr (no rewrite) when the aggregate is not a pure isolated
// lambda, when it carries no "hopping" setting, or when
// RewriteAsHoppingWindowFullOutput() declines to rewrite it. Otherwise returns
// the rewritten expression; if the aggregate has an "output_columns" setting,
// the result is additionally wrapped into ExtractMembers over child #1 of
// that setting.
TExprNode::TPtr RewriteAsHoppingWindow(TExprNode::TPtr node, TExprContext& ctx) {
    const TCoAggregate aggr(node);

    // Both preconditions must hold; the settings lookup is skipped when the
    // purity check already fails.
    const bool eligible = IsPureIsolatedLambda(*aggr.Ptr())
        && GetSetting(aggr.Settings().Ref(), "hopping");
    if (!eligible) {
        return nullptr;
    }

    auto rewritten = RewriteAsHoppingWindowFullOutput(aggr, ctx);
    if (rewritten) {
        // Narrow the full output down to the requested columns, if any.
        if (const auto outputColumns = GetSetting(aggr.Settings().Ref(), "output_columns")) {
            rewritten = Build<TCoExtractMembers>(ctx, aggr.Pos())
                .Input(rewritten)
                .Members(outputColumns->ChildPtr(1))
                .Done()
                .Ptr();
        }
    }

    return rewritten;
}
3324+
32313325
TExprNode::TPtr PullAssumeColumnOrderOverEquiJoin(const TExprNode::TPtr& node, TExprContext& ctx, TOptimizeContext& optCtx) {
32323326
TVector<ui32> withAssume;
32333327
for (ui32 i = 0; i < node->ChildrenSize() - 2; i++) {
@@ -5007,6 +5101,11 @@ void RegisterCoSimpleCallables1(TCallableOptimizerMap& map) {
50075101
return clean;
50085102
}
50095103

5104+
if (auto hopping = RewriteAsHoppingWindow(node, ctx)) {
5105+
YQL_CLOG(DEBUG, Core) << "RewriteAsHoppingWindow";
5106+
return hopping;
5107+
}
5108+
50105109
return DropReorder<false>(node, ctx);
50115110
};
50125111

ydb/library/yql/core/ut/yql_expr_constraint_ut.cpp

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3270,6 +3270,35 @@ Y_UNIT_TEST_SUITE(TYqlExprConstraints) {
32703270
CheckConstraint<TUniqueConstraintNode>(exprRoot, "LazyList", "");
32713271
CheckConstraint<TDistinctConstraintNode>(exprRoot, "LazyList", "");
32723272
}
3273+
3274+
// Constraint propagation check for a GROUP BY HOP shaped plan.
//
// The program feeds a three-row list through PartitionsByKeys whose key
// selector is (StablePickle(data), StablePickle(group0)) and whose handler runs
// MultiHoppingCore with the same key selector, then Unpickle-s `data` and
// `group0` back and returns the rows as a ForwardList. After annotation the
// PartitionsByKeys node is expected to carry Distinct((data,group0)) and
// Unique((data,group0)).
Y_UNIT_TEST(GroupByHop) {
3275+
const TStringBuf s = R"((
3276+
(let list (AsList
3277+
(AsStruct '('"time" (String '"2024-01-01T00:00:01Z")) '('"user" (Int32 '"1")) '('"data" (Null)))
3278+
(AsStruct '('"time" (String '"2024-01-01T00:00:02Z")) '('"user" (Int32 '"1")) '('"data" (Null)))
3279+
(AsStruct '('"time" (String '"2024-01-01T00:00:03Z")) '('"user" (Int32 '"1")) '('"data" (Null)))
3280+
))
3281+
(let input (FlatMap list (lambda '(row) (Just (AsStruct '('"data" (Member row '"data")) '('group0 (AsList (Member row '"user"))) '('"time" (Member row '"time")) '('"user" (Member row '"user")))))))
3282+
(let keySelector (lambda '(row) '((StablePickle (Member row '"data")) (StablePickle (Member row 'group0)))))
3283+
(let sortKeySelector (lambda '(row) (SafeCast (Member row '"time") (OptionalType (DataType 'Timestamp)))))
3284+
(let res (PartitionsByKeys input keySelector (Bool 'true) sortKeySelector (lambda '(row) (block '(
3285+
(let interval (Interval '1000000))
3286+
(let map (lambda '(item) (AsStruct)))
3287+
(let reduce (lambda '(lhs rhs) (AsStruct)))
3288+
(let hopping (MultiHoppingCore (Iterator row) keySelector sortKeySelector interval interval interval 'true map reduce map map reduce (lambda '(key state time) (AsStruct '('_yql_time time) '('"data" (Nth key '"0")) '('group0 (Nth key '"1")))) '"0"))
3289+
(return (ForwardList (FlatMap hopping (lambda '(row) (Just (AsStruct '('_yql_time (Member row '_yql_time)) '('"data" (Unpickle (NullType) (Member row '"data"))) '('group0 (Unpickle (ListType (DataType 'Int32)) (Member row 'group0)))))))))
3290+
)))))
3291+
3292+
(let res_sink (DataSink 'yt (quote plato)))
3293+
(let world (Write! world res_sink (Key '('table (String 'Output))) res '('('mode 'renew))))
3294+
(return (Commit! world res_sink))
3295+
))";
3296+
3297+
TExprContext exprCtx;
3298+
const auto exprRoot = ParseAndAnnotate(s, exprCtx);
3299+
CheckConstraint<TDistinctConstraintNode>(exprRoot, "PartitionsByKeys", "Distinct((data,group0))");
3300+
CheckConstraint<TUniqueConstraintNode>(exprRoot, "PartitionsByKeys", "Unique((data,group0))");
3301+
}
32733302
}
32743303

32753304
} // namespace NYql

ydb/library/yql/core/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ SRCS(
2828
yql_join.cpp
2929
yql_join.h
3030
yql_library_compiler.cpp
31+
yql_opt_hopping.cpp
3132
yql_opt_match_recognize.cpp
3233
yql_opt_match_recognize.h
3334
yql_opt_proposed_by_data.cpp

ydb/library/yql/core/yql_expr_constraint.cpp

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,9 @@ class TCallableConstraintTransformer : public TCallableTransformerBase<TCallable
241241
Functions["ReplicateScalars"] = &TCallableConstraintTransformer::CopyAllFrom<0>;
242242
Functions["BlockMergeFinalizeHashed"] = &TCallableConstraintTransformer::AggregateWrap<true>;
243243
Functions["BlockMergeManyFinalizeHashed"] = &TCallableConstraintTransformer::AggregateWrap<true>;
244+
Functions["MultiHoppingCore"] = &TCallableConstraintTransformer::MultiHoppingCoreWrap;
245+
Functions["StablePickle"] = &TCallableConstraintTransformer::FromFirst<TUniqueConstraintNode, TPartOfUniqueConstraintNode, TDistinctConstraintNode, TPartOfDistinctConstraintNode, TPartOfChoppedConstraintNode, TVarIndexConstraintNode>;
246+
Functions["Unpickle"] = &TCallableConstraintTransformer::FromSecond<TUniqueConstraintNode, TPartOfUniqueConstraintNode, TDistinctConstraintNode, TPartOfDistinctConstraintNode, TPartOfChoppedConstraintNode, TVarIndexConstraintNode>;
244247
}
245248

246249
std::optional<IGraphTransformer::TStatus> ProcessCore(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) {
@@ -2892,6 +2895,26 @@ class TCallableConstraintTransformer : public TCallableTransformerBase<TCallable
28922895

28932896
return TStatus::Ok;
28942897
}
2898+
2899+
// Constraint handler for the MultiHoppingCore callable.
//
// First refreshes the constraints of all child lambdas and propagates any
// non-Ok status unchanged. Then resolves the key extractor lambda into paths
// from its first argument and, if every key maps to a non-empty path, marks
// the node as Unique and Distinct over the first component of each path.
//
// @param input  MultiHoppingCore node to annotate (constraints are added in place).
// @param ctx    Expression context used to allocate the constraint nodes.
// @return       Status of the child-lambda update, or TStatus::Ok.
TStatus MultiHoppingCoreWrap(const TExprNode::TPtr& input, TExprNode::TPtr&, TExprContext& ctx) const {
    if (const auto status = UpdateAllChildLambdasConstraints(*input); status != TStatus::Ok) {
        return status;
    }

    // Borrow the key extractor; the node is owned by `input`, so no extra
    // owning pointer is needed.
    const auto& keySelector = *input->Child(TCoMultiHoppingCore::idx_KeyExtractor);
    const auto keys = GetPathsToKeys(keySelector.Tail(), keySelector.Head().Head());

    std::vector<std::string_view> columns;
    columns.reserve(keys.size());
    for (const auto& path : keys) {
        // NOTE(review): an empty path (presumably the key is the lambda argument
        // itself) names no column, and front() on it would be undefined
        // behavior. Uniqueness over the remaining columns alone is not
        // guaranteed either, so no constraint is added in that case.
        if (path.empty()) {
            return TStatus::Ok;
        }
        // Only the top-level column of each key path is used.
        columns.emplace_back(path.front());
    }

    if (!columns.empty()) {
        input->AddConstraint(ctx.MakeConstraint<TUniqueConstraintNode>(columns));
        input->AddConstraint(ctx.MakeConstraint<TDistinctConstraintNode>(columns));
    }

    return TStatus::Ok;
}
2917+
28952918
private:
28962919
template <class TConstraintContainer>
28972920
static void CopyExcept(TConstraintContainer& dst, const TConstraintContainer& from, const TSet<TStringBuf>& except) {

0 commit comments

Comments
 (0)