Skip to content

Commit 8846b9e

Browse files
Extract members and read actors count opts (#17663)
1 parent 7fb6d22 commit 8846b9e

File tree

23 files changed

+338
-34
lines changed

23 files changed

+338
-34
lines changed

ydb/library/yql/providers/solomon/expr_nodes/yql_solomon_expr_nodes.json

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,8 @@
6262
{"Index": 11, "Name": "DownsamplingAggregation", "Type": "TCoAtom"},
6363
{"Index": 12, "Name": "DownsamplingFill", "Type": "TCoAtom"},
6464
{"Index": 13, "Name": "DownsamplingGridSec", "Type": "TCoUint32"},
65-
{"Index": 14, "Name": "RequiredLabelNames", "Type": "TCoAtomList"}
65+
{"Index": 14, "Name": "RequiredLabelNames", "Type": "TCoAtomList"},
66+
{"Index": 15, "Name": "TotalMetricsCount", "Type": "TCoAtom"}
6667
]
6768
},
6869
{
@@ -85,8 +86,9 @@
8586
{"Index": 3, "Name": "SystemColumns", "Type": "TCoAtomList"},
8687
{"Index": 4, "Name": "LabelNames", "Type": "TCoAtomList"},
8788
{"Index": 5, "Name": "RequiredLabelNames", "Type": "TCoAtomList"},
88-
{"Index": 6, "Name": "RowType", "Type": "TExprBase"},
89-
{"Index": 7, "Name": "ColumnOrder", "Type": "TExprBase", "Optional": true}
89+
{"Index": 6, "Name": "TotalMetricsCount", "Type": "TCoAtom"},
90+
{"Index": 7, "Name": "RowType", "Type": "TExprBase"},
91+
{"Index": 8, "Name": "ColumnOrder", "Type": "TExprBase", "Optional": true}
9092
]
9193
},
9294
{

ydb/library/yql/providers/solomon/provider/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ SRCS(
1111
yql_solomon_dq_integration.cpp
1212
yql_solomon_io_discovery.cpp
1313
yql_solomon_load_meta.cpp
14+
yql_solomon_logical_optimize.cpp
1415
yql_solomon_mkql_compiler.cpp
1516
yql_solomon_physical_optimize.cpp
1617
yql_solomon_provider.cpp

ydb/library/yql/providers/solomon/provider/yql_solomon_datasink.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ class TSolomonDataSink : public TDataProviderBase {
2020
, TypeAnnotationTransformer_(CreateSolomonDataSinkTypeAnnotationTransformer(State_))
2121
, ExecutionTransformer_(CreateSolomonDataSinkExecTransformer(State_))
2222
, PhysicalOptProposalTransformer_(CreateSoPhysicalOptProposalTransformer(State_))
23+
, LogicalOptProposalTransformer_(CreateSolomonLogicalOptProposalTransformer(State_))
2324
{
2425
}
2526

@@ -56,6 +57,10 @@ class TSolomonDataSink : public TDataProviderBase {
5657
return *PhysicalOptProposalTransformer_;
5758
}
5859

60+
IGraphTransformer& GetLogicalOptProposalTransformer() override {
61+
return *LogicalOptProposalTransformer_;
62+
}
63+
5964
bool CanParse(const TExprNode& node) override {
6065
if (node.IsCallable(TCoWrite::CallableName())) {
6166
return TSoDataSink::Match(node.Child(1));
@@ -172,6 +177,7 @@ class TSolomonDataSink : public TDataProviderBase {
172177
THolder<TVisitorTransformerBase> TypeAnnotationTransformer_;
173178
THolder<TExecTransformerBase> ExecutionTransformer_;
174179
THolder<IGraphTransformer> PhysicalOptProposalTransformer_;
180+
THolder<IGraphTransformer> LogicalOptProposalTransformer_;
175181
};
176182

177183

ydb/library/yql/providers/solomon/provider/yql_solomon_datasource.cpp

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -71,13 +71,13 @@ class TSolomonDataSource : public TDataProviderBase {
7171
return *ConfigurationTransformer_;
7272
}
7373

74-
IGraphTransformer& GetIODiscoveryTransformer() override {
75-
return *IODiscoveryTransformer_;
76-
}
74+
IGraphTransformer& GetIODiscoveryTransformer() override {
75+
return *IODiscoveryTransformer_;
76+
}
7777

78-
IGraphTransformer& GetLoadTableMetadataTransformer() override {
79-
return *LoadMetaDataTransformer_;
80-
}
78+
IGraphTransformer& GetLoadTableMetadataTransformer() override {
79+
return *LoadMetaDataTransformer_;
80+
}
8181

8282
IGraphTransformer& GetTypeAnnotationTransformer(bool instantOnly) override {
8383
Y_UNUSED(instantOnly);

ydb/library/yql/providers/solomon/provider/yql_solomon_datasource_type_ann.cpp

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ class TSolomonDataSourceTypeAnnotationTransformer : public TVisitorTransformerBa
3737
}
3838

3939
TStatus HandleSoSourceSettings(const TExprNode::TPtr& input, TExprContext& ctx) {
40-
if (!EnsureArgsCount(*input, 15, ctx)) {
40+
if (!EnsureArgsCount(*input, 16, ctx)) {
4141
return TStatus::Error;
4242
}
4343

@@ -129,6 +129,11 @@ class TSolomonDataSourceTypeAnnotationTransformer : public TVisitorTransformerBa
129129
return TStatus::Error;
130130
}
131131

132+
auto& totalMetricsCount = *input->Child(TSoSourceSettings::idx_TotalMetricsCount);
133+
if (!EnsureAtom(totalMetricsCount, ctx)) {
134+
return TStatus::Error;
135+
}
136+
132137
const auto type = rowType.GetTypeAnn()->Cast<TTypeExprType>()->GetType();
133138
input->SetTypeAnn(ctx.MakeType<TStreamExprType>(type));
134139
return TStatus::Ok;
@@ -149,7 +154,7 @@ class TSolomonDataSourceTypeAnnotationTransformer : public TVisitorTransformerBa
149154
}
150155

151156
TStatus HandleRead(const TExprNode::TPtr& input, TExprContext& ctx) {
152-
if (!EnsureMinMaxArgsCount(*input, 6U, 7U, ctx)) {
157+
if (!EnsureMinMaxArgsCount(*input, 8U, 9U, ctx)) {
153158
return TStatus::Error;
154159
}
155160

@@ -171,6 +176,16 @@ class TSolomonDataSourceTypeAnnotationTransformer : public TVisitorTransformerBa
171176
return TStatus::Error;
172177
}
173178

179+
auto& requiredLabelNames = *input->Child(TSoReadObject::idx_RequiredLabelNames);
180+
if (!EnsureTupleOfAtoms(requiredLabelNames, ctx)) {
181+
return TStatus::Error;
182+
}
183+
184+
auto& totalMetricsCount = *input->Child(TSoReadObject::idx_TotalMetricsCount);
185+
if (!EnsureAtom(totalMetricsCount, ctx)) {
186+
return TStatus::Error;
187+
}
188+
174189
const auto& rowType = *input->Child(TSoReadObject::idx_RowType);
175190
if (!EnsureType(rowType, ctx)) {
176191
return TStatus::Error;

ydb/library/yql/providers/solomon/provider/yql_solomon_dq_integration.cpp

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -78,9 +78,13 @@ class TSolomonDqIntegration: public TDqIntegrationBase {
7878
if (const auto maybeSettings = dqSource.Settings().Maybe<TSoSourceSettings>()) {
7979
const auto soSourceSettings = maybeSettings.Cast();
8080
if (!soSourceSettings.Selectors().StringValue().empty()) {
81-
for (size_t i = 0; i < settings.MaxPartitions; ++i) {
81+
ui64 totalMetricsCount;
82+
YQL_ENSURE(TryFromString(soSourceSettings.TotalMetricsCount().StringValue(), totalMetricsCount));
83+
84+
for (size_t i = 0; i < std::min<ui64>(settings.MaxPartitions, totalMetricsCount); ++i) {
8285
partitions.push_back(TStringBuilder() << "partition" << i);
8386
}
87+
8488
return 0;
8589
}
8690
}
@@ -255,6 +259,7 @@ class TSolomonDqIntegration: public TDqIntegrationBase {
255259
.DownsamplingAggregation<TCoAtom>().Build(downsamplingAggregation ? *downsamplingAggregation : "")
256260
.DownsamplingFill<TCoAtom>().Build(downsamplingFill ? *downsamplingFill : "")
257261
.DownsamplingGridSec<TCoUint32>().Literal().Build(ToString(downsamplingGridSec ? *downsamplingGridSec : 0)).Build()
262+
.TotalMetricsCount(soReadObject.TotalMetricsCount())
258263
.Build()
259264
.DataSource(soReadObject.DataSource().Cast<TCoDataSource>())
260265
.RowType(soReadObject.RowType())
@@ -353,15 +358,18 @@ class TSolomonDqIntegration: public TDqIntegrationBase {
353358
auto computeActorBatchSize = solomonConfig->ComputeActorBatchSize.Get().OrElse(1000);
354359
sourceSettings.insert({"computeActorBatchSize", ToString(computeActorBatchSize)});
355360

356-
if (!source.HasProgram()) {
361+
if (!selectors.empty()) {
362+
ui64 totalMetricsCount;
363+
YQL_ENSURE(TryFromString(settings.TotalMetricsCount(), totalMetricsCount));
364+
357365
auto providerFactory = CreateCredentialsProviderFactoryForStructuredToken(State_->CredentialsFactory, State_->Configuration->Tokens.at(cluster));
358366
auto credentialsProvider = providerFactory->CreateProvider();
359367

360368
NDq::TDqSolomonReadParams readParams{ .Source = source };
361369

362370
auto metricsQueueActor = NActors::TActivationContext::ActorSystem()->Register(
363371
NDq::CreateSolomonMetricsQueueActor(
364-
maxTasksPerStage,
372+
std::min<ui64>(maxTasksPerStage, totalMetricsCount),
365373
readParams,
366374
credentialsProvider
367375
),

ydb/library/yql/providers/solomon/provider/yql_solomon_io_discovery.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ const TStructExprType* BuildScheme(TPositionHandle pos, const TVector<TCoAtom>&
5858
type = ctx.MakeType<TOptionalExprType>(ctx.MakeType<TDataExprType>(EDataSlot::Double));
5959
} else if (systemColumn == SOLOMON_SCHEME_LABELS) {
6060
type = ctx.MakeType<NYql::TDictExprType>(stringType, stringType);
61-
} else if (systemColumn = SOLOMON_SCHEME_TYPE) {
61+
} else if (systemColumn == SOLOMON_SCHEME_TYPE) {
6262
type = stringType;
6363
} else {
6464
ctx.AddError(TIssue(ctx.GetPosition(pos), TStringBuilder() << "Unknown system column " << systemColumn));
@@ -167,6 +167,7 @@ class TSolomonIODiscoveryTransformer : public TSyncTransformerBase {
167167
.SystemColumns(systemColumnsNode)
168168
.LabelNames(labelNamesNode)
169169
.RequiredLabelNames().Build()
170+
.TotalMetricsCount().Build()
170171
.RowType(rowTypeNode)
171172
.ColumnOrder(std::move(userSchema.back()))
172173
.Done().Ptr()
@@ -177,6 +178,7 @@ class TSolomonIODiscoveryTransformer : public TSyncTransformerBase {
177178
.SystemColumns(systemColumnsNode)
178179
.LabelNames(labelNamesNode)
179180
.RequiredLabelNames().Build()
181+
.TotalMetricsCount().Build()
180182
.RowType(rowTypeNode)
181183
.Done().Ptr();
182184
}

ydb/library/yql/providers/solomon/provider/yql_solomon_load_meta.cpp

Lines changed: 32 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,12 @@ using namespace NNodes;
1313

1414
namespace {
1515

16+
struct TLoadSolomonMetaRequest {
17+
NSo::ISolomonAccessorClient::TPtr SolomonClient;
18+
NThreading::TFuture<NSo::TGetLabelsResponse> LabelNamesRequest;
19+
NThreading::TFuture<NSo::TListMetricsResponse> ListMetricsRequest;
20+
};
21+
1622
TMaybe<TString> ExtractSetting(const TExprNode& settings, const TString& settingName) {
1723
for (size_t i = 0U; i < settings.ChildrenSize(); ++i) {
1824
if (settings.Child(i)->Head().IsAtom(settingName)) {
@@ -49,8 +55,8 @@ class TSolomonLoadTableMetadataTransformer : public TGraphTransformerBase {
4955
return false;
5056
});
5157

52-
std::vector<NThreading::TFuture<NSo::TGetLabelsResponse>> futures;
53-
futures.reserve(nodes.size());
58+
std::vector<NThreading::TFuture<void>> futures;
59+
futures.reserve(nodes.size() * 2);
5460
for (const auto& n : nodes) {
5561
TSoReadObject soReadObject(n);
5662

@@ -76,11 +82,18 @@ class TSolomonLoadTableMetadataTransformer : public TGraphTransformerBase {
7682
auto providerFactory = CreateCredentialsProviderFactoryForStructuredToken(State_->CredentialsFactory, State_->Configuration->Tokens.at(clusterName));
7783
auto credentialsProvider = providerFactory->CreateProvider();
7884

79-
SolomonClient_ = NSo::ISolomonAccessorClient::Make(std::move(source), credentialsProvider);
80-
auto future = SolomonClient_->GetLabelNames(selectors);
85+
auto solomonClient = NSo::ISolomonAccessorClient::Make(std::move(source), credentialsProvider);
86+
auto labelNamesFuture = solomonClient->GetLabelNames(selectors);
87+
auto listMetricsFuture = solomonClient->ListMetrics(selectors, 30, 0);
88+
89+
LabelNamesRequests_[soReadObject.Raw()] = {
90+
.SolomonClient = solomonClient,
91+
.LabelNamesRequest = labelNamesFuture,
92+
.ListMetricsRequest = listMetricsFuture
93+
};
8194

82-
LabelNamesRequests_[soReadObject.Raw()] = future;
83-
futures.push_back(future);
95+
futures.push_back(labelNamesFuture.IgnoreResult());
96+
futures.push_back(listMetricsFuture.IgnoreResult());
8497
}
8598
}
8699

@@ -99,22 +112,28 @@ class TSolomonLoadTableMetadataTransformer : public TGraphTransformerBase {
99112
TStatus DoApplyAsyncChanges(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final {
100113
AllFuture_.GetValue();
101114

102-
TNodeMap<NThreading::TFuture<NSo::TGetLabelsResponse>> labelNamesRequests;
115+
TNodeMap<TLoadSolomonMetaRequest> labelNamesRequests;
103116
labelNamesRequests.swap(LabelNamesRequests_);
104117

105118
TNodeOnNodeOwnedMap replaces;
106119
for (auto& [node, request] : labelNamesRequests) {
107-
auto value = request.GetValue();
120+
auto labelNamesValue = request.LabelNamesRequest.GetValue();
121+
if (labelNamesValue.Status != NSo::EStatus::STATUS_OK) {
122+
ctx.AddError(TIssue(ctx.GetPosition(node->Pos()),
123+
TStringBuilder() << "Failed to get label names, details: " << labelNamesValue.Error));
124+
return TStatus::Error;
125+
}
108126

109-
if (value.Status != NSo::EStatus::STATUS_OK) {
127+
auto listMetricsValue = request.ListMetricsRequest.GetValue();
128+
if (listMetricsValue.Status != NSo::EStatus::STATUS_OK) {
110129
ctx.AddError(TIssue(ctx.GetPosition(node->Pos()),
111-
TStringBuilder() << "Failed to get label names, details: " << value.Error));
130+
TStringBuilder() << "Failed to get total metrics count, details: " << listMetricsValue.Error));
112131
return TStatus::Error;
113132
}
114133

115134
TSoReadObject read(node);
116135
TVector<TCoAtom> labelNames;
117-
for (const auto& label : value.Result.Labels) {
136+
for (const auto& label : labelNamesValue.Result.Labels) {
118137
labelNames.push_back(Build<TCoAtom>(ctx, read.Pos()).Value(label).Done());
119138
}
120139

@@ -124,6 +143,7 @@ class TSolomonLoadTableMetadataTransformer : public TGraphTransformerBase {
124143
.RequiredLabelNames()
125144
.Add(labelNames)
126145
.Build()
146+
.TotalMetricsCount<TCoAtom>().Build(ToString(listMetricsValue.Result.TotalCount))
127147
.Done().Ptr());
128148
}
129149

@@ -137,8 +157,7 @@ class TSolomonLoadTableMetadataTransformer : public TGraphTransformerBase {
137157
TSolomonState::TPtr State_;
138158
NThreading::TFuture<void> AllFuture_;
139159

140-
NSo::ISolomonAccessorClient::TPtr SolomonClient_;
141-
TNodeMap<NThreading::TFuture<NSo::TGetLabelsResponse>> LabelNamesRequests_;
160+
TNodeMap<TLoadSolomonMetaRequest> LabelNamesRequests_;
142161
};
143162

144163
THolder<IGraphTransformer> CreateSolomonLoadTableMetadataTransformer(TSolomonState::TPtr state) {
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
#include <ydb/library/yql/providers/dq/expr_nodes/dqs_expr_nodes.h>
2+
#include <ydb/library/yql/providers/solomon/expr_nodes/yql_solomon_expr_nodes.h>
3+
#include <ydb/library/yql/providers/solomon/provider/yql_solomon_provider.h>
4+
#include <ydb/library/yql/providers/solomon/scheme/yql_solomon_scheme.h>
5+
#include <yql/essentials/providers/common/provider/yql_provider_names.h>
6+
#include <yql/essentials/providers/common/transform/yql_optimize.h>
7+
8+
namespace NYql {
9+
10+
using namespace NNodes;
11+
12+
namespace {
13+
14+
class TSolomonLogicalOptProposalTransformer: public TOptimizeTransformerBase {
15+
public:
16+
explicit TSolomonLogicalOptProposalTransformer(TSolomonState::TPtr state)
17+
: TOptimizeTransformerBase(state->Types, NLog::EComponent::ProviderGeneric, {})
18+
, State_(state)
19+
{
20+
#define HNDL(name) "LogicalOptimizer-" #name, Hndl(&TSolomonLogicalOptProposalTransformer::name)
21+
AddHandler(0, &TCoExtractMembers::Match, HNDL(ExtractMembersOverDqSource));
22+
#undef HNDL
23+
}
24+
25+
TMaybeNode<TExprBase> ExtractMembersOverDqSource(TExprBase node, TExprContext& ctx) const {
26+
const auto& extract = node.Cast<TCoExtractMembers>();
27+
const auto& maybeDqSource = extract.Input().Maybe<TDqSourceWrap>();
28+
if (!maybeDqSource) {
29+
return node;
30+
}
31+
32+
const auto& dqSource = maybeDqSource.Cast();
33+
if (dqSource.DataSource().Category() != SolomonProviderName) {
34+
return node;
35+
}
36+
37+
const auto& maybeSoSourceSettings = dqSource.Input().Maybe<TSoSourceSettings>();
38+
if (!maybeSoSourceSettings) {
39+
return node;
40+
}
41+
42+
TSet<TStringBuf> extractMembers;
43+
for (auto member : extract.Members()) {
44+
extractMembers.insert(member.Value());
45+
}
46+
47+
const auto& systemColumns = maybeSoSourceSettings.SystemColumns().Cast();
48+
const auto& labelNames = maybeSoSourceSettings.LabelNames().Cast();
49+
50+
TVector<TCoAtom> newSystemColumns;
51+
TVector<TCoAtom> newLabelNames;
52+
53+
newSystemColumns.reserve(extractMembers.size());
54+
newLabelNames.reserve(extractMembers.size());
55+
56+
for (const auto& atom : systemColumns) {
57+
if (TString column = atom.StringValue(); extractMembers.contains(column)) {
58+
newSystemColumns.push_back(Build<TCoAtom>(ctx, node.Pos()).Value(column).Done());
59+
}
60+
}
61+
62+
for (const auto& atom : labelNames) {
63+
if (TString column = atom.StringValue(); extractMembers.contains(column)) {
64+
newLabelNames.push_back(Build<TCoAtom>(ctx, node.Pos()).Value(column).Done());
65+
}
66+
}
67+
68+
const auto rowType = node.Ref().GetTypeAnn()->Cast<TListExprType>()->GetItemType()->Cast<TStructExprType>();
69+
70+
TVector<const TItemExprType*> newColumnTypes;
71+
newColumnTypes.reserve(extractMembers.size());
72+
for (const auto& item : rowType->GetItems()) {
73+
if (extractMembers.contains(item->GetName())) {
74+
newColumnTypes.push_back(ctx.MakeType<TItemExprType>(item->GetName(), item->GetItemType()));
75+
}
76+
}
77+
78+
auto newRowTypeNode = ExpandType(node.Pos(), *ctx.MakeType<TStructExprType>(newColumnTypes), ctx);
79+
80+
return Build<TDqSourceWrap>(ctx, dqSource.Pos())
81+
.InitFrom(dqSource)
82+
.Input<TSoSourceSettings>()
83+
.InitFrom(maybeSoSourceSettings.Cast())
84+
.RowType(newRowTypeNode)
85+
.SystemColumns<TCoAtomList>()
86+
.Add(std::move(newSystemColumns))
87+
.Build()
88+
.LabelNames<TCoAtomList>()
89+
.Add(std::move(newLabelNames))
90+
.Build()
91+
.Build()
92+
.RowType(newRowTypeNode)
93+
.Done();
94+
}
95+
96+
private:
97+
const TSolomonState::TPtr State_;
98+
};
99+
100+
} // namespace
101+
102+
THolder<IGraphTransformer> CreateSolomonLogicalOptProposalTransformer(TSolomonState::TPtr state) {
103+
return MakeHolder<TSolomonLogicalOptProposalTransformer>(state);
104+
}
105+
106+
} // namespace NYql

0 commit comments

Comments
 (0)