Skip to content

Commit 2a27bbb

Browse files
committed
Passed unboxed values from filter to read actor
1 parent 5b45c74 commit 2a27bbb

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

53 files changed

+3890
-2044
lines changed

ydb/core/fq/libs/row_dispatcher/actors_factory.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
#pragma once
22

3-
#include "common.h"
3+
#include "format_handler/common/common.h"
44

55
#include <ydb/core/fq/libs/config/protos/row_dispatcher.pb.h>
66
#include <util/generic/ptr.h>

ydb/core/fq/libs/row_dispatcher/common.cpp

Lines changed: 0 additions & 42 deletions
This file was deleted.

ydb/core/fq/libs/row_dispatcher/common.h

Lines changed: 0 additions & 25 deletions
This file was deleted.
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
#include "common.h"
2+
3+
#include <util/string/builder.h>
4+
5+
#include <ydb/library/yql/dq/actors/dq.h>
6+
#include <ydb/library/yql/public/purecalc/common/interface.h>
7+
8+
namespace NFq::NRowDispatcher {
9+
10+
namespace {
11+
12+
class TPureCalcProgramFactory : public IPureCalcProgramFactory {
13+
public:
14+
TPureCalcProgramFactory() {
15+
CreateFactory({.EnabledLLVM = false});
16+
CreateFactory({.EnabledLLVM = true});
17+
}
18+
19+
NYql::NPureCalc::IProgramFactoryPtr GetFactory(const TSettings& settings) const override {
20+
const auto it = ProgramFactories.find(settings);
21+
Y_ENSURE(it != ProgramFactories.end());
22+
return it->second;
23+
}
24+
25+
TGuard<TMutex> LockFactory() const override {
26+
return Guard(FactoryMutex);
27+
}
28+
29+
private:
30+
void CreateFactory(const TSettings& settings) {
31+
ProgramFactories.insert({settings, NYql::NPureCalc::MakeProgramFactory(
32+
NYql::NPureCalc::TProgramFactoryOptions()
33+
.SetLLVMSettings(settings.EnabledLLVM ? "ON" : "OFF")
34+
)});
35+
}
36+
37+
private:
38+
std::map<TSettings, NYql::NPureCalc::IProgramFactoryPtr> ProgramFactories;
39+
TMutex FactoryMutex;
40+
};
41+
42+
} // anonymous namespace
43+
44+
//// TStatus
45+
46+
TStatus::TStatus()
47+
: Status(EId::SUCCESS)
48+
{}
49+
50+
TStatus::TStatus(TStatucCode status, NYql::TIssues issues)
51+
: Status(status)
52+
, Issues(std::move(issues))
53+
{}
54+
55+
TStatus::TStatus(TStatucCode status, TString message)
56+
: Status(status)
57+
, Issues({NYql::TIssue(std::move(message))})
58+
{}
59+
60+
bool TStatus::IsSuccess() const {
61+
return Status == EId::SUCCESS;
62+
}
63+
64+
TStatus::TStatucCode TStatus::GetStatus() const {
65+
return Status;
66+
}
67+
68+
const NYql::TIssues& TStatus::GetIssues() const {
69+
return Issues;
70+
}
71+
72+
TString TStatus::ToString() const {
73+
return TStringBuilder() << "Status: " << NYql::NDq::DqStatusToYdbStatus(Status) << ", Issues: " << Issues.ToOneLineString();
74+
}
75+
76+
TStatus& TStatus::AddParentIssue(NYql::TIssue issue) {
77+
for (const auto& childIssue : Issues) {
78+
issue.AddSubIssue(MakeIntrusive<NYql::TIssue>(childIssue));
79+
}
80+
Issues = {std::move(issue)};
81+
return *this;
82+
}
83+
84+
TStatus& TStatus::AddParentIssue(TString message) {
85+
return AddParentIssue(NYql::TIssue(std::move(message)));
86+
}
87+
88+
TStatus& TStatus::AddIssue(NYql::TIssue issue) {
89+
Issues.AddIssue(std::move(issue));
90+
return *this;
91+
}
92+
93+
TStatus& TStatus::AddIssue(TString message) {
94+
return AddIssue(NYql::TIssue{std::move(message)});
95+
}
96+
97+
//// IPureCalcProgramFactory
98+
99+
IPureCalcProgramFactory::TPtr CreatePureCalcProgramFactory() {
100+
return MakeIntrusive<TPureCalcProgramFactory>();
101+
}
102+
103+
} // namespace NFq::NRowDispatcher
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
#pragma once
2+
3+
#include <util/system/mutex.h>
4+
5+
#include <ydb/library/yql/dq/actors/protos/dq_status_codes.pb.h>
6+
#include <ydb/library/yql/public/purecalc/common/fwd.h>
7+
8+
#include <yql/essentials/public/issue/yql_issue.h>
9+
10+
namespace NFq::NRowDispatcher {
11+
12+
struct TFormatHandlerException : public yexception {
13+
using yexception::yexception;
14+
};
15+
16+
class TStatus {
17+
public:
18+
using EId = NYql::NDqProto::StatusIds;
19+
using TStatucCode = EId::StatusCode;
20+
21+
public:
22+
TStatus();
23+
explicit TStatus(TStatucCode status, NYql::TIssues issues = {});
24+
TStatus(TStatucCode status, TString message);
25+
26+
virtual bool IsSuccess() const;
27+
28+
TStatucCode GetStatus() const;
29+
const NYql::TIssues& GetIssues() const;
30+
31+
TString ToString() const;
32+
33+
TStatus& AddParentIssue(NYql::TIssue issue);
34+
TStatus& AddParentIssue(TString message);
35+
36+
TStatus& AddIssue(NYql::TIssue issue);
37+
TStatus& AddIssue(TString message);
38+
39+
protected:
40+
TStatucCode Status;
41+
NYql::TIssues Issues;
42+
};
43+
44+
template <typename TValue>
45+
class TValueStatus : public TStatus {
46+
using TBase = TStatus;
47+
48+
public:
49+
TValueStatus(TValue value)
50+
: TBase(EId::SUCCESS)
51+
, Value(std::move(value))
52+
{}
53+
54+
TValueStatus(TStatus status)
55+
: TBase(std::move(status))
56+
{}
57+
58+
bool IsSuccess() const override {
59+
return TBase::IsSuccess() && Value;
60+
}
61+
62+
TValue& GetValue() {
63+
if (Y_UNLIKELY(!Value)) {
64+
throw TFormatHandlerException() << "Internal error, failed to get value, " << ToString();
65+
}
66+
return *Value;
67+
}
68+
69+
private:
70+
std::optional<TValue> Value;
71+
};
72+
73+
struct TSchemaColumn {
74+
TString Name;
75+
TString TypeYson;
76+
77+
bool operator==(const TSchemaColumn& other) const = default;
78+
};
79+
80+
class IPureCalcProgramFactory : public TThrRefBase {
81+
public:
82+
using TPtr = TIntrusivePtr<IPureCalcProgramFactory>;
83+
84+
struct TSettings {
85+
bool EnabledLLVM = false;
86+
87+
std::strong_ordering operator<=>(const TSettings& other) const = default;
88+
};
89+
90+
public:
91+
virtual NYql::NPureCalc::IProgramFactoryPtr GetFactory(const TSettings& settings) const = 0;
92+
93+
// Before creating purecalc program factory should be locked
94+
virtual TGuard<TMutex> LockFactory() const = 0;
95+
};
96+
97+
IPureCalcProgramFactory::TPtr CreatePureCalcProgramFactory();
98+
99+
} // namespace NFq::NRowDispatcher
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
LIBRARY()
2+
3+
SRCS(
4+
common.cpp
5+
)
6+
7+
PEERDIR(
8+
ydb/core/fq/libs/row_dispatcher/purecalc_no_pg_wrapper
9+
10+
ydb/library/yql/dq/actors
11+
ydb/library/yql/dq/actors/protos
12+
13+
yql/essentials/public/issue
14+
)
15+
16+
YQL_LAST_ABI_VERSION()
17+
18+
END()

0 commit comments

Comments
 (0)