Skip to content

Commit 96cb4ca

Browse files
authored
YQ kqprun fixed AS pools and added validations (#9030)
1 parent 79360ac commit 96cb4ca

File tree

9 files changed

+301
-63
lines changed

9 files changed

+301
-63
lines changed

ydb/core/testlib/test_client.cpp

Lines changed: 39 additions & 34 deletions
Large diffs are not rendered by default.

ydb/core/testlib/test_client.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -295,8 +295,8 @@ namespace Tests {
295295
TServer& operator =(TServer&& server) = default;
296296
virtual ~TServer();
297297

298-
void EnableGRpc(const NYdbGrpc::TServerOptions& options);
299-
void EnableGRpc(ui16 port);
298+
void EnableGRpc(const NYdbGrpc::TServerOptions& options, ui32 grpcServiceNodeId = 0);
299+
void EnableGRpc(ui16 port, ui32 grpcServiceNodeId = 0);
300300
void SetupRootStoragePools(const TActorId sender) const;
301301

302302
void SetupDefaultProfiles();

ydb/tests/tools/kqprun/.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,3 +6,5 @@ udfs
66
*.sql
77
*.bin
88
*.txt
9+
*.svg
10+
*.old

ydb/tests/tools/kqprun/configuration/app_config.conf

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,49 @@
1+
ActorSystemConfig {
2+
Executor {
3+
Type: BASIC
4+
Threads: 1
5+
SpinThreshold: 10
6+
Name: "System"
7+
}
8+
Executor {
9+
Type: BASIC
10+
Threads: 6
11+
SpinThreshold: 1
12+
Name: "User"
13+
}
14+
Executor {
15+
Type: BASIC
16+
Threads: 1
17+
SpinThreshold: 1
18+
Name: "Batch"
19+
}
20+
Executor {
21+
Type: IO
22+
Threads: 1
23+
Name: "IO"
24+
}
25+
Executor {
26+
Type: BASIC
27+
Threads: 2
28+
SpinThreshold: 10
29+
Name: "IC"
30+
TimePerMailboxMicroSecs: 100
31+
}
32+
Scheduler {
33+
Resolution: 64
34+
SpinThreshold: 0
35+
ProgressThreshold: 10000
36+
}
37+
SysExecutor: 0
38+
UserExecutor: 1
39+
IoExecutor: 3
40+
BatchExecutor: 2
41+
ServiceExecutor {
42+
ServiceName: "Interconnect"
43+
ExecutorId: 4
44+
}
45+
}
46+
147
ColumnShardConfig {
248
DisabledOnSchemeShard: false
349
}

ydb/tests/tools/kqprun/flame_graph.sh

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
#!/usr/bin/env bash

# Records a 30 second CPU profile of the current user's running kqprun process(es)
# with `perf record` and dumps it as text into profdata.txt.
#
# For svg graph download https://github.com/brendangregg/FlameGraph
# and run `FlameGraph/stackcollapse-perf.pl profdata.txt | FlameGraph/flamegraph.pl > profdata.svg`

# `-d,` joins several matching pids with commas, which is the list format
# accepted by `perf record --pid`; the default newline separator would be
# word-split into stray arguments.
pid=$(pgrep -u "${USER:-$(id -un)}" -d, kqprun)

# Without a target there is nothing to profile; fail early with a clear message
# instead of letting perf misparse its arguments.
if [[ -z "${pid}" ]]; then
    echo "No running kqprun process found" >&2
    exit 1
fi

echo "Target process id: ${pid}"

sudo perf record -F 50 --call-graph dwarf -g --proc-map-timeout=10000 --pid "${pid}" -v -o profdata -- sleep 30
sudo perf script -i profdata > profdata.txt

ydb/tests/tools/kqprun/kqprun.cpp

Lines changed: 161 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -44,25 +44,29 @@ struct TExecutionOptions {
4444
std::vector<TString> TraceIds;
4545
std::vector<TString> PoolIds;
4646
std::vector<TString> UserSIDs;
47+
ui64 ResultsRowsLimit = 0;
4748

4849
const TString DefaultTraceId = "kqprun";
4950

5051
bool HasResults() const {
51-
if (ScriptQueries.empty()) {
52-
return false;
53-
}
54-
55-
for (size_t i = 0; i < ExecutionCases.size(); ++i) {
52+
for (size_t i = 0; i < ScriptQueries.size(); ++i) {
5653
if (GetScriptQueryAction(i) != NKikimrKqp::EQueryAction::QUERY_ACTION_EXECUTE) {
5754
continue;
5855
}
59-
if (ExecutionCases[i] != EExecutionCase::AsyncQuery) {
56+
if (GetExecutionCase(i) != EExecutionCase::AsyncQuery) {
6057
return true;
6158
}
6259
}
6360
return false;
6461
}
6562

63+
// Reports whether `executionCase` is among the explicitly requested execution cases.
// When no cases were requested at all, only GenericScript is reported as present.
bool HasExecutionCase(EExecutionCase executionCase) const {
    if (!ExecutionCases.empty()) {
        const auto casesEnd = ExecutionCases.end();
        return std::find(ExecutionCases.begin(), casesEnd, executionCase) != casesEnd;
    }
    return executionCase == EExecutionCase::GenericScript;
}
69+
6670
EExecutionCase GetExecutionCase(size_t index) const {
6771
return GetValue(index, ExecutionCases, EExecutionCase::GenericScript);
6872
}
@@ -106,6 +110,113 @@ struct TExecutionOptions {
106110
};
107111
}
108112

113+
// Validates these execution options against the runner options.
// Throws yexception when there is neither a scheme query nor script queries and
// neither monitoring nor gRPC is enabled, then runs every specialised validator
// (each of which may throw on its own inconsistency).
void Validate(const NKqpRun::TRunnerOptions& runnerOptions) const {
    const auto& ydbSettings = runnerOptions.YdbSettings;

    const bool nothingToExecute = !SchemeQuery && ScriptQueries.empty();
    const bool runsAsDaemon = ydbSettings.MonitoringEnabled || ydbSettings.GrpcEnabled;
    if (nothingToExecute && !runsAsDaemon) {
        ythrow yexception() << "Nothing to execute and is not running as daemon";
    }

    ValidateOptionsSizes();
    ValidateSchemeQueryOptions(runnerOptions);
    ValidateScriptExecutionOptions(runnerOptions);
    ValidateAsyncOptions(ydbSettings.AsyncQueriesSettings);
    ValidateTraceOpt(runnerOptions.TraceOptType);
}
124+
125+
private:
126+
void ValidateOptionsSizes() const {
127+
const auto checker = [numberQueries = ScriptQueries.size()](size_t checkSize, const TString& optionName) {
128+
if (checkSize > numberQueries) {
129+
ythrow yexception() << "Too many " << optionName << ". Specified " << checkSize << ", when number of queries is " << numberQueries;
130+
}
131+
};
132+
133+
checker(ExecutionCases.size(), "execution cases");
134+
checker(ScriptQueryActions.size(), "script query actions");
135+
checker(Databases.size(), "databases");
136+
checker(TraceIds.size(), "trace ids");
137+
checker(PoolIds.size(), "pool ids");
138+
checker(UserSIDs.size(), "user SIDs");
139+
}
140+
141+
// Rejects scheme-query-only options when no scheme query was supplied.
// Throws yexception if a scheme query AST output is configured without a scheme query.
void ValidateSchemeQueryOptions(const NKqpRun::TRunnerOptions& runnerOptions) const {
    if (!SchemeQuery && runnerOptions.SchemeQueryAstOutput) {
        ythrow yexception() << "Scheme query AST output can not be used without scheme query";
    }
}
149+
150+
// Rejects options that only make sense for execution cases that were not requested.
// The checks are layered: each inner layer is reached only when the execution case
// guarding the outer layer is absent, so options are validated from the most
// specific (generic script) to the most common (script/query/yql).
// Throws yexception on the first option that has no matching execution case.
void ValidateScriptExecutionOptions(const NKqpRun::TRunnerOptions& runnerOptions) const {
    // Script specific
    if (!HasExecutionCase(EExecutionCase::GenericScript)) {
        if (ForgetExecution) {
            ythrow yexception() << "Forget execution can not be used without generic script queries";
        }
        if (runnerOptions.ScriptCancelAfter) {
            ythrow yexception() << "Cancel after can not be used without generic script queries";
        }

        // Script/Query specific
        if (!HasExecutionCase(EExecutionCase::GenericQuery)) {
            if (ResultsRowsLimit) {
                ythrow yexception() << "Result rows limit can not be used without script queries";
            }
            if (runnerOptions.InProgressStatisticsOutputFile) {
                ythrow yexception() << "Script statistics can not be used without script queries";
            }

            // Common specific
            if (!HasExecutionCase(EExecutionCase::YqlScript)) {
                if (runnerOptions.ScriptQueryAstOutput) {
                    ythrow yexception() << "Script query AST output can not be used without script/yql queries";
                }
                if (runnerOptions.ScriptQueryPlanOutput) {
                    ythrow yexception() << "Script query plan output can not be used without script/yql queries";
                }
            }
        }
    }
}
184+
185+
// Validates the async queries settings.
// Throws yexception when an in-flight limit is set but no async query was requested.
// Prints a warning to Cout (without failing) when the limit exceeds the total number
// of queries that can be started, i.e. script queries count multiplied by LoopCount.
void ValidateAsyncOptions(const NKqpRun::TAsyncQueriesSettings& asyncQueriesSettings) const {
    const auto& inFlightLimit = asyncQueriesSettings.InFlightLimit;
    if (inFlightLimit && !HasExecutionCase(EExecutionCase::AsyncQuery)) {
        ythrow yexception() << "In flight limit can not be used without async queries";
    }

    NColorizer::TColors colors = NColorizer::AutoColors(Cout);
    if (!LoopCount || !inFlightLimit) {
        return;
    }
    if (inFlightLimit > ScriptQueries.size() * LoopCount) {
        Cout << colors.Red() << "Warning: inflight limit is " << inFlightLimit << ", that is larger than max possible number of queries " << ScriptQueries.size() * LoopCount << colors.Default() << Endl;
    }
}
195+
196+
// Checks that the requested trace opt type has a query to apply to.
// Throws yexception when:
//   - Scheme is requested without a scheme query;
//   - Script is requested without script queries;
//   - All is requested with neither a scheme query nor script queries.
// Disabled is always accepted.
void ValidateTraceOpt(NKqpRun::TRunnerOptions::ETraceOptType traceOptType) const {
    switch (traceOptType) {
        case NKqpRun::TRunnerOptions::ETraceOptType::Scheme: {
            if (!SchemeQuery) {
                ythrow yexception() << "Trace opt type scheme cannot be used without scheme query";
            }
            break;
        }
        case NKqpRun::TRunnerOptions::ETraceOptType::Script: {
            if (ScriptQueries.empty()) {
                ythrow yexception() << "Trace opt type script cannot be used without script queries";
            }
            // Explicit break: this case previously fell through into the next one,
            // which was harmless only by coincidence of the conditions.
            break;
        }
        case NKqpRun::TRunnerOptions::ETraceOptType::All: {
            if (!SchemeQuery && ScriptQueries.empty()) {
                ythrow yexception() << "Trace opt type all cannot be used without any queries";
            }
            break;
        }
        case NKqpRun::TRunnerOptions::ETraceOptType::Disabled: {
            break;
        }
    }
}
219+
109220
private:
110221
template <typename TValue>
111222
static TValue GetValue(size_t index, const std::vector<TValue>& values, TValue defaultValue) {
@@ -278,7 +389,6 @@ class TMain : public TMainClassArgs {
278389
TVector<TString> UdfsPaths;
279390
TString UdfsDirectory;
280391
bool ExcludeLinkedUdfs = false;
281-
ui64 ResultsRowsLimit = 1000;
282392
bool EmulateYt = false;
283393

284394
static TString LoadFile(const TString& file) {
@@ -415,8 +525,8 @@ class TMain : public TMainClassArgs {
415525
.StoreMappedResultT<TString>(&RunnerOptions.ResultOutput, &GetDefaultOutput);
416526
options.AddLongOption('L', "result-rows-limit", "Rows limit for script execution results")
417527
.RequiredArgument("uint")
418-
.DefaultValue(ResultsRowsLimit)
419-
.StoreResult(&ResultsRowsLimit);
528+
.DefaultValue(0)
529+
.StoreResult(&ExecutionOptions.ResultsRowsLimit);
420530
TChoices<NKqpRun::TRunnerOptions::EResultOutputFormat> resultFormat({
421531
{"rows", NKqpRun::TRunnerOptions::EResultOutputFormat::RowsJson},
422532
{"full-json", NKqpRun::TRunnerOptions::EResultOutputFormat::FullJson},
@@ -441,7 +551,12 @@ class TMain : public TMainClassArgs {
441551
.StoreMappedResultT<TString>(&RunnerOptions.ScriptQueryPlanOutput, &GetDefaultOutput);
442552
options.AddLongOption("script-statistics", "File with script inprogress statistics")
443553
.RequiredArgument("file")
444-
.StoreResult(&RunnerOptions.InProgressStatisticsOutputFile);
554+
.StoreMappedResultT<TString>(&RunnerOptions.InProgressStatisticsOutputFile, [](const TString& file) {
555+
if (file == "-") {
556+
ythrow yexception() << "Script in progress statistics cannot be printed to stdout, please specify file name";
557+
}
558+
return file;
559+
});
445560
TChoices<NYdb::NConsoleClient::EDataFormat> planFormat({
446561
{"pretty", NYdb::NConsoleClient::EDataFormat::Pretty},
447562
{"table", NYdb::NConsoleClient::EDataFormat::PrettyTable},
@@ -453,6 +568,15 @@ class TMain : public TMainClassArgs {
453568
.Choices(planFormat.GetChoices())
454569
.StoreMappedResultT<TString>(&RunnerOptions.PlanOutputFormat, planFormat);
455570

571+
// Output file for the script query timeline (svg); "-" (stdout) is rejected.
options.AddLongOption("script-timeline-file", "File with script query timeline in svg format")
    .RequiredArgument("file")
    .StoreMappedResultT<TString>(&RunnerOptions.ScriptQueryTimelineFile, [](const TString& file) {
        if (file == "-") {
            ythrow yexception() << "Script timeline cannot be printed to stdout, please specify file name";
        }
        return file;
    });
579+
456580
// Pipeline settings
457581

458582
TChoices<TExecutionOptions::EExecutionCase> executionCase({
@@ -463,7 +587,6 @@ class TMain : public TMainClassArgs {
463587
});
464588
options.AddLongOption('C', "execution-case", "Type of query for -p argument")
465589
.RequiredArgument("query-type")
466-
.DefaultValue("script")
467590
.Choices(executionCase.GetChoices())
468591
.Handler1([this, executionCase](const NLastGetopt::TOptsParser* option) {
469592
TString choice(option->CurValOrDef());
@@ -489,13 +612,16 @@ class TMain : public TMainClassArgs {
489612
});
490613
options.AddLongOption('A', "script-action", "Script query execute action")
491614
.RequiredArgument("script-action")
492-
.DefaultValue("execute")
493615
.Choices(scriptAction.GetChoices())
494616
.Handler1([this, scriptAction](const NLastGetopt::TOptsParser* option) {
495617
TString choice(option->CurValOrDef());
496618
ExecutionOptions.ScriptQueryActions.emplace_back(scriptAction(choice));
497619
});
498620

621+
// Requests timeout; the command line value is in milliseconds and is stored as TDuration.
options.AddLongOption("timeout", "Requests timeout in milliseconds")
    .RequiredArgument("uint")
    .StoreMappedResultT<ui64>(&RunnerOptions.YdbSettings.RequestsTimeout, &TDuration::MilliSeconds<ui64>);
624+
499625
options.AddLongOption("cancel-after", "Cancel script execution operation after specified delay in milliseconds")
500626
.RequiredArgument("uint")
501627
.StoreMappedResultT<ui64>(&RunnerOptions.ScriptCancelAfter, &TDuration::MilliSeconds<ui64>);
@@ -510,7 +636,7 @@ class TMain : public TMainClassArgs {
510636
.StoreResult(&ExecutionOptions.LoopCount);
511637
options.AddLongOption("loop-delay", "Delay in milliseconds between loop steps")
512638
.RequiredArgument("uint")
513-
.DefaultValue(1000)
639+
.DefaultValue(0)
514640
.StoreMappedResultT<ui64>(&ExecutionOptions.LoopDelay, &TDuration::MilliSeconds<ui64>);
515641

516642
options.AddLongOption('D', "database", "Database path for -p queries")
@@ -576,6 +702,21 @@ class TMain : public TMainClassArgs {
576702
.RequiredArgument("path")
577703
.InsertTo(&RunnerOptions.YdbSettings.ServerlessTenants);
578704

705+
options.AddLongOption("storage-size", "Domain storage size in gigabytes")
706+
.RequiredArgument("uint")
707+
.DefaultValue(32)
708+
.StoreMappedResultT<ui32>(&RunnerOptions.YdbSettings.DiskSize, [](ui32 diskSize) {
709+
return static_cast<ui64>(diskSize) << 30;
710+
});
711+
712+
options.AddLongOption("real-pdisks", "Use real PDisks instead of in memory PDisks (also disable disk mock)")
713+
.NoArgument()
714+
.SetFlag(&RunnerOptions.YdbSettings.UseRealPDisks);
715+
716+
options.AddLongOption("disable-disk-mock", "Disable disk mock on single node cluster")
717+
.NoArgument()
718+
.SetFlag(&RunnerOptions.YdbSettings.DisableDiskMock);
719+
579720
TChoices<std::function<void()>> backtrace({
580721
{"heavy", &NKikimr::EnableYDBBacktraceFormat},
581722
{"light", []() { SetFormatBackTraceFn(FormatBackTrace); }}
@@ -591,13 +732,17 @@ class TMain : public TMainClassArgs {
591732
}
592733

593734
int DoRun(NLastGetopt::TOptsParseResult&&) override {
594-
if (!ExecutionOptions.SchemeQuery && ExecutionOptions.ScriptQueries.empty() && !RunnerOptions.YdbSettings.MonitoringEnabled && !RunnerOptions.YdbSettings.GrpcEnabled) {
595-
ythrow yexception() << "Nothing to execute";
735+
ExecutionOptions.Validate(RunnerOptions);
736+
737+
if (RunnerOptions.YdbSettings.DisableDiskMock && RunnerOptions.YdbSettings.NodeCount + RunnerOptions.YdbSettings.SharedTenants.size() + RunnerOptions.YdbSettings.DedicatedTenants.size() > 1) {
738+
ythrow yexception() << "Disable disk mock cannot be used for multi node clusters";
596739
}
597740

598741
RunnerOptions.YdbSettings.YqlToken = YqlToken;
599742
RunnerOptions.YdbSettings.FunctionRegistry = CreateFunctionRegistry(UdfsDirectory, UdfsPaths, ExcludeLinkedUdfs).Get();
600-
RunnerOptions.YdbSettings.AppConfig.MutableQueryServiceConfig()->SetScriptResultRowsLimit(ResultsRowsLimit);
743+
if (ExecutionOptions.ResultsRowsLimit) {
744+
RunnerOptions.YdbSettings.AppConfig.MutableQueryServiceConfig()->SetScriptResultRowsLimit(ExecutionOptions.ResultsRowsLimit);
745+
}
601746

602747
if (EmulateYt) {
603748
const auto& fileStorageConfig = RunnerOptions.YdbSettings.AppConfig.GetQueryServiceConfig().GetFileStorage();

ydb/tests/tools/kqprun/src/common.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,11 @@ struct TYdbSetupSettings {
3232
std::unordered_set<TString> SharedTenants;
3333
std::unordered_set<TString> ServerlessTenants;
3434
TDuration InitializationTimeout = TDuration::Seconds(10);
35+
TDuration RequestsTimeout;
36+
37+
bool DisableDiskMock = false;
38+
bool UseRealPDisks = false;
39+
ui64 DiskSize = 32_GB;
3540

3641
bool MonitoringEnabled = false;
3742
ui16 MonitoringPortOffset = 0;
@@ -69,6 +74,7 @@ struct TRunnerOptions {
6974
IOutputStream* SchemeQueryAstOutput = nullptr;
7075
IOutputStream* ScriptQueryAstOutput = nullptr;
7176
IOutputStream* ScriptQueryPlanOutput = nullptr;
77+
TString ScriptQueryTimelineFile;
7278
TString InProgressStatisticsOutputFile;
7379

7480
EResultOutputFormat ResultOutputFormat = EResultOutputFormat::RowsJson;

0 commit comments

Comments
 (0)