Skip to content

Commit 62f2066

Browse files
committed
YT block input for table content
commit_hash:6ece06798fc8cef2e4d8e62cf5b9634b3162aa45
1 parent abe9abd commit 62f2066

25 files changed

+569
-29
lines changed

yt/yql/providers/yt/codec/yt_codec.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,10 @@ class TMkqlIOSpecs {
133133
UseBlockOutput_ = true;
134134
}
135135

136+
void SetIsTableContent() {
137+
IsTableContent_ = true;
138+
}
139+
136140
void SetTableOffsets(const TVector<ui64>& offsets);
137141

138142
void Clear();
@@ -148,6 +152,7 @@ class TMkqlIOSpecs {
148152
bool UseSkiff_ = false;
149153
bool UseBlockInput_ = false;
150154
bool UseBlockOutput_ = false;
155+
bool IsTableContent_ = false;
151156
TString OptLLVM_;
152157
TSystemFields SystemFields_;
153158

yt/yql/providers/yt/codec/yt_codec_io.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1536,7 +1536,7 @@ class TArrowDecoder : public TMkqlReaderImpl::TDecoder {
15361536
YQL_ENSURE(inputFields.size() == ColumnConverters_.size());
15371537

15381538
auto rowIndices = batch->GetColumnByName("$row_index");
1539-
YQL_ENSURE(rowIndices || decoder.Dynamic);
1539+
YQL_ENSURE(rowIndices || decoder.Dynamic || Specs_.IsTableContent_);
15401540

15411541
arrow::compute::ExecContext execContext(Pool_);
15421542
std::vector<arrow::Datum> convertedBatch;

yt/yql/providers/yt/common/yql_yt_settings.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -524,6 +524,7 @@ TYtConfiguration::TYtConfiguration(TTypeAnnotationContext& typeCtx)
524524
REGISTER_SETTING(*this, MaxColumnGroups);
525525
REGISTER_SETTING(*this, ExtendedStatsMaxChunkCount);
526526
REGISTER_SETTING(*this, JobBlockInput);
527+
REGISTER_SETTING(*this, JobBlockTableContent);
527528
REGISTER_SETTING(*this, JobBlockOutput).Parser([](const TString& v) { return FromString<EBlockOutputMode>(v); });
528529
REGISTER_SETTING(*this, _EnableYtDqProcessWriteConstraints);
529530
REGISTER_SETTING(*this, CompactForDistinct);

yt/yql/providers/yt/common/yql_yt_settings.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -294,6 +294,7 @@ struct TYtSettings {
294294
NCommon::TConfSetting<ui16, false> MaxColumnGroups;
295295
NCommon::TConfSetting<ui64, false> ExtendedStatsMaxChunkCount;
296296
NCommon::TConfSetting<bool, false> JobBlockInput;
297+
NCommon::TConfSetting<bool, false> JobBlockTableContent;
297298
NCommon::TConfSetting<TSet<TString>, false> JobBlockInputSupportedTypes;
298299
NCommon::TConfSetting<TSet<NUdf::EDataSlot>, false> JobBlockInputSupportedDataTypes;
299300
NCommon::TConfSetting<EBlockOutputMode, false> JobBlockOutput;

yt/yql/providers/yt/comp_nodes/ya.make.inc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,13 @@ INCLUDE(${ARCADIA_ROOT}/yql/essentials/minikql/invoke_builtins/header.ya.make.in
44
SET(ORIG_SRC_DIR ${ARCADIA_ROOT}/yt/yql/providers/yt/comp_nodes)
55

66
SET(ORIG_SOURCES
7+
yql_mkql_file_block_stream.cpp
78
yql_mkql_file_input_state.cpp
89
yql_mkql_file_list.cpp
910
yql_mkql_input_stream.cpp
1011
yql_mkql_input.cpp
1112
yql_mkql_output.cpp
13+
yql_mkql_block_table_content.cpp
1214
yql_mkql_table_content.cpp
1315
yql_mkql_table.cpp
1416
yql_mkql_ungrouping_list.cpp
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
#include "yql_mkql_block_table_content.h"
2+
#include "yql_mkql_file_block_stream.h"
3+
4+
#include <yql/essentials/minikql/computation/mkql_computation_node_impl.h>
5+
#include <yql/essentials/minikql/mkql_node_cast.h>
6+
#include <yql/essentials/minikql/defs.h>
7+
8+
#include <yql/essentials/public/udf/udf_value.h>
9+
10+
#include <util/generic/vector.h>
11+
#include <util/generic/string.h>
12+
#include <util/generic/size_literals.h>
13+
14+
namespace NYql {
15+
16+
using namespace NKikimr;
17+
using namespace NKikimr::NMiniKQL;
18+
19+
class TYtBlockTableContentWrapper : public TMutableComputationNode<TYtBlockTableContentWrapper> {
20+
typedef TMutableComputationNode<TYtBlockTableContentWrapper> TBaseComputation;
21+
public:
22+
TYtBlockTableContentWrapper(TComputationMutables& mutables, NCommon::TCodecContext& codecCtx,
23+
TVector<TString>&& files, const TString& inputSpec, TStructType* origStructType, bool decompress, std::optional<ui64> expectedRowCount)
24+
: TBaseComputation(mutables)
25+
, Files_(std::move(files))
26+
, Decompress_(decompress)
27+
, ExpectedRowCount_(std::move(expectedRowCount))
28+
{
29+
Spec_.SetUseBlockInput();
30+
Spec_.SetIsTableContent();
31+
Spec_.Init(codecCtx, inputSpec, {}, {}, origStructType, {}, TString());
32+
}
33+
34+
NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const {
35+
return ctx.HolderFactory.Create<TFileWideBlockStreamValue>(Spec_, ctx.HolderFactory, Files_, Decompress_, 4, 1_MB, ExpectedRowCount_);
36+
}
37+
38+
private:
39+
void RegisterDependencies() const final {}
40+
41+
TMkqlIOSpecs Spec_;
42+
TVector<TString> Files_;
43+
const bool Decompress_;
44+
const std::optional<ui64> ExpectedRowCount_;
45+
};
46+
47+
IComputationNode* WrapYtBlockTableContent(NCommon::TCodecContext& codecCtx,
48+
TComputationMutables& mutables, TCallable& callable, TStringBuf pathPrefix)
49+
{
50+
MKQL_ENSURE(callable.GetInputsCount() == 6, "Expected 6 arguments");
51+
TString uniqueId(AS_VALUE(TDataLiteral, callable.GetInput(0))->AsValue().AsStringRef());
52+
auto origStructType = AS_TYPE(TStructType, AS_VALUE(TTypeType, callable.GetInput(1)));
53+
const ui32 tablesCount = AS_VALUE(TDataLiteral, callable.GetInput(2))->AsValue().Get<ui32>();
54+
TString inputSpec(AS_VALUE(TDataLiteral, callable.GetInput(3))->AsValue().AsStringRef());
55+
const bool decompress = AS_VALUE(TDataLiteral, callable.GetInput(4))->AsValue().Get<bool>();
56+
57+
std::optional<ui64> length;
58+
TTupleLiteral* lengthTuple = AS_VALUE(TTupleLiteral, callable.GetInput(5));
59+
if (lengthTuple->GetValuesCount() > 0) {
60+
MKQL_ENSURE(lengthTuple->GetValuesCount() == 1, "Expect 1 element in the length tuple");
61+
length = AS_VALUE(TDataLiteral, lengthTuple->GetValue(0))->AsValue().Get<ui64>();
62+
}
63+
64+
TVector<TString> files;
65+
for (ui32 index = 0; index < tablesCount; ++index) {
66+
files.push_back(TStringBuilder() << pathPrefix << uniqueId << '_' << index);
67+
}
68+
69+
return new TYtBlockTableContentWrapper(mutables, codecCtx, std::move(files), inputSpec,
70+
origStructType, decompress, length);
71+
}
72+
73+
} // NYql
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
#pragma once
2+
3+
#include <yql/essentials/providers/common/codec/yql_codec.h>
4+
5+
#include <yql/essentials/minikql/mkql_node.h>
6+
#include <yql/essentials/minikql/computation/mkql_computation_node.h>
7+
8+
#include <util/generic/string.h>
9+
#include <util/generic/strbuf.h>
10+
11+
namespace NYql {
12+
13+
NKikimr::NMiniKQL::IComputationNode* WrapYtBlockTableContent(
14+
NYql::NCommon::TCodecContext& codecCtx,
15+
NKikimr::NMiniKQL::TComputationMutables& mutables,
16+
NKikimr::NMiniKQL::TCallable& callable, TStringBuf pathPrefix);
17+
18+
} // NYql
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
#include "yql_mkql_file_block_stream.h"
2+
#include "yql_mkql_file_input_state.h"
3+
4+
namespace NYql {
5+
6+
using namespace NKikimr::NMiniKQL;
7+
8+
TFileWideBlockStreamValue::TFileWideBlockStreamValue(
9+
TMemoryUsageInfo* memInfo,
10+
const TMkqlIOSpecs& spec,
11+
const THolderFactory& holderFactory,
12+
const TVector<TString>& filePaths,
13+
bool decompress,
14+
size_t blockCount,
15+
size_t blockSize,
16+
std::optional<ui64> expectedRowCount
17+
)
18+
: TComputationValue(memInfo)
19+
, Spec_(spec)
20+
, HolderFactory_(holderFactory)
21+
, FilePaths_(filePaths)
22+
, Decompress_(decompress)
23+
, BlockCount_(blockCount)
24+
, BlockSize_(blockSize)
25+
, ExpectedRowCount_(expectedRowCount)
26+
{
27+
State_ = MakeHolder<TFileInputState>(Spec_, HolderFactory_, MakeMkqlFileInputs(FilePaths_, Decompress_), BlockCount_, BlockSize_);
28+
}
29+
30+
NUdf::EFetchStatus TFileWideBlockStreamValue::WideFetch(NUdf::TUnboxedValue* output, ui32 width) {
31+
if (!AtStart_) {
32+
State_->Next();
33+
}
34+
AtStart_ = false;
35+
if (!State_->IsValid()) {
36+
MKQL_ENSURE(!ExpectedRowCount_ || *ExpectedRowCount_ == State_->GetRecordIndex(), "Invalid file row count");
37+
return NUdf::EFetchStatus::Finish;
38+
}
39+
40+
auto elements = State_->GetCurrent().GetElements();
41+
for (ui32 i = 0; i < width; i++) {
42+
if (auto out = output++) {
43+
*out = elements[i];
44+
}
45+
}
46+
47+
return NUdf::EFetchStatus::Ok;
48+
}
49+
50+
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
#pragma once
2+
3+
#include "yql_mkql_file_input_state.h"
4+
5+
#include <yt/yql/providers/yt/codec/yt_codec.h>
6+
#include <yql/essentials/minikql/computation/mkql_computation_node.h>
7+
8+
#include <util/generic/ptr.h>
9+
#include <util/generic/vector.h>
10+
#include <util/generic/string.h>
11+
12+
namespace NYql {
13+
14+
class TFileWideBlockStreamValue : public NKikimr::NMiniKQL::TComputationValue<TFileWideBlockStreamValue> {
15+
public:
16+
TFileWideBlockStreamValue(
17+
NKikimr::NMiniKQL::TMemoryUsageInfo* memInfo,
18+
const TMkqlIOSpecs& spec,
19+
const NKikimr::NMiniKQL::THolderFactory& holderFactory,
20+
const TVector<TString>& filePaths,
21+
bool decompress,
22+
size_t blockCount,
23+
size_t blockSize,
24+
std::optional<ui64> expectedRowCount
25+
);
26+
27+
private:
28+
NUdf::EFetchStatus WideFetch(NUdf::TUnboxedValue* output, ui32 width);
29+
30+
private:
31+
const TMkqlIOSpecs& Spec_;
32+
const NKikimr::NMiniKQL::THolderFactory& HolderFactory_;
33+
const TVector<TString> FilePaths_;
34+
const bool Decompress_;
35+
const size_t BlockCount_;
36+
const size_t BlockSize_;
37+
const std::optional<ui64> ExpectedRowCount_;
38+
39+
bool AtStart_ = true;
40+
THolder<TFileInputState> State_;
41+
};
42+
43+
}

yt/yql/providers/yt/comp_nodes/yql_mkql_file_input_state.cpp

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#include "yql_mkql_file_input_state.h"
22

3+
#include <yql/essentials/minikql/computation/mkql_block_impl.h>
34
#include <yql/essentials/utils/yql_panic.h>
45

56
#include <util/system/fs.h>
@@ -55,7 +56,13 @@ bool TFileInputState::NextValue() {
5556
}
5657

5758
MkqlReader_.Next();
58-
++CurrentRecord_;
59+
if (Spec_->UseBlockInput_) {
60+
auto blockCountValue = CurrentValue_.GetElement(Spec_->Inputs[CurrentInput_]->StructSize);
61+
CurrentRecord_ += GetBlockCount(blockCountValue);
62+
} else {
63+
++CurrentRecord_;
64+
}
65+
5966
return true;
6067
}
6168
}

0 commit comments

Comments
 (0)