Skip to content

Commit 5a61c83

Browse files
loochekblinkov
authored and committed
YT block input fixes
commit_hash:b478ef6bc7dcc27a33b84d5fd31b4c1876057f1e
1 parent 6fecba8 commit 5a61c83

File tree

5 files changed

+73
-7
lines changed

5 files changed

+73
-7
lines changed

yt/yql/providers/yt/codec/yt_codec_io.cpp

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1482,14 +1482,14 @@ class TArrowDecoder : public TMkqlReaderImpl::TDecoder {
14821482
YQL_ENSURE(!Chunks_.empty());
14831483
}
14841484

1485-
auto& inputFields = SpecsCache_.GetSpecs().Inputs[TableIndex_]->FieldsVec;
1485+
auto& decoder = *Specs_.Inputs[TableIndex_];
14861486
Row_ = SpecsCache_.NewRow(TableIndex_, items, true);
14871487

14881488
auto& [chunkRowIndex, chunkLen, chunk] = Chunks_.front();
1489-
for (size_t i = 0; i < inputFields.size(); i++) {
1490-
items[inputFields[i].StructIndex] = SpecsCache_.GetHolderFactory().CreateArrowBlock(std::move(chunk[i]));
1489+
for (size_t i = 0; i < decoder.StructSize; i++) {
1490+
items[i] = SpecsCache_.GetHolderFactory().CreateArrowBlock(std::move(chunk[i]));
14911491
}
1492-
items[inputFields.size()] = SpecsCache_.GetHolderFactory().CreateArrowBlock(arrow::Datum(static_cast<uint64_t>(chunkLen)));
1492+
items[decoder.StructSize] = SpecsCache_.GetHolderFactory().CreateArrowBlock(arrow::Datum(static_cast<uint64_t>(chunkLen)));
14931493
RowIndex_ = chunkRowIndex;
14941494

14951495
Chunks_.pop_front();
@@ -1539,7 +1539,7 @@ class TArrowDecoder : public TMkqlReaderImpl::TDecoder {
15391539
YQL_ENSURE(rowIndices || decoder.Dynamic || Specs_.IsTableContent_);
15401540

15411541
arrow::compute::ExecContext execContext(Pool_);
1542-
std::vector<arrow::Datum> convertedBatch;
1542+
std::vector<arrow::Datum> convertedBatch(decoder.StructSize);
15431543
for (size_t i = 0; i < inputFields.size(); i++) {
15441544
auto batchColumn = batch->GetColumnByName(inputFields[i].Name);
15451545
if (!batchColumn) {
@@ -1565,15 +1565,18 @@ class TArrowDecoder : public TMkqlReaderImpl::TDecoder {
15651565
}
15661566
} else if (decoder.FillSysColumnIndex == inputFields[i].StructIndex) {
15671567
convertedColumn = ARROW_RESULT(arrow::MakeArrayFromScalar(arrow::UInt32Scalar(TableIndex_), batch->num_rows()));
1568+
} else if (inputFields[i].StructIndex == Max<ui32>()) {
1569+
// Input field won't appear in the result
1570+
continue;
15681571
} else {
15691572
YQL_ENSURE(false, "unexpected column: " << inputFields[i].Name);
15701573
}
15711574

1572-
convertedBatch.emplace_back(convertedColumn);
1575+
convertedBatch[inputFields[i].StructIndex] = std::move(convertedColumn);
15731576
continue;
15741577
}
15751578

1576-
convertedBatch.emplace_back(ColumnConverters_[i]->Convert(batchColumn->data()));
1579+
convertedBatch[inputFields[i].StructIndex] = ColumnConverters_[i]->Convert(batchColumn->data());
15771580
}
15781581

15791582
// index of the first row in the block

yt/yql/providers/yt/provider/yql_yt_helpers.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2260,6 +2260,12 @@ bool IsYtTableSuitableForArrowInput(NNodes::TExprBase tableNode, std::function<v
22602260
return false;
22612261
}
22622262

2263+
auto rowSpec = TYtTableBaseInfo::GetRowSpec(tableNode);
2264+
if (rowSpec && !rowSpec->StrictSchema) {
2265+
unsupportedHandler("can't use arrow input on tables with non-strict schema");
2266+
return false;
2267+
}
2268+
22632269
return true;
22642270
}
22652271

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
in Input input_strings_sorted_desc.txt
2+
providers yt
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
{"_yql_column_0"="\xE0\xC6\xCE\xCE\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFC";"key"="911";"subkey"="2";"value"="kkk";};
2+
{"_yql_column_0"="\xE0\xC8\xC9\xCE\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFC";"key"="761";"subkey"="6";"value"="ccc";};
3+
{"_yql_column_0"="\xE0\xCA\xCD\xC8\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFC";"key"="527";"subkey"="4";"value"="bbb";};
4+
{"_yql_column_0"="\xE0\xCD\xCF\xCF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFC";"key"="200";"subkey"="7";"value"="qqq";};
5+
{"_yql_column_0"="\xE0\xCE\xCA\xCF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFC";"key"="150";"subkey"="1";"value"="aaa";};
6+
{"_yql_column_0"="\xE0\xCE\xCA\xCF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFC";"key"="150";"subkey"="3";"value"="iii";};
7+
{"_yql_column_0"="\xE0\xCE\xCA\xCF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFC";"key"="150";"subkey"="8";"value"="zzz";};
8+
{"_yql_column_0"="\xE0\xCF\xC8\xCA\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFC";"key"="075";"subkey"="1";"value"="abc";};
9+
{"_yql_column_0"="\xE0\xCF\xCC\xC8\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFC";"key"="037";"subkey"="5";"value"="ddd";};
10+
{"_yql_column_0"="\xE0\xCF\xCD\xCC\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFC";"key"="023";"subkey"="3";"value"="aaa";};
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
{
2+
"_yql_row_spec" = {
3+
"SortDirections" = [
4+
0;
5+
];
6+
"SortMembers" = [
7+
"key";
8+
];
9+
"SortedBy" = [
10+
"_yql_column_0";
11+
];
12+
"SortedByTypes" = [
13+
[
14+
"DataType";
15+
"String";
16+
];
17+
];
18+
"Type" = [
19+
"StructType";
20+
[
21+
[
22+
"key";
23+
[
24+
"DataType";
25+
"String";
26+
];
27+
];
28+
[
29+
"subkey";
30+
[
31+
"DataType";
32+
"String";
33+
];
34+
];
35+
[
36+
"value";
37+
[
38+
"DataType";
39+
"String";
40+
];
41+
];
42+
];
43+
];
44+
}
45+
}

0 commit comments

Comments
 (0)