Skip to content

Commit 7120c84

Browse files
Return no-yield guaranties to Collect (#7219)
1 parent 367b417 commit 7120c84

File tree

2 files changed

+38
-60
lines changed

2 files changed

+38
-60
lines changed

ydb/library/yql/minikql/comp_nodes/mkql_collect.cpp

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,8 @@ using TBaseComputation = TMutableCodegeneratorRootNode<TCollectFlowWrapper>;
2020
if (item.IsFinish()) {
2121
return list.Release();
2222
}
23-
24-
if (!item.IsYield()) {
25-
list = ctx.HolderFactory.Append(list.Release(), item.Release());
26-
}
23+
MKQL_ENSURE(!item.IsYield(), "Unexpected flow status!");
24+
list = ctx.HolderFactory.Append(list.Release(), item.Release());
2725
}
2826
}
2927
#ifndef MKQL_DISABLE_CODEGEN

ydb/library/yql/minikql/comp_nodes/ut/mkql_wide_combine_ut.cpp

Lines changed: 36 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,24 @@ TRuntimeNode WideLastCombiner(TProgramBuilder& pb, TRuntimeNode flow, const TPro
126126
pb.WideLastCombiner(flow, extractor, init, update, finish);
127127
}
128128

129+
void CheckIfStreamHasExpectedStringValues(const NUdf::TUnboxedValue& streamValue, std::unordered_set<TString>& expected) {
130+
NUdf::TUnboxedValue item;
131+
NUdf::EFetchStatus fetchStatus;
132+
while (!expected.empty()) {
133+
fetchStatus = streamValue.Fetch(item);
134+
UNIT_ASSERT_UNEQUAL(fetchStatus, NUdf::EFetchStatus::Finish);
135+
if (fetchStatus == NYql::NUdf::EFetchStatus::Yield) continue;
136+
137+
const auto actual = TString(item.AsStringRef());
138+
139+
auto it = expected.find(actual);
140+
UNIT_ASSERT(it != expected.end());
141+
expected.erase(it);
142+
}
143+
fetchStatus = streamValue.Fetch(item);
144+
UNIT_ASSERT_EQUAL(fetchStatus, NUdf::EFetchStatus::Finish);
145+
}
146+
129147
} // unnamed
130148

131149
#if !defined(MKQL_RUNTIME_VERSION) || MKQL_RUNTIME_VERSION >= 18u
@@ -1049,7 +1067,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLWideLastCombinerTest) {
10491067

10501068
const auto list = pb.NewList(tupleType, {data1, data2, data3, data4, data5, data6, data7, data8, data9});
10511069

1052-
const auto pgmReturn = pb.Collect(pb.NarrowMap(WideLastCombiner<SPILLING>(pb, pb.ExpandMap(pb.ToFlow(list),
1070+
const auto pgmReturn = pb.FromFlow(pb.NarrowMap(WideLastCombiner<SPILLING>(pb, pb.ExpandMap(pb.ToFlow(list),
10531071
[&](TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth(item, 0U), pb.Nth(item, 1U)}; }),
10541072
[&](TRuntimeNode::TList items) -> TRuntimeNode::TList { return {items.front()}; },
10551073
[&](TRuntimeNode::TList keys, TRuntimeNode::TList items) -> TRuntimeNode::TList {
@@ -1076,26 +1094,16 @@ Y_UNIT_TEST_SUITE(TMiniKQLWideLastCombinerTest) {
10761094
if (SPILLING) {
10771095
graph->GetContext().SpillerFactory = std::make_shared<TMockSpillerFactory>();
10781096
}
1079-
const auto iterator = graph->GetValue().GetListIterator();
10801097

1098+
const auto streamVal = graph->GetValue();
10811099
std::unordered_set<TString> expected {
10821100
"key one",
10831101
"very long value 2 / key two",
10841102
"very long key one",
10851103
"very long value 8 / very long value 7 / very long value 6"
10861104
};
10871105

1088-
NUdf::TUnboxedValue item;
1089-
while (!expected.empty()) {
1090-
UNIT_ASSERT(iterator.Next(item));
1091-
const auto actual = TString(item.AsStringRef());
1092-
1093-
auto it = expected.find(actual);
1094-
UNIT_ASSERT(it != expected.end());
1095-
expected.erase(it);
1096-
}
1097-
UNIT_ASSERT(!iterator.Next(item));
1098-
UNIT_ASSERT(!iterator.Next(item));
1106+
CheckIfStreamHasExpectedStringValues(streamVal, expected);
10991107
}
11001108

11011109
Y_UNIT_TEST_LLVM_SPILLING(TestLongStringsPasstroughtRefCounting) {
@@ -1140,7 +1148,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLWideLastCombinerTest) {
11401148

11411149
const auto list = pb.NewList(tupleType, {data1, data2, data3, data4, data5, data6, data7, data8, data9});
11421150

1143-
const auto pgmReturn = pb.Collect(pb.NarrowMap(WideLastCombiner<SPILLING>(pb, pb.ExpandMap(pb.ToFlow(list),
1151+
const auto pgmReturn = pb.FromFlow(pb.NarrowMap(WideLastCombiner<SPILLING>(pb, pb.ExpandMap(pb.ToFlow(list),
11441152
[&](TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth(item, 0U), pb.Nth(item, 1U)}; }),
11451153
[&](TRuntimeNode::TList items) -> TRuntimeNode::TList { return {items.front()}; },
11461154
[&](TRuntimeNode::TList keys, TRuntimeNode::TList items) -> TRuntimeNode::TList {
@@ -1166,26 +1174,16 @@ Y_UNIT_TEST_SUITE(TMiniKQLWideLastCombinerTest) {
11661174
if (SPILLING) {
11671175
graph->GetContext().SpillerFactory = std::make_shared<TMockSpillerFactory>();
11681176
}
1169-
const auto iterator = graph->GetValue().GetListIterator();
11701177

1178+
const auto streamVal = graph->GetValue();
11711179
std::unordered_set<TString> expected {
11721180
"very long value 1 / key one / very long value 1 / key one",
11731181
"very long value 3 / key two / very long value 2 / key two",
11741182
"very long value 4 / very long key one / very long value 4 / very long key one",
11751183
"very long value 9 / very long key two / very long value 5 / very long key two"
11761184
};
11771185

1178-
NUdf::TUnboxedValue item;
1179-
while (!expected.empty()) {
1180-
UNIT_ASSERT(iterator.Next(item));
1181-
const auto actual = TString(item.AsStringRef());
1182-
1183-
auto it = expected.find(actual);
1184-
UNIT_ASSERT(it != expected.end());
1185-
expected.erase(it);
1186-
}
1187-
UNIT_ASSERT(!iterator.Next(item));
1188-
UNIT_ASSERT(!iterator.Next(item));
1186+
CheckIfStreamHasExpectedStringValues(streamVal, expected);
11891187
}
11901188

11911189
Y_UNIT_TEST_LLVM_SPILLING(TestDoNotCalculateUnusedInput) {
@@ -1230,7 +1228,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLWideLastCombinerTest) {
12301228

12311229
const auto landmine = pb.NewDataLiteral<NUdf::EDataSlot::String>("ACHTUNG MINEN!");
12321230

1233-
const auto pgmReturn = pb.Collect(pb.NarrowMap(WideLastCombiner<SPILLING>(pb, pb.ExpandMap(pb.ToFlow(list),
1231+
const auto pgmReturn = pb.FromFlow(pb.NarrowMap(WideLastCombiner<SPILLING>(pb, pb.ExpandMap(pb.ToFlow(list),
12341232
[&](TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth(item, 0U), pb.Unwrap(pb.Nth(item, 1U), landmine, __FILE__, __LINE__, 0), pb.Nth(item, 2U)}; }),
12351233
[&](TRuntimeNode::TList items) -> TRuntimeNode::TList { return {items.front()}; },
12361234
[&](TRuntimeNode::TList keys, TRuntimeNode::TList items) -> TRuntimeNode::TList {
@@ -1257,23 +1255,14 @@ Y_UNIT_TEST_SUITE(TMiniKQLWideLastCombinerTest) {
12571255
if (SPILLING) {
12581256
graph->GetContext().SpillerFactory = std::make_shared<TMockSpillerFactory>();
12591257
}
1258+
1259+
const auto streamVal = graph->GetValue();
12601260
std::unordered_set<TString> expected {
12611261
"key one / value 2 / value 1 / value 5 / value 4",
12621262
"key two / value 4 / value 3 / value 3 / value 2"
12631263
};
12641264

1265-
const auto iterator = graph->GetValue().GetListIterator();
1266-
NUdf::TUnboxedValue item;
1267-
while (!expected.empty()) {
1268-
UNIT_ASSERT(iterator.Next(item));
1269-
const auto actual = TString(item.AsStringRef());
1270-
1271-
auto it = expected.find(actual);
1272-
UNIT_ASSERT(it != expected.end());
1273-
expected.erase(it);
1274-
}
1275-
UNIT_ASSERT(!iterator.Next(item));
1276-
UNIT_ASSERT(!iterator.Next(item));
1265+
CheckIfStreamHasExpectedStringValues(streamVal, expected);
12771266
}
12781267

12791268
Y_UNIT_TEST_LLVM_SPILLING(TestDoNotCalculateUnusedOutput) {
@@ -1315,7 +1304,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLWideLastCombinerTest) {
13151304

13161305
const auto landmine = pb.NewDataLiteral<NUdf::EDataSlot::String>("ACHTUNG MINEN!");
13171306

1318-
const auto pgmReturn = pb.Collect(pb.NarrowMap(WideLastCombiner<SPILLING>(pb, pb.ExpandMap(pb.ToFlow(list),
1307+
const auto pgmReturn = pb.FromFlow(pb.NarrowMap(WideLastCombiner<SPILLING>(pb, pb.ExpandMap(pb.ToFlow(list),
13191308
[&](TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth(item, 0U), pb.Nth(item, 1U), pb.Nth(item, 2U)}; }),
13201309
[&](TRuntimeNode::TList items) -> TRuntimeNode::TList { return {items.front()}; },
13211310
[&](TRuntimeNode::TList, TRuntimeNode::TList items) -> TRuntimeNode::TList {
@@ -1334,23 +1323,14 @@ Y_UNIT_TEST_SUITE(TMiniKQLWideLastCombinerTest) {
13341323
if (SPILLING) {
13351324
graph->GetContext().SpillerFactory = std::make_shared<TMockSpillerFactory>();
13361325
}
1326+
1327+
const auto streamVal = graph->GetValue();
13371328
std::unordered_set<TString> expected {
13381329
"key one: value 1, value 4, value 5, value 1, value 2",
13391330
"key two: value 2, value 3, value 3, value 4"
13401331
};
13411332

1342-
const auto iterator = graph->GetValue().GetListIterator();
1343-
NUdf::TUnboxedValue item;
1344-
while (!expected.empty()) {
1345-
UNIT_ASSERT(iterator.Next(item));
1346-
const auto actual = TString(item.AsStringRef());
1347-
1348-
auto it = expected.find(actual);
1349-
UNIT_ASSERT(it != expected.end());
1350-
expected.erase(it);
1351-
}
1352-
UNIT_ASSERT(!iterator.Next(item));
1353-
UNIT_ASSERT(!iterator.Next(item));
1333+
CheckIfStreamHasExpectedStringValues(streamVal, expected);
13541334
}
13551335

13561336
Y_UNIT_TEST_LLVM_SPILLING(TestThinAllLambdas) {
@@ -1366,7 +1346,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLWideLastCombinerTest) {
13661346

13671347
const auto list = pb.NewList(tupleType, {data, data, data, data});
13681348

1369-
const auto pgmReturn = pb.Collect(pb.NarrowMap(WideLastCombiner<SPILLING>(pb, pb.ExpandMap(pb.ToFlow(list),
1349+
const auto pgmReturn = pb.FromFlow(pb.NarrowMap(WideLastCombiner<SPILLING>(pb, pb.ExpandMap(pb.ToFlow(list),
13701350
[](TRuntimeNode) -> TRuntimeNode::TList { return {}; }),
13711351
[](TRuntimeNode::TList items) { return items; },
13721352
[](TRuntimeNode::TList, TRuntimeNode::TList items) { return items; },
@@ -1376,10 +1356,10 @@ Y_UNIT_TEST_SUITE(TMiniKQLWideLastCombinerTest) {
13761356
));
13771357

13781358
const auto graph = setup.BuildGraph(pgmReturn);
1379-
const auto iterator = graph->GetValue().GetListIterator();
1359+
const auto streamVal = graph->GetValue();
13801360
NUdf::TUnboxedValue item;
1381-
UNIT_ASSERT(!iterator.Next(item));
1382-
UNIT_ASSERT(!iterator.Next(item));
1361+
const auto fetchStatus = streamVal.Fetch(item);
1362+
UNIT_ASSERT_EQUAL(fetchStatus, NUdf::EFetchStatus::Finish);
13831363
}
13841364
}
13851365

0 commit comments

Comments
 (0)