Skip to content

Commit 376f6f4

Browse files
committed
Moved yql/public/purecalc YQL-19206
init commit_hash:abf729827c312980464da21824f86ea1defe094c
1 parent d7c900f commit 376f6f4

File tree

107 files changed

+10946
-7
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

107 files changed

+10946
-7
lines changed
Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
#include "compile_mkql.h"
2+
3+
#include <yql/essentials/providers/common/mkql/yql_provider_mkql.h>
4+
#include <yql/essentials/providers/common/mkql/yql_type_mkql.h>
5+
#include <yql/essentials/core/yql_user_data_storage.h>
6+
#include <yql/essentials/public/purecalc/common/names.h>
7+
8+
#include <util/stream/file.h>
9+
10+
namespace NYql::NPureCalc {
11+
12+
namespace {
13+
14+
NCommon::IMkqlCallableCompiler::TCompiler MakeSelfCallableCompiler() {
15+
return [](const TExprNode& node, NCommon::TMkqlBuildContext& ctx) {
16+
MKQL_ENSURE(node.ChildrenSize() == 1, "Self takes exactly 1 argument");
17+
const auto* argument = node.Child(0);
18+
MKQL_ENSURE(argument->IsAtom(), "Self argument must be atom");
19+
ui32 inputIndex = 0;
20+
MKQL_ENSURE(TryFromString(argument->Content(), inputIndex), "Self argument must be UI32");
21+
auto type = NCommon::BuildType(node, *node.GetTypeAnn(), ctx.ProgramBuilder);
22+
NKikimr::NMiniKQL::TCallableBuilder call(ctx.ProgramBuilder.GetTypeEnvironment(), node.Content(), type);
23+
call.Add(ctx.ProgramBuilder.NewDataLiteral<ui32>(inputIndex));
24+
return NKikimr::NMiniKQL::TRuntimeNode(call.Build(), false);
25+
};
26+
}
27+
28+
NCommon::IMkqlCallableCompiler::TCompiler MakeFilePathCallableCompiler(const TUserDataTable& userData) {
29+
return [&](const TExprNode& node, NCommon::TMkqlBuildContext& ctx) {
30+
const TString name(node.Child(0)->Content());
31+
auto block = TUserDataStorage::FindUserDataBlock(userData, TUserDataKey::File(name));
32+
if (!block) {
33+
auto blockKey = TUserDataKey::File(GetDefaultFilePrefix() + name);
34+
block = TUserDataStorage::FindUserDataBlock(userData, blockKey);
35+
}
36+
MKQL_ENSURE(block, "file not found: " << name);
37+
MKQL_ENSURE(block->Type == EUserDataType::PATH,
38+
"FilePath not supported for non-filesystem user data, name: "
39+
<< name << ", block type: " << block->Type);
40+
return ctx.ProgramBuilder.NewDataLiteral<NKikimr::NUdf::EDataSlot::String>(block->Data);
41+
};
42+
}
43+
44+
NCommon::IMkqlCallableCompiler::TCompiler MakeFileContentCallableCompiler(const TUserDataTable& userData) {
45+
return [&](const TExprNode& node, NCommon::TMkqlBuildContext& ctx) {
46+
const TString name(node.Child(0)->Content());
47+
auto block = TUserDataStorage::FindUserDataBlock(userData, TUserDataKey::File(name));
48+
if (!block) {
49+
auto blockKey = TUserDataKey::File(GetDefaultFilePrefix() + name);
50+
block = TUserDataStorage::FindUserDataBlock(userData, blockKey);
51+
}
52+
MKQL_ENSURE(block, "file not found: " << name);
53+
if (block->Type == EUserDataType::PATH) {
54+
auto content = TFileInput(block->Data).ReadAll();
55+
return ctx.ProgramBuilder.NewDataLiteral<NKikimr::NUdf::EDataSlot::String>(content);
56+
} else if (block->Type == EUserDataType::RAW_INLINE_DATA) {
57+
return ctx.ProgramBuilder.NewDataLiteral<NKikimr::NUdf::EDataSlot::String>(block->Data);
58+
} else {
59+
// TODO support EUserDataType::URL
60+
MKQL_ENSURE(false, "user data blocks of type URL are not supported by FileContent: " << name);
61+
Y_UNREACHABLE();
62+
}
63+
};
64+
}
65+
66+
NCommon::IMkqlCallableCompiler::TCompiler MakeFolderPathCallableCompiler(const TUserDataTable& userData) {
67+
return [&](const TExprNode& node, NCommon::TMkqlBuildContext& ctx) {
68+
const TString name(node.Child(0)->Content());
69+
auto folderName = TUserDataStorage::MakeFolderName(name);
70+
TMaybe<TString> folderPath;
71+
for (const auto& x : userData) {
72+
if (!x.first.Alias().StartsWith(folderName)) {
73+
continue;
74+
}
75+
76+
MKQL_ENSURE(x.second.Type == EUserDataType::PATH,
77+
"FilePath not supported for non-file data block, name: "
78+
<< x.first.Alias() << ", block type: " << x.second.Type);
79+
80+
auto pathPrefixLength = x.second.Data.size() - (x.first.Alias().size() - folderName.size());
81+
auto newFolderPath = x.second.Data.substr(0, pathPrefixLength);
82+
if (!folderPath) {
83+
folderPath = newFolderPath;
84+
} else {
85+
MKQL_ENSURE(*folderPath == newFolderPath,
86+
"file " << x.second.Data << " is out of directory " << *folderPath);
87+
}
88+
}
89+
return ctx.ProgramBuilder.NewDataLiteral<NKikimr::NUdf::EDataSlot::String>(*folderPath);
90+
};
91+
}
92+
93+
}
94+
95+
NKikimr::NMiniKQL::TRuntimeNode CompileMkql(const TExprNode::TPtr& exprRoot, TExprContext& exprCtx,
96+
const NKikimr::NMiniKQL::IFunctionRegistry& funcRegistry, const NKikimr::NMiniKQL::TTypeEnvironment& env, const TUserDataTable& userData)
97+
{
98+
NCommon::TMkqlCommonCallableCompiler compiler;
99+
100+
compiler.AddCallable(PurecalcInputCallableName, MakeSelfCallableCompiler());
101+
compiler.AddCallable(PurecalcBlockInputCallableName, MakeSelfCallableCompiler());
102+
compiler.OverrideCallable("FileContent", MakeFileContentCallableCompiler(userData));
103+
compiler.OverrideCallable("FilePath", MakeFilePathCallableCompiler(userData));
104+
compiler.OverrideCallable("FolderPath", MakeFolderPathCallableCompiler(userData));
105+
106+
// Prepare build context
107+
108+
NKikimr::NMiniKQL::TProgramBuilder pgmBuilder(env, funcRegistry);
109+
NCommon::TMkqlBuildContext buildCtx(compiler, pgmBuilder, exprCtx);
110+
111+
// Build the root MKQL node
112+
113+
return NCommon::MkqlBuildExpr(*exprRoot, buildCtx);
114+
}
115+
116+
} // NYql::NPureCalc
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
#pragma once
2+
3+
#include <yql/essentials/public/purecalc/common/interface.h>
4+
#include <yql/essentials/minikql/mkql_node.h>
5+
#include <yql/essentials/ast/yql_expr.h>
6+
#include <yql/essentials/core/yql_user_data.h>
7+
8+
namespace NYql {
9+
namespace NPureCalc {
10+
/**
11+
* Compile expr to mkql byte-code
12+
*/
13+
14+
NKikimr::NMiniKQL::TRuntimeNode CompileMkql(const TExprNode::TPtr& exprRoot, TExprContext& exprCtx,
15+
const NKikimr::NMiniKQL::IFunctionRegistry& funcRegistry, const NKikimr::NMiniKQL::TTypeEnvironment& env, const TUserDataTable& userData);
16+
}
17+
}
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
#include "fwd.h"
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
#pragma once
2+
3+
#include <util/generic/fwd.h>
4+
#include <memory>
5+
6+
namespace NYql::NPureCalc {
7+
class TCompileError;
8+
9+
template <typename>
10+
class IConsumer;
11+
12+
template <typename>
13+
class IStream;
14+
15+
class IProgramFactory;
16+
17+
class IWorkerFactory;
18+
19+
class IPullStreamWorkerFactory;
20+
21+
class IPullListWorkerFactory;
22+
23+
class IPushStreamWorkerFactory;
24+
25+
class IWorker;
26+
27+
class IPullStreamWorker;
28+
29+
class IPullListWorker;
30+
31+
class IPushStreamWorker;
32+
33+
class TInputSpecBase;
34+
35+
class TOutputSpecBase;
36+
37+
class IProgram;
38+
39+
template <typename, typename, typename>
40+
class TProgramCommon;
41+
42+
template <typename, typename>
43+
class TPullStreamProgram;
44+
45+
template <typename, typename>
46+
class TPullListProgram;
47+
48+
template <typename, typename>
49+
class TPushStreamProgram;
50+
51+
using IProgramFactoryPtr = TIntrusivePtr<IProgramFactory>;
52+
using IWorkerFactoryPtr = std::shared_ptr<IWorkerFactory>;
53+
using IPullStreamWorkerFactoryPtr = std::shared_ptr<IPullStreamWorkerFactory>;
54+
using IPullListWorkerFactoryPtr = std::shared_ptr<IPullListWorkerFactory>;
55+
using IPushStreamWorkerFactoryPtr = std::shared_ptr<IPushStreamWorkerFactory>;
56+
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
#include "inspect_input.h"
2+
3+
#include <yql/essentials/core/yql_expr_type_annotation.h>
4+
5+
namespace NYql::NPureCalc {
6+
bool TryFetchInputIndexFromSelf(const TExprNode& node, TExprContext& ctx, ui32 inputsCount, ui32& result) {
7+
TIssueScopeGuard issueSope(ctx.IssueManager, [&]() {
8+
return MakeIntrusive<TIssue>(ctx.GetPosition(node.Pos()), TStringBuilder() << "At function: " << node.Content());
9+
});
10+
11+
if (!EnsureArgsCount(node, 1, ctx)) {
12+
return false;
13+
}
14+
15+
if (!EnsureAtom(*node.Child(0), ctx)) {
16+
return false;
17+
}
18+
19+
if (!TryFromString(node.Child(0)->Content(), result)) {
20+
auto message = TStringBuilder() << "Index " << TString{node.Child(0)->Content()}.Quote() << " isn't UI32";
21+
ctx.AddError(TIssue(ctx.GetPosition(node.Child(0)->Pos()), std::move(message)));
22+
return false;
23+
}
24+
25+
if (result >= inputsCount) {
26+
auto message = TStringBuilder() << "Invalid input index: " << result << " is out of range [0;" << inputsCount << ")";
27+
ctx.AddError(TIssue(ctx.GetPosition(node.Child(0)->Pos()), std::move(message)));
28+
return false;
29+
}
30+
31+
return true;
32+
}
33+
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
#pragma once
2+
3+
#include <yql/essentials/ast/yql_expr.h>
4+
5+
namespace NYql::NPureCalc {
6+
bool TryFetchInputIndexFromSelf(const TExprNode&, TExprContext&, ui32, ui32&);
7+
}
Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
#include "interface.h"
2+
3+
#include <yql/essentials/providers/common/codec/yql_codec_type_flags.h>
4+
#include <yql/essentials/public/purecalc/common/logger_init.h>
5+
#include <yql/essentials/public/purecalc/common/program_factory.h>
6+
7+
using namespace NYql;
8+
using namespace NYql::NPureCalc;
9+
10+
TLoggingOptions::TLoggingOptions()
11+
: LogLevel_(ELogPriority::TLOG_ERR)
12+
, LogDestination(&Clog)
13+
{
14+
}
15+
16+
TLoggingOptions& TLoggingOptions::SetLogLevel(ELogPriority logLevel) {
17+
LogLevel_ = logLevel;
18+
return *this;
19+
}
20+
21+
TLoggingOptions& TLoggingOptions::SetLogDestination(IOutputStream* logDestination) {
22+
LogDestination = logDestination;
23+
return *this;
24+
}
25+
26+
TProgramFactoryOptions::TProgramFactoryOptions()
27+
: UdfsDir_("")
28+
, UserData_()
29+
, LLVMSettings("OFF")
30+
, BlockEngineSettings("disable")
31+
, ExprOutputStream(nullptr)
32+
, CountersProvider(nullptr)
33+
, NativeYtTypeFlags(0)
34+
, UseSystemColumns(false)
35+
, UseWorkerPool(true)
36+
{
37+
}
38+
39+
TProgramFactoryOptions& TProgramFactoryOptions::SetUDFsDir(TStringBuf dir) {
40+
UdfsDir_ = dir;
41+
return *this;
42+
}
43+
44+
TProgramFactoryOptions& TProgramFactoryOptions::AddLibrary(NUserData::EDisposition disposition, TStringBuf name, TStringBuf content) {
45+
auto& ref = UserData_.emplace_back();
46+
47+
ref.Type_ = NUserData::EType::LIBRARY;
48+
ref.Disposition_ = disposition;
49+
ref.Name_ = name;
50+
ref.Content_ = content;
51+
52+
return *this;
53+
}
54+
55+
TProgramFactoryOptions& TProgramFactoryOptions::AddFile(NUserData::EDisposition disposition, TStringBuf name, TStringBuf content) {
56+
auto& ref = UserData_.emplace_back();
57+
58+
ref.Type_ = NUserData::EType::FILE;
59+
ref.Disposition_ = disposition;
60+
ref.Name_ = name;
61+
ref.Content_ = content;
62+
63+
return *this;
64+
}
65+
66+
TProgramFactoryOptions& TProgramFactoryOptions::AddUDF(NUserData::EDisposition disposition, TStringBuf name, TStringBuf content) {
67+
auto& ref = UserData_.emplace_back();
68+
69+
ref.Type_ = NUserData::EType::UDF;
70+
ref.Disposition_ = disposition;
71+
ref.Name_ = name;
72+
ref.Content_ = content;
73+
74+
return *this;
75+
}
76+
77+
TProgramFactoryOptions& TProgramFactoryOptions::SetLLVMSettings(TStringBuf llvm_settings) {
78+
LLVMSettings = llvm_settings;
79+
return *this;
80+
}
81+
82+
TProgramFactoryOptions& TProgramFactoryOptions::SetBlockEngineSettings(TStringBuf blockEngineSettings) {
83+
BlockEngineSettings = blockEngineSettings;
84+
return *this;
85+
}
86+
87+
TProgramFactoryOptions& TProgramFactoryOptions::SetExprOutputStream(IOutputStream* exprOutputStream) {
88+
ExprOutputStream = exprOutputStream;
89+
return *this;
90+
}
91+
92+
TProgramFactoryOptions& TProgramFactoryOptions::SetCountersProvider(NKikimr::NUdf::ICountersProvider* countersProvider) {
93+
CountersProvider = countersProvider;
94+
return *this;
95+
}
96+
97+
TProgramFactoryOptions& TProgramFactoryOptions::SetUseNativeYtTypes(bool useNativeTypes) {
98+
NativeYtTypeFlags = useNativeTypes ? NTCF_PRODUCTION : NTCF_NONE;
99+
return *this;
100+
}
101+
102+
TProgramFactoryOptions& TProgramFactoryOptions::SetNativeYtTypeFlags(ui64 nativeTypeFlags) {
103+
NativeYtTypeFlags = nativeTypeFlags;
104+
return *this;
105+
}
106+
107+
TProgramFactoryOptions& TProgramFactoryOptions::SetDeterministicTimeProviderSeed(TMaybe<ui64> seed) {
108+
DeterministicTimeProviderSeed = seed;
109+
return *this;
110+
}
111+
112+
TProgramFactoryOptions& TProgramFactoryOptions::SetUseSystemColumns(bool useSystemColumns) {
113+
UseSystemColumns = useSystemColumns;
114+
return *this;
115+
}
116+
117+
TProgramFactoryOptions& TProgramFactoryOptions::SetUseWorkerPool(bool useWorkerPool) {
118+
UseWorkerPool = useWorkerPool;
119+
return *this;
120+
}
121+
122+
void NYql::NPureCalc::ConfigureLogging(const TLoggingOptions& options) {
123+
InitLogging(options);
124+
}
125+
126+
IProgramFactoryPtr NYql::NPureCalc::MakeProgramFactory(const TProgramFactoryOptions& options) {
127+
return new TProgramFactory(options);
128+
}

0 commit comments

Comments
 (0)