Skip to content

Commit 60bad7a

Browse files
authored
Sys columns support for YT map block input mode (#10844)
1 parent 2878186 commit 60bad7a

File tree

10 files changed

+138
-11
lines changed

10 files changed

+138
-11
lines changed

ydb/library/yql/providers/yt/codec/yt_codec_io.cpp

Lines changed: 51 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -670,6 +670,7 @@ struct TMkqlReaderImpl::TDecoder {
670670
NKikimr::NUdf::TUnboxedValue Row_;
671671
bool IgnoreStreamTableIndex = false;
672672
bool KeySwitch_ = true;
673+
bool HandlesSysColumns_ = false;
673674

674675
protected:
675676
TInputBuf& Buf_;
@@ -1456,9 +1457,12 @@ class TArrowDecoder : public TMkqlReaderImpl::TDecoder {
14561457
{
14571458
InputStream_ = std::make_unique<TInputBufArrowInputStream>(buf, pool);
14581459
ResetColumnConverters();
1460+
1461+
HandlesSysColumns_ = true;
14591462
}
14601463

14611464
bool DecodeNext(NKikimr::NUdf::TUnboxedValue*& items, TMaybe<NKikimr::NMiniKQL::TValuesDictHashMap>&) override {
1465+
YQL_ENSURE(!RangeIndex_);
14621466
AtStart_ = false;
14631467

14641468
if (Chunks_.empty()) {
@@ -1474,11 +1478,12 @@ class TArrowDecoder : public TMkqlReaderImpl::TDecoder {
14741478
auto& inputFields = SpecsCache_.GetSpecs().Inputs[TableIndex_]->FieldsVec;
14751479
Row_ = SpecsCache_.NewRow(TableIndex_, items, true);
14761480

1477-
auto& [chunkLen, chunk] = Chunks_.front();
1481+
auto& [chunkRowIndex, chunkLen, chunk] = Chunks_.front();
14781482
for (size_t i = 0; i < inputFields.size(); i++) {
14791483
items[inputFields[i].StructIndex] = SpecsCache_.GetHolderFactory().CreateArrowBlock(std::move(chunk[i]));
14801484
}
14811485
items[inputFields.size()] = SpecsCache_.GetHolderFactory().CreateArrowBlock(arrow::Datum(static_cast<uint64_t>(chunkLen)));
1486+
RowIndex_ = chunkRowIndex;
14821487

14831488
Chunks_.pop_front();
14841489
return true;
@@ -1514,22 +1519,56 @@ class TArrowDecoder : public TMkqlReaderImpl::TDecoder {
15141519
return ReadNext();
15151520
}
15161521

1517-
auto& inputFields = Specs_.Inputs[TableIndex_]->FieldsVec;
1522+
auto rowIndices = batch->GetColumnByName("$row_index");
1523+
YQL_ENSURE(rowIndices);
1524+
1525+
auto& decoder = *Specs_.Inputs[TableIndex_];
1526+
auto& inputFields = decoder.FieldsVec;
15181527
YQL_ENSURE(inputFields.size() == ColumnConverters_.size());
15191528

1529+
arrow::compute::ExecContext execContext(Pool_);
15201530
std::vector<arrow::Datum> convertedBatch;
15211531
for (size_t i = 0; i < inputFields.size(); i++) {
15221532
auto batchColumn = batch->GetColumnByName(inputFields[i].Name);
1523-
YQL_ENSURE(batchColumn);
1524-
1533+
if (!batchColumn) {
1534+
arrow::Datum convertedColumn;
1535+
1536+
if (decoder.FillSysColumnPath == inputFields[i].StructIndex) {
1537+
auto tableName = Specs_.TableNames.at(TableIndex_).AsStringRef();
1538+
auto tableNameScalar = arrow::BinaryScalar(std::make_shared<arrow::Buffer>(reinterpret_cast<const uint8_t*>(tableName.Data()), tableName.Size()));
1539+
convertedColumn = ARROW_RESULT(arrow::MakeArrayFromScalar(tableNameScalar, batch->num_rows(), Pool_));
1540+
1541+
} else if (decoder.FillSysColumnRecord == inputFields[i].StructIndex || decoder.FillSysColumnNum == inputFields[i].StructIndex) {
1542+
auto addFirst = ARROW_RESULT(arrow::compute::Cast(rowIndices, arrow::uint64(), arrow::compute::CastOptions::Safe(), &execContext));
1543+
auto addSecond = arrow::Datum(std::make_shared<arrow::UInt64Scalar>(1));
1544+
convertedColumn = ARROW_RESULT(arrow::compute::Add(addFirst, addSecond, arrow::compute::ArithmeticOptions(), &execContext));
1545+
1546+
if (decoder.FillSysColumnNum == inputFields[i].StructIndex) {
1547+
auto addThird = arrow::Datum(std::make_shared<arrow::UInt64Scalar>(Specs_.TableOffsets.at(TableIndex_)));
1548+
convertedColumn = ARROW_RESULT(arrow::compute::Add(convertedColumn, addThird, arrow::compute::ArithmeticOptions(), &execContext));
1549+
}
1550+
} else if (decoder.FillSysColumnIndex == inputFields[i].StructIndex) {
1551+
convertedColumn = ARROW_RESULT(arrow::MakeArrayFromScalar(arrow::UInt32Scalar(TableIndex_), batch->num_rows()));
1552+
} else {
1553+
YQL_ENSURE(false, "unexpected column: " << inputFields[i].Name);
1554+
}
1555+
1556+
convertedBatch.emplace_back(convertedColumn);
1557+
continue;
1558+
}
1559+
15251560
convertedBatch.emplace_back(ColumnConverters_[i]->Convert(batchColumn->data()));
15261561
}
15271562

1563+
// index of the first row in the block
1564+
ui64 blockRowIndex = std::dynamic_pointer_cast<arrow::Int64Scalar>(ARROW_RESULT(rowIndices->GetScalar(0)))->value;
1565+
15281566
NUdf::TArgsDechunker dechunker(std::move(convertedBatch));
15291567
std::vector<arrow::Datum> chunk;
15301568
ui64 chunkLen = 0;
15311569
while (dechunker.Next(chunk, chunkLen)) {
1532-
Chunks_.emplace_back(chunkLen, std::move(chunk));
1570+
Chunks_.emplace_back(blockRowIndex, chunkLen, std::move(chunk));
1571+
blockRowIndex += chunkLen;
15331572
}
15341573

15351574
return true;
@@ -1551,7 +1590,7 @@ class TArrowDecoder : public TMkqlReaderImpl::TDecoder {
15511590
std::shared_ptr<arrow::ipc::RecordBatchStreamReader> StreamReader_;
15521591
std::vector<std::unique_ptr<IYtColumnConverter>> ColumnConverters_;
15531592

1554-
TDeque<std::pair<ui64, std::vector<arrow::Datum>>> Chunks_;
1593+
TDeque<std::tuple<ui64, ui64, std::vector<arrow::Datum>>> Chunks_;
15551594

15561595
const TMkqlIOSpecs& Specs_;
15571596
arrow::MemoryPool* Pool_;
@@ -1679,7 +1718,7 @@ void TMkqlReaderImpl::Next() {
16791718
return;
16801719
}
16811720

1682-
if (Decoder_->RowIndex_) {
1721+
if (Decoder_->RowIndex_ && !Decoder_->HandlesSysColumns_) {
16831722
++*Decoder_->RowIndex_;
16841723
}
16851724

@@ -1746,6 +1785,11 @@ void TMkqlReaderImpl::Next() {
17461785
items[index] = defVal;
17471786
}
17481787
}
1788+
1789+
if (Decoder_->HandlesSysColumns_) {
1790+
return;
1791+
}
1792+
17491793
if (decoder.FillSysColumnPath) {
17501794
items[*decoder.FillSysColumnPath] = Specs_->TableNames.at(Decoder_->TableIndex_);
17511795
}

ydb/library/yql/providers/yt/provider/yql_yt_block_input_filter.cpp

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -72,10 +72,6 @@ class YtBlockInputFilterTransformer : public TOptimizeTransformerBase {
7272
return false;
7373
}
7474

75-
if (!NYql::GetSettingAsColumnList(map.Input().Item(0).Settings().Ref(), EYtSettingType::SysColumns).empty()) {
76-
return false;
77-
}
78-
7975
for (auto path : map.Input().Item(0).Paths()) {
8076
if (!IsYtTableSuitableForArrowInput(path.Table(), [](const TString&) {})) {
8177
return false;

ydb/library/yql/tests/sql/sql2yql/canondata/result.json

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3380,6 +3380,13 @@
33803380
"uri": "https://{canondata_backend}/1817427/4978ac42ab916ab33f9caeb75154df4b4d7e3dee/resource.tar.gz#test_sql2yql.test_blocks-block_input_/sql.yql"
33813381
}
33823382
],
3383+
"test_sql2yql.test[blocks-block_input_sys_columns]": [
3384+
{
3385+
"checksum": "2f2abdffad20313a9c0dcd5eb1124e94",
3386+
"size": 5695,
3387+
"uri": "https://{canondata_backend}/1599023/85abe49b40cf536aae38f22fd9096a110453232d/resource.tar.gz#test_sql2yql.test_blocks-block_input_sys_columns_/sql.yql"
3388+
}
3389+
],
33833390
"test_sql2yql.test[blocks-block_input_various_types]": [
33843391
{
33853392
"checksum": "000d1e06a52c9e63fae366c4d00b485d",
@@ -23162,6 +23169,13 @@
2316223169
"uri": "https://{canondata_backend}/1916746/5f2f0160368a9941e5c2a4f26544ec2ad584dc00/resource.tar.gz#test_sql_format.test_blocks-block_input_/formatted.sql"
2316323170
}
2316423171
],
23172+
"test_sql_format.test[blocks-block_input_sys_columns]": [
23173+
{
23174+
"checksum": "0ad7ef4ec663eb32830c69d3b85e0c13",
23175+
"size": 597,
23176+
"uri": "https://{canondata_backend}/1599023/85abe49b40cf536aae38f22fd9096a110453232d/resource.tar.gz#test_sql_format.test_blocks-block_input_sys_columns_/formatted.sql"
23177+
}
23178+
],
2316523179
"test_sql_format.test[blocks-block_input_various_types]": [
2316623180
{
2316723181
"checksum": "d63e026fe9510553a3a84e7b6a1c3215",
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
in Input1 input1.txt
2+
in Input2 input2.txt
3+
providers yt
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
USE plato;
2+
3+
PRAGMA yt.JobBlockInput;
4+
5+
$a = (
6+
SELECT TableName() AS table_name, TableRecordIndex() AS record_index, Input1.* FROM Input1
7+
UNION ALL
8+
SELECT TableName() AS table_name, TableRecordIndex() AS record_index, Input2.* FROM Input2
9+
);
10+
11+
$b = (
12+
SELECT * FROM Input1 AS users
13+
UNION ALL
14+
SELECT * FROM Input2 AS users
15+
);
16+
17+
SELECT * FROM $a ORDER BY table_name, record_index;
18+
SELECT ROW_NUMBER() OVER () AS row_num, b.* FROM $b AS b ORDER BY row_num;
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
{"key"="023";"subkey"="3";"value"="aaa"};
2+
{"key"="037";"subkey"="5";"value"="ddd"};
3+
{"key"="075";"subkey"="1";"value"="abc"};
4+
{"key"="150";"subkey"="1";"value"="aaa"};
5+
{"key"="150";"subkey"="3";"value"="iii"};
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
{"_yql_row_spec"={
2+
"Type"=["StructType";[
3+
["key";["DataType";"String"]];
4+
["subkey";["DataType";"String"]];
5+
["value";["DataType";"String"]]
6+
]];
7+
}}
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
{"key"="150";"subkey"="8";"value"="zzz"};
2+
{"key"="200";"subkey"="7";"value"="qqq"};
3+
{"key"="527";"subkey"="4";"value"="bbb"};
4+
{"key"="761";"subkey"="6";"value"="ccc"};
5+
{"key"="911";"subkey"="2";"value"="kkk"};
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
{"_yql_row_spec"={
2+
"Type"=["StructType";[
3+
["key";["DataType";"String"]];
4+
["subkey";["DataType";"String"]];
5+
["value";["DataType";"String"]]
6+
]];
7+
}}

ydb/library/yql/tests/sql/yt_native_file/part12/canondata/result.json

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -463,6 +463,34 @@
463463
"uri": "https://{canondata_backend}/1881367/f7c9625b9d66b51a2a5927015bd3050c9cb7f8d2/resource.tar.gz#test.test_blocks-add_int16--Results_/results.txt"
464464
}
465465
],
466+
"test.test[blocks-block_input_sys_columns--Debug]": [
467+
{
468+
"checksum": "ded9177114542323594c9c39050f7516",
469+
"size": 3940,
470+
"uri": "https://{canondata_backend}/1889210/6489dce426ce128a05df69a7c3a02645b3a03677/resource.tar.gz#test.test_blocks-block_input_sys_columns--Debug_/opt.yql"
471+
}
472+
],
473+
"test.test[blocks-block_input_sys_columns--Peephole]": [
474+
{
475+
"checksum": "05872c78cf9c65e97d11b2083b6b3268",
476+
"size": 3271,
477+
"uri": "https://{canondata_backend}/1889210/6489dce426ce128a05df69a7c3a02645b3a03677/resource.tar.gz#test.test_blocks-block_input_sys_columns--Peephole_/opt.yql"
478+
}
479+
],
480+
"test.test[blocks-block_input_sys_columns--Plan]": [
481+
{
482+
"checksum": "078e55b2dbbb708555799b184ca34c2b",
483+
"size": 11748,
484+
"uri": "https://{canondata_backend}/1923547/473d194f9b0bb6096cc19f91fc9b3aa1a80f2c40/resource.tar.gz#test.test_blocks-block_input_sys_columns--Plan_/plan.txt"
485+
}
486+
],
487+
"test.test[blocks-block_input_sys_columns--Results]": [
488+
{
489+
"checksum": "b8ffe977850a9ff20268f86e8b090a52",
490+
"size": 6654,
491+
"uri": "https://{canondata_backend}/1923547/473d194f9b0bb6096cc19f91fc9b3aa1a80f2c40/resource.tar.gz#test.test_blocks-block_input_sys_columns--Results_/results.txt"
492+
}
493+
],
466494
"test.test[blocks-combine_all_max--Debug]": [
467495
{
468496
"checksum": "98526dc361ef8097fd184daa459dabf1",

0 commit comments

Comments
 (0)