Skip to content

Commit 7a861b9

Browse files
authored
YQ-3617: fix GROUP BY HOP + AS_TABLE (#9370)
1 parent ec02547 commit 7a861b9

File tree

15 files changed

+935
-578
lines changed

15 files changed

+935
-578
lines changed

ydb/library/yql/core/common_opt/yql_co_simple1.cpp

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
#include <ydb/library/yql/core/yql_atom_enums.h>
66
#include <ydb/library/yql/core/yql_expr_type_annotation.h>
77
#include <ydb/library/yql/core/yql_join.h>
8+
#include <ydb/library/yql/core/yql_opt_hopping.h>
89
#include <ydb/library/yql/core/yql_opt_utils.h>
910
#include <ydb/library/yql/core/yql_opt_window.h>
1011
#include <ydb/library/yql/core/yql_type_helpers.h>
@@ -3301,6 +3302,99 @@ TExprNode::TPtr RemoveDeadPayloadColumns(const TCoAggregate& aggr, TExprContext&
33013302
return aggr.Ptr();
33023303
}
33033304

3305+
// Builds the full-width replacement for an Aggregate that carries hopping traits:
// a PartitionsByKeys over the aggregate input whose list handler feeds the partition
// through MultiHoppingCore and then maps every produced row with the unpickle lambda
// supplied by the keys description.
//
// Returns nullptr when NHopping::ExtractHopTraits yields no traits for this aggregate.
// Output column pruning is not done here.
TExprNode::TPtr RewriteAsHoppingWindowFullOutput(const TCoAggregate& aggregate, TExprContext& ctx) {
    const auto position = aggregate.Pos();

    // NOTE(review): presumably rejects DISTINCT aggregations; semantics live in NHopping - confirm.
    NHopping::EnsureNotDistinct(aggregate);

    const auto extractedTraits = NHopping::ExtractHopTraits(aggregate, ctx, false);
    if (!extractedTraits) {
        return nullptr;
    }
    const auto hop = *extractedTraits;

    // Row type of the aggregate input; used both to describe the keys and to build the unpickle lambda.
    const auto inputRowType = GetSeqItemType(*aggregate.Ptr()->Head().GetTypeAnn()).Cast<TStructExprType>();
    NHopping::TKeysDescription keysDescription(*inputRowType, aggregate.Keys(), hop.Column);

    // All handlers are created up front, in this exact order, before any builder is started.
    const auto keySelector = keysDescription.GetKeySelector(ctx, position, inputRowType);
    const auto timeExtractor = NHopping::BuildTimeExtractor(hop.Traits, ctx);
    const auto initHandler = NHopping::BuildInitHopLambda(aggregate, ctx);
    const auto updateHandler = NHopping::BuildUpdateHopLambda(aggregate, ctx);
    const auto saveHandler = NHopping::BuildSaveHopLambda(aggregate, ctx);
    const auto loadHandler = NHopping::BuildLoadHopLambda(aggregate, ctx);
    const auto mergeHandler = NHopping::BuildMergeHopLambda(aggregate, ctx);
    const auto finishHandler = NHopping::BuildFinishHopLambda(aggregate, keysDescription.GetActualGroupKeys(), hop.Column, ctx);

    // Argument of the PartitionsByKeys list handler; it is also the source for the hopping core input.
    const auto partitionArg = Build<TCoArgument>(ctx, position).Name("stream").Done();

    // The core builder is left unfinished on purpose: its Input is attached inside the list handler below.
    auto hoppingCore = Build<TCoMultiHoppingCore>(ctx, position)
        .KeyExtractor(keySelector)
        .TimeExtractor(timeExtractor)
        .Hop(hop.Traits.Hop())
        .Interval(hop.Traits.Interval())
        .Delay(hop.Traits.Delay())
        .DataWatermarks(hop.Traits.DataWatermarks())
        .InitHandler(initHandler)
        .UpdateHandler(updateHandler)
        .MergeHandler(mergeHandler)
        .FinishHandler(finishHandler)
        .SaveHandler(saveHandler)
        .LoadHandler(loadHandler)
        .WatermarkMode<TCoAtom>().Build(ToString(false));

    return Build<TCoPartitionsByKeys>(ctx, position)
        .Input(aggregate.Input())
        .KeySelectorLambda(keySelector)
        .SortDirections<TCoBool>()
            .Literal()
                .Value("true")
            .Build()
        .Build()
        .SortKeySelectorLambda(timeExtractor)
        .ListHandlerLambda()
            .Args(partitionArg)
            .Body<TCoForwardList>()
                .Stream(Build<TCoMap>(ctx, position)
                    .Input(hoppingCore
                        .Input<TCoIterator>()
                            .List(partitionArg)
                        .Build()
                    .Done())
                    .Lambda(keysDescription.BuildUnpickleLambda(ctx, position, *inputRowType))
                .Done())
            .Build()
        .Build()
    .Done()
    .Ptr();
}
3369+
3370+
// Rewrites an Aggregate that has a "hopping" setting into the PartitionsByKeys-based form
// produced by RewriteAsHoppingWindowFullOutput.
//
// Returns nullptr (meaning "no rewrite") when the aggregate has no "hopping" setting,
// when IsPureIsolatedLambda rejects the node, or when the full-output rewrite itself
// returns nullptr. If the aggregate has an "output_columns" setting, the result is wrapped
// in ExtractMembers using the second child of that setting as the member list.
TExprNode::TPtr RewriteAsHoppingWindow(TExprNode::TPtr node, TExprContext& ctx) {
    const auto aggregate = TCoAggregate(node);

    // This function is tried for aggregates in general, and most of them have no hopping
    // setting. Do the cheap settings lookup first so the purity check, which is handed the
    // whole aggregate node, only runs for real candidates. Both guards return nullptr, so
    // their relative order does not affect the result.
    if (!GetSetting(aggregate.Settings().Ref(), "hopping")) {
        return nullptr;
    }

    if (!IsPureIsolatedLambda(*aggregate.Ptr())) {
        return nullptr;
    }

    auto result = RewriteAsHoppingWindowFullOutput(aggregate, ctx);
    if (!result) {
        return nullptr;
    }

    const auto outputColumnSetting = GetSetting(aggregate.Settings().Ref(), "output_columns");
    if (!outputColumnSetting) {
        return result;
    }

    // Narrow the rewritten output to the columns listed in the "output_columns" setting.
    return Build<TCoExtractMembers>(ctx, aggregate.Pos())
        .Input(result)
        .Members(outputColumnSetting->ChildPtr(1))
        .Done()
        .Ptr();
}
3397+
33043398
TExprNode::TPtr PullAssumeColumnOrderOverEquiJoin(const TExprNode::TPtr& node, TExprContext& ctx, TOptimizeContext& optCtx) {
33053399
TVector<ui32> withAssume;
33063400
for (ui32 i = 0; i < node->ChildrenSize() - 2; i++) {
@@ -5080,6 +5174,11 @@ void RegisterCoSimpleCallables1(TCallableOptimizerMap& map) {
50805174
return clean;
50815175
}
50825176

5177+
if (auto hopping = RewriteAsHoppingWindow(node, ctx)) {
5178+
YQL_CLOG(DEBUG, Core) << "RewriteAsHoppingWindow";
5179+
return hopping;
5180+
}
5181+
50835182
return DropReorder<false>(node, ctx);
50845183
};
50855184

ydb/library/yql/core/ut/yql_expr_constraint_ut.cpp

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3270,6 +3270,35 @@ Y_UNIT_TEST_SUITE(TYqlExprConstraints) {
32703270
CheckConstraint<TUniqueConstraintNode>(exprRoot, "LazyList", "");
32713271
CheckConstraint<TDistinctConstraintNode>(exprRoot, "LazyList", "");
32723272
}
3273+
3274+
// Constraint test for the GROUP BY HOP shape: a PartitionsByKeys whose list handler runs
// MultiHoppingCore keyed by two StablePickle'd members ("data" and group0) and then
// Unpickle's those members back in a FlatMap. After annotation the PartitionsByKeys node
// is expected to carry Distinct and Unique constraints over (data,group0).
Y_UNIT_TEST(GroupByHop) {
3275+
const TStringBuf s = R"((
3276+
(let list (AsList
3277+
(AsStruct '('"time" (String '"2024-01-01T00:00:01Z")) '('"user" (Int32 '"1")) '('"data" (Null)))
3278+
(AsStruct '('"time" (String '"2024-01-01T00:00:02Z")) '('"user" (Int32 '"1")) '('"data" (Null)))
3279+
(AsStruct '('"time" (String '"2024-01-01T00:00:03Z")) '('"user" (Int32 '"1")) '('"data" (Null)))
3280+
))
3281+
(let input (FlatMap list (lambda '(row) (Just (AsStruct '('"data" (Member row '"data")) '('group0 (AsList (Member row '"user"))) '('"time" (Member row '"time")) '('"user" (Member row '"user")))))))
3282+
(let keySelector (lambda '(row) '((StablePickle (Member row '"data")) (StablePickle (Member row 'group0)))))
3283+
(let sortKeySelector (lambda '(row) (SafeCast (Member row '"time") (OptionalType (DataType 'Timestamp)))))
3284+
(let res (PartitionsByKeys input keySelector (Bool 'true) sortKeySelector (lambda '(row) (block '(
3285+
(let interval (Interval '1000000))
3286+
(let map (lambda '(item) (AsStruct)))
3287+
(let reduce (lambda '(lhs rhs) (AsStruct)))
3288+
(let hopping (MultiHoppingCore (Iterator row) keySelector sortKeySelector interval interval interval 'true map reduce map map reduce (lambda '(key state time) (AsStruct '('_yql_time time) '('"data" (Nth key '"0")) '('group0 (Nth key '"1")))) '"0"))
3289+
(return (ForwardList (FlatMap hopping (lambda '(row) (Just (AsStruct '('_yql_time (Member row '_yql_time)) '('"data" (Unpickle (NullType) (Member row '"data"))) '('group0 (Unpickle (ListType (DataType 'Int32)) (Member row 'group0)))))))))
3290+
)))))
3291+
3292+
(let res_sink (DataSink 'yt (quote plato)))
3293+
(let world (Write! world res_sink (Key '('table (String 'Output))) res '('('mode 'renew))))
3294+
(return (Commit! world res_sink))
3295+
))";
3296+
3297+
// Parse and annotate the program above, then inspect the constraints on PartitionsByKeys.
TExprContext exprCtx;
3298+
const auto exprRoot = ParseAndAnnotate(s, exprCtx);
3299+
// Both constraints are expected over the two key columns taken from the key selector.
CheckConstraint<TDistinctConstraintNode>(exprRoot, "PartitionsByKeys", "Distinct((data,group0))");
3300+
CheckConstraint<TUniqueConstraintNode>(exprRoot, "PartitionsByKeys", "Unique((data,group0))");
3301+
}
32733302
}
32743303

32753304
} // namespace NYql

ydb/library/yql/core/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ SRCS(
2828
yql_join.cpp
2929
yql_join.h
3030
yql_library_compiler.cpp
31+
yql_opt_hopping.cpp
3132
yql_opt_match_recognize.cpp
3233
yql_opt_match_recognize.h
3334
yql_opt_proposed_by_data.cpp

ydb/library/yql/core/yql_expr_constraint.cpp

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -244,6 +244,9 @@ class TCallableConstraintTransformer : public TCallableTransformerBase<TCallable
244244
Functions["ReplicateScalars"] = &TCallableConstraintTransformer::CopyAllFrom<0>;
245245
Functions["BlockMergeFinalizeHashed"] = &TCallableConstraintTransformer::AggregateWrap<true>;
246246
Functions["BlockMergeManyFinalizeHashed"] = &TCallableConstraintTransformer::AggregateWrap<true>;
247+
Functions["MultiHoppingCore"] = &TCallableConstraintTransformer::MultiHoppingCoreWrap;
248+
Functions["StablePickle"] = &TCallableConstraintTransformer::FromFirst<TUniqueConstraintNode, TPartOfUniqueConstraintNode, TDistinctConstraintNode, TPartOfDistinctConstraintNode, TPartOfChoppedConstraintNode, TVarIndexConstraintNode>;
249+
Functions["Unpickle"] = &TCallableConstraintTransformer::FromSecond<TUniqueConstraintNode, TPartOfUniqueConstraintNode, TDistinctConstraintNode, TPartOfDistinctConstraintNode, TPartOfChoppedConstraintNode, TVarIndexConstraintNode>;
247250
}
248251

249252
std::optional<IGraphTransformer::TStatus> ProcessCore(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) {
@@ -2924,6 +2927,26 @@ class TCallableConstraintTransformer : public TCallableTransformerBase<TCallable
29242927

29252928
return TStatus::Ok;
29262929
}
2930+
2931+
// Constraint handler for MultiHoppingCore.
//
// First refreshes the constraints of all child lambdas; any non-Ok status from that step is
// returned as is. Then takes the key extractor lambda, resolves the paths from its argument
// to the key components, and marks the node as Unique and Distinct over the first element
// of every such path. No constraint is added when no key paths are found.
TStatus MultiHoppingCoreWrap(const TExprNode::TPtr& input, TExprNode::TPtr&, TExprContext& ctx) const {
    if (const auto status = UpdateAllChildLambdasConstraints(*input); status != TStatus::Ok) {
        return status;
    }

    // Only read access is needed, so bind a reference instead of taking another owning pointer.
    const TExprNode& keySelectorLambda = *input->Child(TCoMultiHoppingCore::idx_KeyExtractor);
    const auto keys = GetPathsToKeys(keySelectorLambda.Tail(), keySelectorLambda.Head().Head());

    std::vector<std::string_view> columns;
    columns.reserve(keys.size());
    for (const auto& path : keys) {
        if (path.empty()) {
            // front() on an empty path is undefined behaviour. An empty path names no column,
            // so uniqueness over the remaining columns cannot be claimed either: add nothing.
            // NOTE(review): presumably an empty path means the key component is the whole
            // row (identity selector) - confirm against GetPathsToKeys.
            return TStatus::Ok;
        }
        columns.emplace_back(path.front());
    }

    if (!columns.empty()) {
        input->AddConstraint(ctx.MakeConstraint<TUniqueConstraintNode>(columns));
        input->AddConstraint(ctx.MakeConstraint<TDistinctConstraintNode>(columns));
    }

    return TStatus::Ok;
}
2949+
29272950
private:
29282951
template <class TConstraintContainer>
29292952
static void CopyExcept(TConstraintContainer& dst, const TConstraintContainer& from, const TSet<TStringBuf>& except) {

0 commit comments

Comments
 (0)