Skip to content

Commit ea0c2bd

Browse files
committed
Tune YDB <-> YQL deps
init commit_hash:16572ab4e94aea4f7455c2ccb90b70ea99a412db
1 parent 2d0e749 commit ea0c2bd

40 files changed

+509
-2617
lines changed
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
LIBRARY()
2+
3+
SRCS(
4+
yql_dq_task_transform.cpp
5+
)
6+
7+
PEERDIR(
8+
yql/essentials/minikql
9+
)
10+
11+
YQL_LAST_ABI_VERSION()
12+
13+
END()
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
#include <yql/essentials/core/dq_integration/transform/yql_dq_task_transform.h>
2+
3+
namespace NYql {
4+
5+
TTaskTransformFactory CreateCompositeTaskTransformFactory(TVector<TTaskTransformFactory> factories) {
6+
return [factories = std::move(factories)] (const THashMap<TString, TString>& taskParams, const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry) -> NKikimr::NMiniKQL::TCallableVisitFuncProvider {
7+
TVector<NKikimr::NMiniKQL::TCallableVisitFuncProvider> funcProviders;
8+
for (auto& factory: factories) {
9+
funcProviders.push_back(factory(taskParams, funcRegistry));
10+
}
11+
return [funcProviders = std::move(funcProviders)] (const NKikimr::NMiniKQL::TInternName& name) -> NKikimr::NMiniKQL::TCallableVisitFunc {
12+
for (auto& provider: funcProviders) {
13+
if (auto res = provider(name)) {
14+
return res;
15+
}
16+
}
17+
return {};
18+
};
19+
};
20+
}
21+
22+
} // NYql
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
#pragma once
2+
3+
#include <yql/essentials/minikql/mkql_node_visitor.h>
4+
#include <yql/essentials/minikql/mkql_function_registry.h>
5+
6+
#include <util/generic/hash.h>
7+
#include <util/generic/string.h>
8+
9+
#include <functional>
10+
11+
namespace NYql {
12+
13+
using TTaskTransformFactory = std::function<NKikimr::NMiniKQL::TCallableVisitFuncProvider(const THashMap<TString, TString>&, const NKikimr::NMiniKQL::IFunctionRegistry*)>;
14+
15+
TTaskTransformFactory CreateCompositeTaskTransformFactory(TVector<TTaskTransformFactory> factories);
16+
17+
} // namespace NYql
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
LIBRARY()
2+
3+
SRCS(
4+
yql_dq_integration.cpp
5+
yql_dq_optimization.cpp
6+
)
7+
8+
PEERDIR(
9+
contrib/libs/protobuf
10+
library/cpp/yson
11+
yql/essentials/ast
12+
yql/essentials/core
13+
yql/essentials/core/expr_nodes
14+
)
15+
16+
YQL_LAST_ABI_VERSION()
17+
18+
END()
19+
20+
RECURSE(
21+
transform
22+
)
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
#include "yql_dq_integration.h"
2+
3+
#include <yql/essentials/core/yql_type_annotation.h>
4+
5+
namespace NYql {
6+
7+
std::unordered_set<IDqIntegration*> GetUniqueIntegrations(const TTypeAnnotationContext& typesCtx) {
8+
std::unordered_set<IDqIntegration*> uniqueIntegrations(typesCtx.DataSources.size() + typesCtx.DataSinks.size());
9+
for (const auto& provider : typesCtx.DataSources) {
10+
if (auto* dqIntegration = provider->GetDqIntegration()) {
11+
uniqueIntegrations.emplace(dqIntegration);
12+
}
13+
}
14+
15+
for (const auto& provider : typesCtx.DataSinks) {
16+
if (auto* dqIntegration = provider->GetDqIntegration()) {
17+
uniqueIntegrations.emplace(dqIntegration);
18+
}
19+
}
20+
21+
return uniqueIntegrations;
22+
}
23+
24+
} // namespace NYql
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
#pragma once
2+
3+
#include <yql/essentials/ast/yql_expr.h>
4+
#include <yql/essentials/core/yql_data_provider.h>
5+
#include <yql/essentials/core/yql_statistics.h>
6+
#include <yql/essentials/core/expr_nodes/yql_expr_nodes.h>
7+
#include <yql/essentials/public/issue/yql_issue.h>
8+
9+
#include <library/cpp/yson/writer.h>
10+
11+
#include <util/generic/string.h>
12+
#include <util/generic/vector.h>
13+
#include <util/generic/map.h>
14+
#include <util/generic/maybe.h>
15+
16+
#include <google/protobuf/any.pb.h>
17+
18+
namespace NJson {
19+
class TJsonValue;
20+
} // namespace NJson
21+
22+
namespace NYql {
23+
24+
struct TDqSettings;
25+
class TTransformationPipeline;
26+
27+
namespace NCommon {
28+
class TMkqlCallableCompilerBase;
29+
}
30+
31+
class TFallbackError: public yexception {
32+
public:
33+
TFallbackError(TIssuePtr issue = {})
34+
: Issue_(std::move(issue))
35+
{}
36+
37+
TIssuePtr GetIssue() const {
38+
return Issue_;
39+
}
40+
private:
41+
TIssuePtr Issue_;
42+
};
43+
44+
class IDqIntegration {
45+
public:
46+
virtual ~IDqIntegration() {}
47+
48+
virtual ui64 Partition(const TDqSettings& config, size_t maxPartitions, const TExprNode& node,
49+
TVector<TString>& partitions, TString* clusterName, TExprContext& ctx, bool canFallback) = 0;
50+
virtual bool CheckPragmas(const TExprNode& node, TExprContext& ctx, bool skipIssues = false) = 0;
51+
virtual bool CanRead(const TExprNode& read, TExprContext& ctx, bool skipIssues = true) = 0;
52+
virtual TMaybe<ui64> EstimateReadSize(ui64 dataSizePerJob, ui32 maxTasksPerStage, const TVector<const TExprNode*>& nodes, TExprContext& ctx) = 0;
53+
virtual TExprNode::TPtr WrapRead(const TDqSettings& config, const TExprNode::TPtr& read, TExprContext& ctx) = 0;
54+
virtual TMaybe<TOptimizerStatistics> ReadStatistics(const TExprNode::TPtr& readWrap, TExprContext& ctx) = 0;
55+
virtual TExprNode::TPtr RecaptureWrite(const TExprNode::TPtr& write, TExprContext& ctx) = 0;
56+
57+
// Nothing if callable is not for writing,
58+
// false if callable is for writing and there are some errors (they are added to ctx),
59+
// true if callable is for writing and no issues occured.
60+
virtual TMaybe<bool> CanWrite(const TExprNode& write, TExprContext& ctx) = 0;
61+
62+
virtual TExprNode::TPtr WrapWrite(const TExprNode::TPtr& write, TExprContext& ctx) = 0;
63+
virtual bool CanBlockRead(const NNodes::TExprBase& node, TExprContext& ctx, TTypeAnnotationContext& typesCtx) = 0;
64+
virtual void RegisterMkqlCompiler(NCommon::TMkqlCallableCompilerBase& compiler) = 0;
65+
virtual bool CanFallback() = 0;
66+
virtual void FillSourceSettings(const TExprNode& node, ::google::protobuf::Any& settings, TString& sourceType, size_t maxPartitions, TExprContext& ctx) = 0;
67+
virtual void FillLookupSourceSettings(const TExprNode& node, ::google::protobuf::Any& settings, TString& sourceType) = 0;
68+
virtual void FillSinkSettings(const TExprNode& node, ::google::protobuf::Any& settings, TString& sinkType) = 0;
69+
virtual void FillTransformSettings(const TExprNode& node, ::google::protobuf::Any& settings) = 0;
70+
virtual void Annotate(const TExprNode& node, THashMap<TString, TString>& params) = 0;
71+
virtual bool PrepareFullResultTableParams(const TExprNode& root, TExprContext& ctx, THashMap<TString, TString>& params, THashMap<TString, TString>& secureParams) = 0;
72+
virtual void WriteFullResultTableRef(NYson::TYsonWriter& writer, const TVector<TString>& columns, const THashMap<TString, TString>& graphParams) = 0;
73+
74+
// Fill plan operator properties for sources/sinks
75+
// Return true if node was handled
76+
virtual bool FillSourcePlanProperties(const NNodes::TExprBase& node, TMap<TString, NJson::TJsonValue>& properties) = 0;
77+
virtual bool FillSinkPlanProperties(const NNodes::TExprBase& node, TMap<TString, NJson::TJsonValue>& properties) = 0;
78+
// Called to configure DQ peephole
79+
virtual void ConfigurePeepholePipeline(bool beforeDqTransforms, const THashMap<TString, TString>& params, TTransformationPipeline* pipeline) = 0;
80+
};
81+
82+
std::unordered_set<IDqIntegration*> GetUniqueIntegrations(const TTypeAnnotationContext& typesCtx);
83+
84+
} // namespace NYql
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
#include "yql_dq_optimization.h"
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
#pragma once
2+
3+
#include <yql/essentials/ast/yql_expr.h>
4+
5+
namespace NYql {
6+
7+
class IDqOptimization {
8+
public:
9+
virtual ~IDqOptimization() {}
10+
11+
/**
12+
Rewrite DqReadWrap's underlying provider specific read callable
13+
Args:
14+
* read - provider specific read callable
15+
* ctx - expr context
16+
Returns one of:
17+
* empty TPtr on error
18+
* original `read`, if no changes
19+
* new read, if any optimizations
20+
*/
21+
virtual TExprNode::TPtr RewriteRead(const TExprNode::TPtr& read, TExprContext& ctx) = 0;
22+
23+
/**
24+
Rewrite DqReadWrap's underlying provider specific read callable for lookup
25+
Args:
26+
* read - provider specific read callable
27+
* ctx - expr context
28+
Returns of of:
29+
* empty TPtr, if lookup read is not supported
30+
* DqLookupSourceWrap callable
31+
*/
32+
virtual TExprNode::TPtr RewriteLookupRead(const TExprNode::TPtr& read, TExprContext& ctx) = 0;
33+
34+
/**
35+
Apply new members subset for DqReadWrap's underlying provider specific read callable
36+
Args:
37+
* read - provider specific read callable
38+
* members - expr list of atoms with new members
39+
* ctx - expr context
40+
Returns one of:
41+
* empty TPtr on error
42+
* original `read`, if no changes
43+
* new read with applyed new members
44+
*/
45+
virtual TExprNode::TPtr ApplyExtractMembers(const TExprNode::TPtr& read, const TExprNode::TPtr& members, TExprContext& ctx) = 0;
46+
47+
/**
48+
Apply `take` or `skip` setting for DqReadWrap's underlying provider specific read callable
49+
Args:
50+
* read - provider specific read callable
51+
* countBase - `Take`, `Skip` or `Limit` callable
52+
* ctx - expr context
53+
Returns one of:
54+
* empty TPtr on error
55+
* original `read`, if no changes
56+
* new read with applyed setting
57+
*/
58+
virtual TExprNode::TPtr ApplyTakeOrSkip(const TExprNode::TPtr& read, const TExprNode::TPtr& countBase, TExprContext& ctx) = 0;
59+
60+
/**
61+
Apply `unordered` setting for DqReadWrap's underlying provider specific read callable
62+
Args:
63+
* read - provider specific read callable
64+
* ctx - expr context
65+
Returns one of:
66+
* empty TPtr on error
67+
* original `read`, if no changes
68+
* new read with applyed setting
69+
*/
70+
virtual TExprNode::TPtr ApplyUnordered(const TExprNode::TPtr& read, TExprContext& ctx) = 0;
71+
72+
/**
73+
Optimize list/stream extend for set of DqReadWrap's underlying provider specific read callable
74+
Args:
75+
* listOfRead - expr list of provider specific read callables
76+
* ordered - `true` for ordered extend (must keep original order of reads); `false`, if reads may be reordred due the optimization
77+
* ctx - expr context
78+
Returns one of:
79+
* empty list on error
80+
* original `listOfRead`, if no changes
81+
* new optimized list of reads. Returned list length must not be greater than original `listOfRead` length
82+
*/
83+
virtual TExprNode::TListType ApplyExtend(const TExprNode::TListType& listOfRead, bool ordered, TExprContext& ctx) = 0;
84+
};
85+
86+
} // namespace NYql

yql/essentials/core/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ END()
9898
RECURSE(
9999
cbo
100100
credentials
101+
dq_integration
101102
file_storage
102103
issue
103104
minsketch

yql/essentials/minikql/comp_nodes/packed_tuple/hashes_calc.h

Lines changed: 0 additions & 65 deletions
This file was deleted.

0 commit comments

Comments
 (0)