Skip to content

Commit 7ea8dd3

Browse files
committed
Added multi query plan / ast /statistics support
1 parent 400ce66 commit 7ea8dd3

File tree

6 files changed

+64
-49
lines changed

6 files changed

+64
-49
lines changed

ydb/tests/tools/kqprun/kqprun.cpp

Lines changed: 21 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -124,15 +124,15 @@ struct TExecutionOptions {
124124
ythrow yexception() << "Nothing to execute and is not running as daemon";
125125
}
126126

127-
ValidateOptionsSizes();
127+
ValidateOptionsSizes(runnerOptions);
128128
ValidateSchemeQueryOptions(runnerOptions);
129129
ValidateScriptExecutionOptions(runnerOptions);
130130
ValidateAsyncOptions(runnerOptions.YdbSettings.AsyncQueriesSettings);
131131
ValidateTraceOpt(runnerOptions);
132132
}
133133

134134
private:
135-
void ValidateOptionsSizes() const {
135+
void ValidateOptionsSizes(const TRunnerOptions& runnerOptions) const {
136136
const auto checker = [numberQueries = ScriptQueries.size()](size_t checkSize, const TString& optionName) {
137137
if (checkSize > numberQueries) {
138138
ythrow yexception() << "Too many " << optionName << ". Specified " << checkSize << ", when number of queries is " << numberQueries;
@@ -146,6 +146,10 @@ struct TExecutionOptions {
146146
checker(PoolIds.size(), "pool ids");
147147
checker(UserSIDs.size(), "user SIDs");
148148
checker(Timeouts.size(), "timeouts");
149+
checker(runnerOptions.ScriptQueryAstOutputs.size(), "ast output files");
150+
checker(runnerOptions.ScriptQueryPlanOutputs.size(), "plan output files");
151+
checker(runnerOptions.ScriptQueryTimelineFiles.size(), "timeline files");
152+
checker(runnerOptions.InProgressStatisticsOutputFiles.size(), "statistics files");
149153
}
150154

151155
void ValidateSchemeQueryOptions(const TRunnerOptions& runnerOptions) const {
@@ -180,18 +184,18 @@ struct TExecutionOptions {
180184
if (ResultsRowsLimit) {
181185
ythrow yexception() << "Result rows limit can not be used without script queries";
182186
}
183-
if (runnerOptions.InProgressStatisticsOutputFile) {
187+
if (!runnerOptions.InProgressStatisticsOutputFiles.empty()) {
184188
ythrow yexception() << "Script statistics can not be used without script queries";
185189
}
186190

187191
// Common specific
188192
if (HasExecutionCase(EExecutionCase::YqlScript)) {
189193
return;
190194
}
191-
if (runnerOptions.ScriptQueryAstOutput) {
195+
if (!runnerOptions.ScriptQueryAstOutputs.empty()) {
192196
ythrow yexception() << "Script query AST output can not be used without script/yql queries";
193197
}
194-
if (runnerOptions.ScriptQueryPlanOutput) {
198+
if (!runnerOptions.ScriptQueryPlanOutputs.empty()) {
195199
ythrow yexception() << "Script query plan output can not be used without script/yql queries";
196200
}
197201
if (runnerOptions.YdbSettings.SameSession) {
@@ -250,14 +254,6 @@ struct TExecutionOptions {
250254
}
251255

252256
private:
253-
template <typename TValue>
254-
static TValue GetValue(size_t index, const std::vector<TValue>& values, TValue defaultValue) {
255-
if (values.empty()) {
256-
return defaultValue;
257-
}
258-
return values[std::min(index, values.size() - 1)];
259-
}
260-
261257
static void ReplaceYqlTokenTemplate(TString& sql) {
262258
const TString variableName = TStringBuilder() << "${" << YQL_TOKEN_VARIABLE << "}";
263259
if (const TString& yqlToken = GetEnv(YQL_TOKEN_VARIABLE)) {
@@ -591,18 +587,23 @@ class TMain : public TMainClassArgs {
591587

592588
options.AddLongOption("script-ast-file", "File with script query ast (use '-' to write in stdout)")
593589
.RequiredArgument("file")
594-
.StoreMappedResultT<TString>(&RunnerOptions.ScriptQueryAstOutput, &GetDefaultOutput);
590+
.Handler1([this](const NLastGetopt::TOptsParser* option) {
591+
RunnerOptions.ScriptQueryAstOutputs.emplace_back(GetDefaultOutput(TString(option->CurValOrDef())));
592+
});
595593

596594
options.AddLongOption("script-plan-file", "File with script query plan (use '-' to write in stdout)")
597595
.RequiredArgument("file")
598-
.StoreMappedResultT<TString>(&RunnerOptions.ScriptQueryPlanOutput, &GetDefaultOutput);
596+
.Handler1([this](const NLastGetopt::TOptsParser* option) {
597+
RunnerOptions.ScriptQueryPlanOutputs.emplace_back(GetDefaultOutput(TString(option->CurValOrDef())));
598+
});
599599
options.AddLongOption("script-statistics", "File with script inprogress statistics")
600600
.RequiredArgument("file")
601-
.StoreMappedResultT<TString>(&RunnerOptions.InProgressStatisticsOutputFile, [](const TString& file) {
601+
.Handler1([this](const NLastGetopt::TOptsParser* option) {
602+
const TString file(option->CurValOrDef());
602603
if (file == "-") {
603604
ythrow yexception() << "Script in progress statistics cannot be printed to stdout, please specify file name";
604605
}
605-
return file;
606+
RunnerOptions.InProgressStatisticsOutputFiles.emplace_back(file);
606607
});
607608
TChoices<NYdb::NConsoleClient::EDataFormat> planFormat({
608609
{"pretty", NYdb::NConsoleClient::EDataFormat::Pretty},
@@ -617,11 +618,12 @@ class TMain : public TMainClassArgs {
617618

618619
options.AddLongOption("script-timeline-file", "File with script query timline in svg format")
619620
.RequiredArgument("file")
620-
.StoreMappedResultT<TString>(&RunnerOptions.ScriptQueryTimelineFile, [](const TString& file) {
621+
.Handler1([this](const NLastGetopt::TOptsParser* option) {
622+
const TString file(option->CurValOrDef());
621623
if (file == "-") {
622624
ythrow yexception() << "Script timline cannot be printed to stdout, please specify file name";
623625
}
624-
return file;
626+
RunnerOptions.ScriptQueryTimelineFiles.emplace_back(file);
625627
});
626628

627629
// Pipeline settings

ydb/tests/tools/kqprun/src/actors.cpp

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ class TRunScriptActorMock : public NActors::TActorBootstrapped<TRunScriptActorMo
1414
public:
1515
TRunScriptActorMock(TQueryRequest request, NThreading::TPromise<TQueryResponse> promise, TProgressCallback progressCallback)
1616
: TargetNode_(request.TargetNode)
17+
, QueryId_(request.QueryId)
1718
, Request_(std::move(request.Event))
1819
, Promise_(promise)
1920
, ResultRowsLimit_(std::numeric_limits<ui64>::max())
@@ -83,12 +84,14 @@ class TRunScriptActorMock : public NActors::TActorBootstrapped<TRunScriptActorMo
8384

8485
void Handle(NKikimr::NKqp::TEvKqpExecuter::TEvExecuterProgress::TPtr& ev) {
8586
if (ProgressCallback_) {
86-
ProgressCallback_(ev->Get()->Record);
87+
ProgressCallback_(QueryId_, ev->Get()->Record);
8788
}
8889
}
8990

9091
private:
91-
ui32 TargetNode_ = 0;
92+
const ui32 TargetNode_ = 0;
93+
const size_t QueryId_ = 0;
94+
9295
std::unique_ptr<NKikimr::NKqp::TEvKqp::TEvQueryRequest> Request_;
9396
NThreading::TPromise<TQueryResponse> Promise_;
9497
ui64 ResultRowsLimit_;

ydb/tests/tools/kqprun/src/actors.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ struct TQueryRequest {
1818
ui32 TargetNode;
1919
ui64 ResultRowsLimit;
2020
ui64 ResultSizeLimit;
21+
size_t QueryId;
2122
};
2223

2324
struct TCreateSessionRequest {
@@ -75,7 +76,7 @@ struct TEvPrivate {
7576
};
7677
};
7778

78-
using TProgressCallback = std::function<void(const NKikimrKqp::TEvExecuterProgress&)>;
79+
using TProgressCallback = std::function<void(ui64 queryId, const NKikimrKqp::TEvExecuterProgress& executerProgress)>;
7980

8081
NActors::IActor* CreateRunScriptActorMock(TQueryRequest request, NThreading::TPromise<TQueryResponse> promise, TProgressCallback progressCallback);
8182

ydb/tests/tools/kqprun/src/common.h

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -72,10 +72,10 @@ struct TRunnerOptions {
7272

7373
IOutputStream* ResultOutput = nullptr;
7474
IOutputStream* SchemeQueryAstOutput = nullptr;
75-
IOutputStream* ScriptQueryAstOutput = nullptr;
76-
IOutputStream* ScriptQueryPlanOutput = nullptr;
77-
TString ScriptQueryTimelineFile;
78-
TString InProgressStatisticsOutputFile;
75+
std::vector<IOutputStream*> ScriptQueryAstOutputs;
76+
std::vector<IOutputStream*> ScriptQueryPlanOutputs;
77+
std::vector<TString> ScriptQueryTimelineFiles;
78+
std::vector<TString> InProgressStatisticsOutputFiles;
7979

8080
EResultOutputFormat ResultOutputFormat = EResultOutputFormat::RowsJson;
8181
NYdb::NConsoleClient::EDataFormat PlanOutputFormat = NYdb::NConsoleClient::EDataFormat::Default;
@@ -99,4 +99,12 @@ struct TRequestOptions {
9999
size_t QueryId = 0;
100100
};
101101

102+
template <typename TValue>
103+
static TValue GetValue(size_t index, const std::vector<TValue>& values, TValue defaultValue) {
104+
if (values.empty()) {
105+
return defaultValue;
106+
}
107+
return values[std::min(index, values.size() - 1)];
108+
}
109+
102110
} // namespace NKqpRun

ydb/tests/tools/kqprun/src/kqp_runner.cpp

Lines changed: 22 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ class TKqpRunner::TImpl {
132132
ExecutionMeta_ = TExecutionMeta();
133133
ExecutionMeta_.Database = script.Database;
134134

135-
return WaitScriptExecutionOperation();
135+
return WaitScriptExecutionOperation(script.QueryId);
136136
}
137137

138138
bool ExecuteQuery(const TRequestOptions& query, EQueryType queryType) {
@@ -164,9 +164,9 @@ class TKqpRunner::TImpl {
164164
meta.Plan = ExecutionMeta_.Plan;
165165
}
166166

167-
PrintScriptAst(meta.Ast);
168-
PrintScriptProgress(meta.Plan);
169-
PrintScriptPlan(meta.Plan);
167+
PrintScriptAst(query.QueryId, meta.Ast);
168+
PrintScriptProgress(query.QueryId, meta.Plan);
169+
PrintScriptPlan(query.QueryId, meta.Plan);
170170
PrintScriptFinish(meta, queryTypeStr);
171171

172172
if (!status.IsSuccess()) {
@@ -233,7 +233,7 @@ class TKqpRunner::TImpl {
233233
}
234234

235235
private:
236-
bool WaitScriptExecutionOperation() {
236+
bool WaitScriptExecutionOperation(ui64 queryId) {
237237
StartTime_ = TInstant::Now();
238238

239239
TDuration getOperationPeriod = TDuration::Seconds(1);
@@ -244,7 +244,7 @@ class TKqpRunner::TImpl {
244244
TRequestResult status;
245245
while (true) {
246246
status = YdbSetup_.GetScriptExecutionOperationRequest(ExecutionMeta_.Database, ExecutionOperation_, ExecutionMeta_);
247-
PrintScriptProgress(ExecutionMeta_.Plan);
247+
PrintScriptProgress(queryId, ExecutionMeta_.Plan);
248248

249249
if (ExecutionMeta_.Ready) {
250250
break;
@@ -269,9 +269,9 @@ class TKqpRunner::TImpl {
269269

270270
TYdbSetup::StopTraceOpt();
271271

272-
PrintScriptAst(ExecutionMeta_.Ast);
273-
PrintScriptProgress(ExecutionMeta_.Plan);
274-
PrintScriptPlan(ExecutionMeta_.Plan);
272+
PrintScriptAst(queryId, ExecutionMeta_.Ast);
273+
PrintScriptProgress(queryId, ExecutionMeta_.Plan);
274+
PrintScriptPlan(queryId, ExecutionMeta_.Plan);
275275
PrintScriptFinish(ExecutionMeta_, "Script");
276276

277277
if (!status.IsSuccess() || ExecutionMeta_.ExecutionStatus != NYdb::NQuery::EExecStatus::Completed) {
@@ -311,10 +311,10 @@ class TKqpRunner::TImpl {
311311
}
312312
}
313313

314-
void PrintScriptAst(const TString& ast) const {
315-
if (Options_.ScriptQueryAstOutput) {
314+
void PrintScriptAst(size_t queryId, const TString& ast) const {
315+
if (const auto output = GetValue<IOutputStream*>(queryId, Options_.ScriptQueryAstOutputs, nullptr)) {
316316
Cout << CoutColors_.Cyan() << "Writing script query ast" << CoutColors_.Default() << Endl;
317-
Options_.ScriptQueryAstOutput->Write(ast);
317+
output->Write(ast);
318318
}
319319
}
320320

@@ -333,16 +333,16 @@ class TKqpRunner::TImpl {
333333
printer.Print(plan);
334334
}
335335

336-
void PrintScriptPlan(const TString& plan) const {
337-
if (Options_.ScriptQueryPlanOutput) {
336+
void PrintScriptPlan(size_t queryId, const TString& plan) const {
337+
if (const auto output = GetValue<IOutputStream*>(queryId, Options_.ScriptQueryPlanOutputs, nullptr)) {
338338
Cout << CoutColors_.Cyan() << "Writing script query plan" << CoutColors_.Default() << Endl;
339-
PrintPlan(plan, Options_.ScriptQueryPlanOutput);
339+
PrintPlan(plan, output);
340340
}
341341
}
342342

343-
void PrintScriptProgress(const TString& plan) const {
344-
if (Options_.InProgressStatisticsOutputFile) {
345-
TFileOutput outputStream(Options_.InProgressStatisticsOutputFile);
343+
void PrintScriptProgress(size_t queryId, const TString& plan) const {
344+
if (const auto& output = GetValue<TString>(queryId, Options_.InProgressStatisticsOutputFiles, {})) {
345+
TFileOutput outputStream(output);
346346
outputStream << TInstant::Now().ToIsoStringLocal() << " Script in progress statistics" << Endl;
347347

348348
auto convertedPlan = plan;
@@ -369,8 +369,8 @@ class TKqpRunner::TImpl {
369369

370370
outputStream.Finish();
371371
}
372-
if (Options_.ScriptQueryTimelineFile) {
373-
TFileOutput outputStream(Options_.ScriptQueryTimelineFile);
372+
if (const auto& output = GetValue<TString>(queryId, Options_.ScriptQueryTimelineFiles, {})) {
373+
TFileOutput outputStream(output);
374374

375375
TPlanVisualizer planVisualizer;
376376
planVisualizer.LoadPlans(plan);
@@ -381,10 +381,10 @@ class TKqpRunner::TImpl {
381381
}
382382

383383
TProgressCallback GetProgressCallback() {
384-
return [this](const NKikimrKqp::TEvExecuterProgress& executerProgress) mutable {
384+
return [this](ui64 queryId, const NKikimrKqp::TEvExecuterProgress& executerProgress) mutable {
385385
const TString& plan = executerProgress.GetQueryPlan();
386386
ExecutionMeta_.Plan = plan;
387-
PrintScriptProgress(plan);
387+
PrintScriptProgress(queryId, plan);
388388
};
389389
}
390390

ydb/tests/tools/kqprun/src/ydb_setup.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -535,7 +535,8 @@ class TYdbSetup::TImpl {
535535
.Event = std::move(event),
536536
.TargetNode = GetRuntime()->GetNodeId(targetNodeIndex),
537537
.ResultRowsLimit = Settings_.AppConfig.GetQueryServiceConfig().GetScriptResultRowsLimit(),
538-
.ResultSizeLimit = Settings_.AppConfig.GetQueryServiceConfig().GetScriptResultSizeLimit()
538+
.ResultSizeLimit = Settings_.AppConfig.GetQueryServiceConfig().GetScriptResultSizeLimit(),
539+
.QueryId = query.QueryId
539540
};
540541
}
541542

0 commit comments

Comments
 (0)