Skip to content

Commit 494ab45

Browse files
authored
Support block IO in purecalc (#7016)
1 parent c953dde commit 494ab45

22 files changed

+853
-109
lines changed

ydb/library/yql/public/purecalc/common/compile_mkql.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ NKikimr::NMiniKQL::TRuntimeNode CompileMkql(const TExprNode::TPtr& exprRoot, TEx
9898
NCommon::TMkqlCommonCallableCompiler compiler;
9999

100100
compiler.AddCallable(PurecalcInputCallableName, MakeSelfCallableCompiler());
101+
compiler.AddCallable(PurecalcBlockInputCallableName, MakeSelfCallableCompiler());
101102
compiler.OverrideCallable("FileContent", MakeFileContentCallableCompiler(userData));
102103
compiler.OverrideCallable("FilePath", MakeFilePathCallableCompiler(userData));
103104
compiler.OverrideCallable("FolderPath", MakeFolderPathCallableCompiler(userData));

ydb/library/yql/public/purecalc/common/interface.h

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -591,12 +591,28 @@ namespace NYql {
591591
*/
592592
virtual const NKikimr::NMiniKQL::TStructType* GetInputType(bool original = false) const = 0;
593593

594+
/**
595+
* MiniKQL input struct type of the specified input for this program.
596+
* The returned type is the actual type of the specified input node.
597+
*/
598+
virtual const NKikimr::NMiniKQL::TStructType* GetRawInputType(ui32) const = 0;
599+
/**
600+
* Overload for single-input programs.
601+
*/
602+
virtual const NKikimr::NMiniKQL::TStructType* GetRawInputType() const = 0;
603+
594604
/**
595605
* MiniKQL output struct type for this program. The returned type is equivalent to the deduced output
596606
* schema (see IWorker::MakeFullOutputSchema()).
597607
*/
598608
virtual const NKikimr::NMiniKQL::TType* GetOutputType() const = 0;
599609

610+
/**
611+
* MiniKQL output struct type for this program. The returned type is
612+
* the actual type of the root node.
613+
*/
614+
virtual const NKikimr::NMiniKQL::TType* GetRawOutputType() const = 0;
615+
600616
/**
601617
* Make input type schema for specified input as deduced by program optimizer. This schema is equivalent
602618
* to one provided by input spec up to the order of the fields in structures.
@@ -795,7 +811,7 @@ namespace NYql {
795811
return AllVirtualColumns_;
796812
}
797813

798-
static constexpr bool ProvidesBlocks = false;
814+
virtual bool ProvidesBlocks() const { return false; }
799815
};
800816

801817
/**
@@ -840,7 +856,7 @@ namespace NYql {
840856
OutputColumnsFilter_ = outputColumnsFilter;
841857
}
842858

843-
static constexpr bool AcceptsBlocks = false;
859+
virtual bool AcceptsBlocks() const { return false; }
844860
};
845861

846862
////////////////////////////////////////////////////////////////////////////////////////////////////

ydb/library/yql/public/purecalc/common/names.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,15 @@
55
namespace NYql::NPureCalc {
66
const TStringBuf PurecalcSysColumnsPrefix = "_yql_sys_";
77
const TStringBuf PurecalcSysColumnTablePath = "_yql_sys_tablepath";
8+
const TStringBuf PurecalcBlockColumnLength = "_yql_block_length";
89

910
const TStringBuf PurecalcDefaultCluster = "view";
1011
const TStringBuf PurecalcDefaultService = "data";
1112

1213
const TStringBuf PurecalcInputCallableName = "Self";
1314
const TStringBuf PurecalcInputTablePrefix = "Input";
1415

16+
const TStringBuf PurecalcBlockInputCallableName = "BlockSelf";
17+
1518
const TStringBuf PurecalcUdfModulePrefix = "<purecalc>::";
1619
}

ydb/library/yql/public/purecalc/common/names.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,15 @@
55
namespace NYql::NPureCalc {
66
extern const TStringBuf PurecalcSysColumnsPrefix;
77
extern const TStringBuf PurecalcSysColumnTablePath;
8+
extern const TStringBuf PurecalcBlockColumnLength;
89

910
extern const TStringBuf PurecalcDefaultCluster;
1011
extern const TStringBuf PurecalcDefaultService;
1112

1213
extern const TStringBuf PurecalcInputCallableName;
1314
extern const TStringBuf PurecalcInputTablePrefix;
1415

16+
extern const TStringBuf PurecalcBlockInputCallableName;
17+
1518
extern const TStringBuf PurecalcUdfModulePrefix;
1619
}

ydb/library/yql/public/purecalc/common/transformations/align_output_schema.cpp

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
#include "align_output_schema.h"
22

3+
#include <ydb/library/yql/public/purecalc/common/names.h>
34
#include <ydb/library/yql/public/purecalc/common/type_from_schema.h>
5+
#include <ydb/library/yql/public/purecalc/common/transformations/utils.h>
46

57
#include <ydb/library/yql/core/yql_expr_type_annotation.h>
68

@@ -11,11 +13,17 @@ namespace {
1113
class TOutputAligner : public TSyncTransformerBase {
1214
private:
1315
const TTypeAnnotationNode* OutputStruct_;
16+
bool AcceptsBlocks_;
1417
EProcessorMode ProcessorMode_;
1518

1619
public:
17-
explicit TOutputAligner(const TTypeAnnotationNode* outputStruct, EProcessorMode processorMode)
20+
explicit TOutputAligner(
21+
const TTypeAnnotationNode* outputStruct,
22+
bool acceptsBlocks,
23+
EProcessorMode processorMode
24+
)
1825
: OutputStruct_(outputStruct)
26+
, AcceptsBlocks_(acceptsBlocks)
1927
, ProcessorMode_(processorMode)
2028
{
2129
}
@@ -29,6 +37,19 @@ namespace {
2937
const auto* actualType = MakeActualType(input);
3038
const auto* actualItemType = MakeActualItemType(input);
3139

40+
// XXX: Tweak the obtained expression type, is the spec supports blocks:
41+
// 1. Remove "_yql_block_length" attribute, since it's for internal usage.
42+
// 2. Strip block container from the type to store its internal type.
43+
if (AcceptsBlocks_) {
44+
Y_ENSURE(actualItemType->GetKind() == ETypeAnnotationKind::Struct);
45+
actualItemType = UnwrapBlockStruct(actualItemType->Cast<TStructExprType>(), ctx);
46+
if (ProcessorMode_ == EProcessorMode::PullList) {
47+
actualType = ctx.MakeType<TListExprType>(actualItemType);
48+
} else {
49+
actualType = ctx.MakeType<TStreamExprType>(actualItemType);
50+
}
51+
}
52+
3253
if (!ValidateOutputType(actualItemType, expectedItemType, ctx)) {
3354
return TStatus::Error;
3455
}
@@ -78,8 +99,12 @@ namespace {
7899
auto actualType = MakeActualType(input);
79100
switch (actualType->GetKind()) {
80101
case ETypeAnnotationKind::Stream:
102+
Y_ENSURE(ProcessorMode_ != EProcessorMode::PullList,
103+
"processor mode mismatches the actual container type");
81104
return actualType->Cast<TStreamExprType>()->GetItemType();
82105
case ETypeAnnotationKind::List:
106+
Y_ENSURE(ProcessorMode_ == EProcessorMode::PullList,
107+
"processor mode mismatches the actual container type");
83108
return actualType->Cast<TListExprType>()->GetItemType();
84109
default:
85110
Y_ABORT("unexpected return type");
@@ -88,6 +113,10 @@ namespace {
88113
};
89114
}
90115

91-
TAutoPtr<IGraphTransformer> NYql::NPureCalc::MakeOutputAligner(const TTypeAnnotationNode* outputStruct, EProcessorMode processorMode) {
92-
return new TOutputAligner(outputStruct, processorMode);
116+
TAutoPtr<IGraphTransformer> NYql::NPureCalc::MakeOutputAligner(
117+
const TTypeAnnotationNode* outputStruct,
118+
bool acceptsBlocks,
119+
EProcessorMode processorMode
120+
) {
121+
return new TOutputAligner(outputStruct, acceptsBlocks, processorMode);
93122
}

ydb/library/yql/public/purecalc/common/transformations/align_output_schema.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,14 @@ namespace NYql {
1111
* A transformer which converts an output type of the expression to the given type or reports an error.
1212
*
1313
* @param outputStruct destination output struct type.
14+
* @param acceptsBlocks indicates, whether the output type need to be
15+
* preprocessed.
16+
* @param processorMode specifies the top-most container of the result.
1417
* @return a graph transformer for type alignment.
1518
*/
1619
TAutoPtr<IGraphTransformer> MakeOutputAligner(
1720
const TTypeAnnotationNode* outputStruct,
21+
bool acceptsBlocks,
1822
EProcessorMode processorMode
1923
);
2024
}

ydb/library/yql/public/purecalc/common/transformations/replace_table_reads.cpp

Lines changed: 26 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#include "replace_table_reads.h"
22

33
#include <ydb/library/yql/public/purecalc/common/names.h>
4+
#include <ydb/library/yql/public/purecalc/common/transformations/utils.h>
45

56
#include <ydb/library/yql/core/yql_expr_optimize.h>
67
#include <ydb/library/yql/core/yql_expr_type_annotation.h>
@@ -11,23 +12,26 @@ using namespace NYql::NPureCalc;
1112
namespace {
1213
class TTableReadsReplacer: public TSyncTransformerBase {
1314
private:
14-
ui32 InputsNumber_;
15+
const TVector<const TStructExprType*>& InputStructs_;
1516
bool UseSystemColumns_;
16-
TString TablePrefix_;
17+
EProcessorMode ProcessorMode_;
1718
TString CallableName_;
19+
TString TablePrefix_;
1820
bool Complete_ = false;
1921

2022
public:
2123
explicit TTableReadsReplacer(
22-
ui32 inputsNumber,
24+
const TVector<const TStructExprType*>& inputStructs,
2325
bool useSystemColumns,
24-
TString tablePrefix,
25-
TString inputNodeName
26+
EProcessorMode processorMode,
27+
TString inputNodeName,
28+
TString tablePrefix
2629
)
27-
: InputsNumber_(inputsNumber)
30+
: InputStructs_(inputStructs)
2831
, UseSystemColumns_(useSystemColumns)
29-
, TablePrefix_(std::move(tablePrefix))
32+
, ProcessorMode_(processorMode)
3033
, CallableName_(std::move(inputNodeName))
34+
, TablePrefix_(std::move(tablePrefix))
3135
{
3236
}
3337

@@ -135,6 +139,13 @@ namespace {
135139
.Seal()
136140
.Build();
137141

142+
if (inputNode->IsCallable(PurecalcBlockInputCallableName)) {
143+
const auto inputStruct = InputStructs_[inputIndex]->Cast<TStructExprType>();
144+
const auto blocksLambda = NodeFromBlocks(replacePos, inputStruct, ctx);
145+
bool wrapLMap = ProcessorMode_ == EProcessorMode::PullList;
146+
inputNode = ApplyToIterable(replacePos, inputNode, blocksLambda, wrapLMap, ctx);
147+
}
148+
138149
if (UseSystemColumns_) {
139150
auto mapLambda = ctx.Builder(replacePos)
140151
.Lambda()
@@ -163,7 +174,7 @@ namespace {
163174
return MakeIntrusive<TIssue>(ctx.GetPosition(node->Pos()), TStringBuilder() << "At function: " << node->Content());
164175
});
165176

166-
if (!InputsNumber_) {
177+
if (InputStructs_.empty()) {
167178
ctx.AddError(TIssue(ctx.GetPosition(node->Pos()), "No inputs provided by input spec"));
168179
return nullptr;
169180
}
@@ -174,10 +185,10 @@ namespace {
174185

175186
auto builder = ctx.Builder(replacePos);
176187

177-
if (InputsNumber_ > 1) {
188+
if (InputStructs_.size() > 1) {
178189
auto listBuilder = builder.List();
179190

180-
for (ui32 i = 0; i < InputsNumber_; ++i) {
191+
for (ui32 i = 0; i < InputStructs_.size(); ++i) {
181192
listBuilder.Callable(i, CallableName_).Atom(0, ToString(i)).Seal();
182193
}
183194

@@ -226,10 +237,11 @@ namespace {
226237
}
227238

228239
TAutoPtr<IGraphTransformer> NYql::NPureCalc::MakeTableReadsReplacer(
229-
ui32 inputsNumber,
240+
const TVector<const TStructExprType*>& inputStructs,
230241
bool useSystemColumns,
231-
TString tablePrefix,
232-
TString callableName
242+
EProcessorMode processorMode,
243+
TString callableName,
244+
TString tablePrefix
233245
) {
234-
return new TTableReadsReplacer(inputsNumber, useSystemColumns, std::move(tablePrefix), std::move(callableName));
246+
return new TTableReadsReplacer(inputStructs, useSystemColumns, processorMode, std::move(callableName), std::move(tablePrefix));
235247
}

ydb/library/yql/public/purecalc/common/transformations/replace_table_reads.h

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#pragma once
22

33
#include <ydb/library/yql/public/purecalc/common/names.h>
4+
#include <ydb/library/yql/public/purecalc/common/processor_mode.h>
45

56
#include <ydb/library/yql/core/yql_graph_transformer.h>
67

@@ -15,14 +16,15 @@ namespace NYql::NPureCalc {
1516
*
1617
* @param inputStructs types of each input.
1718
* @param useSystemColumns whether to allow special system columns in input structs.
18-
* @param tablePrefix required prefix for all table names (e.g. `Input`).
1919
* @param callableName name of the special callable used to get input data (e.g. `Self`).
20+
* @param tablePrefix required prefix for all table names (e.g. `Input`).
2021
* @param return a graph transformer for replacing table reads.
2122
*/
2223
TAutoPtr<IGraphTransformer> MakeTableReadsReplacer(
23-
ui32 inputsNumber,
24+
const TVector<const TStructExprType*>& inputStructs,
2425
bool useSystemColumns,
25-
TString tablePrefix = TString{PurecalcInputTablePrefix},
26-
TString callableName = TString{PurecalcInputCallableName}
26+
EProcessorMode processorMode,
27+
TString callableName = TString{PurecalcInputCallableName},
28+
TString tablePrefix = TString{PurecalcInputTablePrefix}
2729
);
2830
}
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
#include "root_to_blocks.h"
2+
3+
#include <ydb/library/yql/public/purecalc/common/transformations/utils.h>
4+
5+
#include <ydb/library/yql/core/yql_expr_type_annotation.h>
6+
7+
using namespace NYql;
8+
using namespace NYql::NPureCalc;
9+
10+
namespace {
11+
12+
class TRootToBlocks: public TSyncTransformerBase {
13+
private:
14+
bool AcceptsBlocks_;
15+
EProcessorMode ProcessorMode_;
16+
bool Wrapped_;
17+
18+
public:
19+
explicit TRootToBlocks(bool acceptsBlocks, EProcessorMode processorMode)
20+
: AcceptsBlocks_(acceptsBlocks)
21+
, ProcessorMode_(processorMode)
22+
, Wrapped_(false)
23+
{
24+
}
25+
26+
public:
27+
void Rewind() override {
28+
Wrapped_ = false;
29+
}
30+
31+
TStatus DoTransform(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final {
32+
if (Wrapped_ || !AcceptsBlocks_) {
33+
return IGraphTransformer::TStatus::Ok;
34+
}
35+
36+
const TTypeAnnotationNode* returnItemType;
37+
const TTypeAnnotationNode* returnType = input->GetTypeAnn();
38+
if (ProcessorMode_ == EProcessorMode::PullList) {
39+
Y_ENSURE(returnType->GetKind() == ETypeAnnotationKind::List);
40+
returnItemType = returnType->Cast<TListExprType>()->GetItemType();
41+
} else {
42+
Y_ENSURE(returnType->GetKind() == ETypeAnnotationKind::Stream);
43+
returnItemType = returnType->Cast<TStreamExprType>()->GetItemType();
44+
}
45+
46+
Y_ENSURE(returnItemType->GetKind() == ETypeAnnotationKind::Struct);
47+
const TStructExprType* structType = returnItemType->Cast<TStructExprType>();
48+
const auto blocksLambda = NodeToBlocks(input->Pos(), structType, ctx);
49+
bool wrapLMap = ProcessorMode_ == EProcessorMode::PullList;
50+
output = ApplyToIterable(input->Pos(), input, blocksLambda, wrapLMap, ctx);
51+
52+
Wrapped_ = true;
53+
54+
return IGraphTransformer::TStatus(IGraphTransformer::TStatus::Repeat, true);
55+
}
56+
};
57+
58+
} // namespace
59+
60+
TAutoPtr<IGraphTransformer> NYql::NPureCalc::MakeRootToBlocks(
61+
bool acceptsBlocks,
62+
EProcessorMode processorMode
63+
) {
64+
return new TRootToBlocks(acceptsBlocks, processorMode);
65+
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
#pragma once
2+
3+
#include <ydb/library/yql/public/purecalc/common/processor_mode.h>
4+
5+
#include <ydb/library/yql/core/yql_graph_transformer.h>
6+
7+
namespace NYql {
8+
namespace NPureCalc {
9+
/**
10+
* A transformer which rewrite the root to respect block types.
11+
*
12+
* @param acceptsBlock allows using this transformer in pipeline and
13+
* skip this phase if no block output is required.
14+
* @param processorMode specifies the top-most container of the result.
15+
* @return a graph transformer for rewriting the root node.
16+
*/
17+
TAutoPtr<IGraphTransformer> MakeRootToBlocks(
18+
bool acceptsBlocks,
19+
EProcessorMode processorMode
20+
);
21+
}
22+
}

0 commit comments

Comments
 (0)