Skip to content

Commit 8a696cb

Browse files
committed
Passed unboxed values from filter to read actor
1 parent 423f428 commit 8a696cb

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

53 files changed

+3894
-2057
lines changed

ydb/core/fq/libs/row_dispatcher/actors_factory.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
#pragma once
22

3-
#include "common.h"
3+
#include "format_handler/common/common.h"
44

55
#include <ydb/core/fq/libs/config/protos/row_dispatcher.pb.h>
66
#include <util/generic/ptr.h>

ydb/core/fq/libs/row_dispatcher/common.cpp

Lines changed: 1 addition & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -1,49 +1,9 @@
11
#include "common.h"
22

3-
#include <util/system/mutex.h>
4-
5-
#include <yql/essentials/public/purecalc/common/interface.h>
3+
#include <util/generic/string.h>
64

75
namespace NFq {
86

9-
namespace {
10-
11-
class TPureCalcProgramFactory : public IPureCalcProgramFactory {
12-
public:
13-
TPureCalcProgramFactory() {
14-
CreateFactory({.EnabledLLVM = false});
15-
CreateFactory({.EnabledLLVM = true});
16-
}
17-
18-
NYql::NPureCalc::IProgramFactoryPtr GetFactory(const TSettings& settings) const override {
19-
const auto it = ProgramFactories.find(settings);
20-
Y_ENSURE(it != ProgramFactories.end());
21-
return it->second;
22-
}
23-
24-
TGuard<TMutex> LockFactory() const override {
25-
return Guard(FactoryMutex);
26-
}
27-
28-
private:
29-
void CreateFactory(const TSettings& settings) {
30-
ProgramFactories.insert({settings, NYql::NPureCalc::MakeProgramFactory(
31-
NYql::NPureCalc::TProgramFactoryOptions()
32-
.SetLLVMSettings(settings.EnabledLLVM ? "ON" : "OFF")
33-
)});
34-
}
35-
36-
private:
37-
std::map<TSettings, NYql::NPureCalc::IProgramFactoryPtr> ProgramFactories;
38-
TMutex FactoryMutex;
39-
};
40-
41-
} // anonymous namespace
42-
43-
IPureCalcProgramFactory::TPtr CreatePureCalcProgramFactory() {
44-
return MakeIntrusive<TPureCalcProgramFactory>();
45-
}
46-
477
TString CleanupCounterValueString(const TString& value) {
488
TString clean;
499
constexpr auto valueLenghtLimit = 200;
Lines changed: 1 addition & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,31 +1,9 @@
11
#pragma once
22

3-
#include <util/generic/ptr.h>
4-
#include <util/system/mutex.h>
5-
6-
#include <yql/essentials/public/purecalc/common/fwd.h>
3+
#include <util/generic/fwd.h>
74

85
namespace NFq {
96

10-
class IPureCalcProgramFactory : public TThrRefBase {
11-
public:
12-
using TPtr = TIntrusivePtr<IPureCalcProgramFactory>;
13-
14-
struct TSettings {
15-
bool EnabledLLVM = false;
16-
17-
std::strong_ordering operator<=>(const TSettings& other) const = default;
18-
};
19-
20-
public:
21-
virtual NYql::NPureCalc::IProgramFactoryPtr GetFactory(const TSettings& settings) const = 0;
22-
23-
// Before creating purecalc program factory should be locked
24-
virtual TGuard<TMutex> LockFactory() const = 0;
25-
};
26-
27-
IPureCalcProgramFactory::TPtr CreatePureCalcProgramFactory();
28-
297
TString CleanupCounterValueString(const TString& value);
308

319
} // namespace NFq
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
#include "common.h"
2+
3+
#include <util/string/builder.h>
4+
5+
#include <ydb/library/yql/dq/actors/dq.h>
6+
7+
#include <yql/essentials/public/purecalc/common/interface.h>
8+
9+
namespace NFq::NRowDispatcher {
10+
11+
namespace {
12+
13+
class TPureCalcProgramFactory : public IPureCalcProgramFactory {
14+
public:
15+
TPureCalcProgramFactory() {
16+
CreateFactory({.EnabledLLVM = false});
17+
CreateFactory({.EnabledLLVM = true});
18+
}
19+
20+
NYql::NPureCalc::IProgramFactoryPtr GetFactory(const TSettings& settings) const override {
21+
const auto it = ProgramFactories.find(settings);
22+
Y_ENSURE(it != ProgramFactories.end());
23+
return it->second;
24+
}
25+
26+
TGuard<TMutex> LockFactory() const override {
27+
return Guard(FactoryMutex);
28+
}
29+
30+
private:
31+
void CreateFactory(const TSettings& settings) {
32+
ProgramFactories.insert({settings, NYql::NPureCalc::MakeProgramFactory(
33+
NYql::NPureCalc::TProgramFactoryOptions()
34+
.SetLLVMSettings(settings.EnabledLLVM ? "ON" : "OFF")
35+
)});
36+
}
37+
38+
private:
39+
std::map<TSettings, NYql::NPureCalc::IProgramFactoryPtr> ProgramFactories;
40+
TMutex FactoryMutex;
41+
};
42+
43+
} // anonymous namespace
44+
45+
//// TStatus
46+
47+
TStatus::TStatus()
48+
: Status(EId::SUCCESS)
49+
{}
50+
51+
TStatus::TStatus(TStatucCode status, NYql::TIssues issues)
52+
: Status(status)
53+
, Issues(std::move(issues))
54+
{}
55+
56+
TStatus::TStatus(TStatucCode status, TString message)
57+
: Status(status)
58+
, Issues({NYql::TIssue(std::move(message))})
59+
{}
60+
61+
bool TStatus::IsSuccess() const {
62+
return Status == EId::SUCCESS;
63+
}
64+
65+
TStatus::TStatucCode TStatus::GetStatus() const {
66+
return Status;
67+
}
68+
69+
const NYql::TIssues& TStatus::GetIssues() const {
70+
return Issues;
71+
}
72+
73+
TString TStatus::ToString() const {
74+
return TStringBuilder() << "Status: " << NYql::NDq::DqStatusToYdbStatus(Status) << ", Issues: " << Issues.ToOneLineString();
75+
}
76+
77+
TStatus& TStatus::AddParentIssue(NYql::TIssue issue) {
78+
for (const auto& childIssue : Issues) {
79+
issue.AddSubIssue(MakeIntrusive<NYql::TIssue>(childIssue));
80+
}
81+
Issues = {std::move(issue)};
82+
return *this;
83+
}
84+
85+
TStatus& TStatus::AddParentIssue(TString message) {
86+
return AddParentIssue(NYql::TIssue(std::move(message)));
87+
}
88+
89+
TStatus& TStatus::AddIssue(NYql::TIssue issue) {
90+
Issues.AddIssue(std::move(issue));
91+
return *this;
92+
}
93+
94+
TStatus& TStatus::AddIssue(TString message) {
95+
return AddIssue(NYql::TIssue{std::move(message)});
96+
}
97+
98+
//// IPureCalcProgramFactory
99+
100+
IPureCalcProgramFactory::TPtr CreatePureCalcProgramFactory() {
101+
return MakeIntrusive<TPureCalcProgramFactory>();
102+
}
103+
104+
} // namespace NFq::NRowDispatcher
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
#pragma once
2+
3+
#include <util/system/mutex.h>
4+
5+
#include <ydb/library/yql/dq/actors/protos/dq_status_codes.pb.h>
6+
7+
#include <yql/essentials/public/issue/yql_issue.h>
8+
#include <yql/essentials/public/purecalc/common/fwd.h>
9+
10+
namespace NFq::NRowDispatcher {
11+
12+
struct TFormatHandlerException : public yexception {
13+
using yexception::yexception;
14+
};
15+
16+
class TStatus {
17+
public:
18+
using EId = NYql::NDqProto::StatusIds;
19+
using TStatucCode = EId::StatusCode;
20+
21+
public:
22+
TStatus();
23+
explicit TStatus(TStatucCode status, NYql::TIssues issues = {});
24+
TStatus(TStatucCode status, TString message);
25+
26+
virtual bool IsSuccess() const;
27+
28+
TStatucCode GetStatus() const;
29+
const NYql::TIssues& GetIssues() const;
30+
31+
TString ToString() const;
32+
33+
TStatus& AddParentIssue(NYql::TIssue issue);
34+
TStatus& AddParentIssue(TString message);
35+
36+
TStatus& AddIssue(NYql::TIssue issue);
37+
TStatus& AddIssue(TString message);
38+
39+
protected:
40+
TStatucCode Status;
41+
NYql::TIssues Issues;
42+
};
43+
44+
template <typename TValue>
45+
class TValueStatus : public TStatus {
46+
using TBase = TStatus;
47+
48+
public:
49+
TValueStatus(TValue value)
50+
: TBase(EId::SUCCESS)
51+
, Value(std::move(value))
52+
{}
53+
54+
TValueStatus(TStatus status)
55+
: TBase(std::move(status))
56+
{}
57+
58+
bool IsSuccess() const override {
59+
return TBase::IsSuccess() && Value;
60+
}
61+
62+
TValue& GetValue() {
63+
if (Y_UNLIKELY(!Value)) {
64+
throw TFormatHandlerException() << "Internal error, failed to get value, " << ToString();
65+
}
66+
return *Value;
67+
}
68+
69+
private:
70+
std::optional<TValue> Value;
71+
};
72+
73+
struct TSchemaColumn {
74+
TString Name;
75+
TString TypeYson;
76+
77+
bool operator==(const TSchemaColumn& other) const = default;
78+
};
79+
80+
class IPureCalcProgramFactory : public TThrRefBase {
81+
public:
82+
using TPtr = TIntrusivePtr<IPureCalcProgramFactory>;
83+
84+
struct TSettings {
85+
bool EnabledLLVM = false;
86+
87+
std::strong_ordering operator<=>(const TSettings& other) const = default;
88+
};
89+
90+
public:
91+
virtual NYql::NPureCalc::IProgramFactoryPtr GetFactory(const TSettings& settings) const = 0;
92+
93+
// Before creating purecalc program factory should be locked
94+
virtual TGuard<TMutex> LockFactory() const = 0;
95+
};
96+
97+
IPureCalcProgramFactory::TPtr CreatePureCalcProgramFactory();
98+
99+
} // namespace NFq::NRowDispatcher
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
LIBRARY()
2+
3+
SRCS(
4+
common.cpp
5+
)
6+
7+
PEERDIR(
8+
ydb/core/fq/libs/row_dispatcher/purecalc_no_pg_wrapper
9+
10+
ydb/library/yql/dq/actors
11+
ydb/library/yql/dq/actors/protos
12+
13+
yql/essentials/public/issue
14+
)
15+
16+
YQL_LAST_ABI_VERSION()
17+
18+
END()

0 commit comments

Comments
 (0)