Skip to content

Commit a66bbe9

Browse files
committed
TMP
1 parent c68fa4c commit a66bbe9

File tree

17 files changed

+257
-128
lines changed

17 files changed

+257
-128
lines changed

ydb/library/yql/providers/pq/gateway/dummy/yql_pq_file_topic_client.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ constexpr static auto FILE_POLL_PERIOD = TDuration::MilliSeconds(5);
122122
size_t size = 0;
123123
ui64 maxBatchRowSize = 100;
124124

125-
while (size_t read = fi.ReadLine(rawMsg)) {
125+
while (fi.ReadLine(rawMsg)) {
126126
msgs.emplace_back(MakeNextMessage(rawMsg));
127127
MsgOffset_++;
128128
if (!maxBatchRowSize--) {
@@ -132,6 +132,8 @@ constexpr static auto FILE_POLL_PERIOD = TDuration::MilliSeconds(5);
132132
}
133133
if (!msgs.empty()) {
134134
EventsQ_.Push(NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent(msgs, {}, Session_), size);
135+
} else {
136+
EventsQ_.Push(NYdb::NTopic::TSessionClosedEvent(NYdb::EStatus::CANCELLED, {NYdb::NIssue::TIssue("PQ file topic was finished")}), size);
135137
}
136138

137139
Sleep(FILE_POLL_PERIOD);

ydb/tests/tools/fqrun/.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,3 +5,5 @@ sync_dir
55
*.conf
66
*.parquet
77
*.json
8+
*.svg
9+
*.txt

ydb/tests/tools/fqrun/README.md

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,15 @@
22

33
Tool can be used to execute streaming queries by using FQ proxy infrastructure.
44

5+
For profiling memory allocations build fqrun with ya make flag `-D PROFILE_MEMORY_ALLOCATIONS`.
6+
7+
## Scripts
8+
9+
* `flame_graph.sh` - script for collecting flame graphs in svg format, usage:
10+
```(bash)
11+
./flame_graph.sh [graph collection time in seconds]
12+
```
13+
514
## Examples
615
716
### Queries
@@ -25,7 +34,7 @@ Tool can be used to execute streaming queries by using FQ proxy infrastructure.
2534
./fqrun -M 32000
2635
```
2736
28-
Monitoring endpoint: https://localhost:32000
37+
Monitoring endpoint: http://localhost:32000
2938
3039
* gRPC endpoint:
3140
```(bash)
Lines changed: 45 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -1,45 +1,46 @@
1-
ActorSystemConfig {
2-
Executor {
3-
Type: BASIC
4-
Threads: 1
5-
SpinThreshold: 10
6-
Name: "System"
7-
}
8-
Executor {
9-
Type: BASIC
10-
Threads: 6
11-
SpinThreshold: 1
12-
Name: "User"
13-
}
14-
Executor {
15-
Type: BASIC
16-
Threads: 1
17-
SpinThreshold: 1
18-
Name: "Batch"
19-
}
20-
Executor {
21-
Type: IO
22-
Threads: 1
23-
Name: "IO"
24-
}
25-
Executor {
26-
Type: BASIC
27-
Threads: 2
28-
SpinThreshold: 10
29-
Name: "IC"
30-
TimePerMailboxMicroSecs: 100
31-
}
32-
Scheduler {
33-
Resolution: 64
34-
SpinThreshold: 0
35-
ProgressThreshold: 10000
36-
}
37-
SysExecutor: 0
38-
UserExecutor: 1
39-
IoExecutor: 3
40-
BatchExecutor: 2
41-
ServiceExecutor {
42-
ServiceName: "Interconnect"
43-
ExecutorId: 4
44-
}
1+
Executor {
2+
Type: BASIC
3+
Threads: 1
4+
SpinThreshold: 10
5+
Name: "System"
6+
}
7+
Executor {
8+
Type: BASIC
9+
Threads: 6
10+
SpinThreshold: 1
11+
Name: "User"
12+
}
13+
Executor {
14+
Type: BASIC
15+
Threads: 1
16+
SpinThreshold: 1
17+
Name: "Batch"
18+
}
19+
Executor {
20+
Type: IO
21+
Threads: 1
22+
Name: "IO"
23+
}
24+
Executor {
25+
Type: BASIC
26+
Threads: 2
27+
SpinThreshold: 10
28+
Name: "IC"
29+
TimePerMailboxMicroSecs: 100
30+
}
31+
32+
Scheduler {
33+
Resolution: 64
34+
SpinThreshold: 0
35+
ProgressThreshold: 10000
36+
}
37+
38+
SysExecutor: 0
39+
UserExecutor: 1
40+
IoExecutor: 3
41+
BatchExecutor: 2
42+
43+
ServiceExecutor {
44+
ServiceName: "Interconnect"
45+
ExecutorId: 4
4546
}

ydb/tests/tools/fqrun/flame_graph.sh

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
#!/usr/bin/env bash
2+
3+
set -eux
4+
5+
function cleanup {
6+
rm ./profdata
7+
rm ./profdata.txt
8+
}
9+
trap cleanup EXIT
10+
11+
if [ $# -gt 1 ]; then
12+
echo "Too many arguments"
13+
exit -1
14+
fi
15+
16+
fqrun_pid=$(pgrep -u $USER fqrun)
17+
18+
sudo perf record -F 50 --call-graph dwarf -g --proc-map-timeout=10000 --pid $fqrun_pid -v -o profdata -- sleep ${1:-'30'}
19+
sudo perf script -i profdata > profdata.txt
20+
21+
SCRIPT_DIR=$(cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd)
22+
23+
flame_graph_tool="$SCRIPT_DIR/../../../../contrib/tools/flame-graph/"
24+
25+
${flame_graph_tool}/stackcollapse-perf.pl profdata.txt | ${flame_graph_tool}/flamegraph.pl > profdata.svg

ydb/tests/tools/fqrun/fqrun.cpp

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,10 @@
88
#include <ydb/tests/tools/kqprun/runlib/application.h>
99
#include <ydb/tests/tools/kqprun/runlib/utils.h>
1010

11+
#ifdef PROFILE_MEMORY_ALLOCATIONS
12+
#include <library/cpp/lfalloc/alloc_profiler/profiler.h>
13+
#endif
14+
1115
using namespace NKikimrRun;
1216

1317
namespace NFqRun {
@@ -156,6 +160,21 @@ class TMain : public TMainBase {
156160
}
157161
});
158162

163+
options.AddLongOption("as-cfg", "File with actor system config (TActorSystemConfig), use '-' for default")
164+
.RequiredArgument("file")
165+
.DefaultValue("./configuration/as_config.conf")
166+
.Handler1([this](const NLastGetopt::TOptsParser* option) {
167+
const TString file(option->CurValOrDef());
168+
if (file == "-") {
169+
return;
170+
}
171+
172+
RunnerOptions.FqSettings.ActorSystemConfig = NKikimrConfig::TActorSystemConfig();
173+
if (!google::protobuf::TextFormat::ParseFromString(LoadFile(file), &(*RunnerOptions.FqSettings.ActorSystemConfig))) {
174+
ythrow yexception() << "Bad format of actor system configuration";
175+
}
176+
});
177+
159178
options.AddLongOption("emulate-s3", "Enable readings by s3 provider from files, `bucket` value in connection - path to folder with files")
160179
.NoArgument()
161180
.SetFlag(&RunnerOptions.FqSettings.EmulateS3);
@@ -208,6 +227,7 @@ class TMain : public TMainBase {
208227
ExecutionOptions.Validate(RunnerOptions);
209228

210229
RunnerOptions.FqSettings.YqlToken = GetEnv(YQL_TOKEN_VARIABLE);
230+
RunnerOptions.FqSettings.FunctionRegistry = CreateFunctionRegistry().Get();
211231

212232
auto& gatewayConfig = *RunnerOptions.FqSettings.FqConfig.mutable_gateways();
213233
FillTokens(gatewayConfig.mutable_pq());
@@ -228,8 +248,26 @@ class TMain : public TMainBase {
228248
RunnerOptions.FqSettings.PqGateway = std::move(fileGateway);
229249
}
230250

251+
#ifdef PROFILE_MEMORY_ALLOCATIONS
252+
if (RunnerOptions.FqSettings.VerboseLevel >= 1) {
253+
Cout << CoutColors.Cyan() << "Starting profile memory allocations" << CoutColors.Default() << Endl;
254+
}
255+
NAllocProfiler::StartAllocationSampling(true);
256+
#else
257+
if (ProfileAllocationsOutput) {
258+
ythrow yexception() << "Profile memory allocations disabled, please rebuild fqrun with flag `-D PROFILE_MEMORY_ALLOCATIONS`";
259+
}
260+
#endif
261+
231262
RunScript(ExecutionOptions, RunnerOptions);
232263

264+
#ifdef PROFILE_MEMORY_ALLOCATIONS
265+
if (RunnerOptions.FqSettings.VerboseLevel >= 1) {
266+
Cout << CoutColors.Cyan() << "Finishing profile memory allocations" << CoutColors.Default() << Endl;
267+
}
268+
FinishProfileMemoryAllocations();
269+
#endif
270+
233271
return 0;
234272
}
235273

@@ -244,6 +282,8 @@ class TMain : public TMainBase {
244282
}
245283

246284
private:
285+
inline static NColorizer::TColors CoutColors = NColorizer::AutoColors(Cout);
286+
247287
TExecutionOptions ExecutionOptions;
248288
TRunnerOptions RunnerOptions;
249289
std::unordered_map<TString, TString> PqFilesMapping;

ydb/tests/tools/fqrun/src/common.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77
#include <ydb/library/yql/providers/pq/provider/yql_pq_gateway.h>
88
#include <ydb/tests/tools/kqprun/runlib/settings.h>
99

10+
#include <yql/essentials/minikql/mkql_function_registry.h>
11+
1012
namespace NFqRun {
1113

1214
constexpr char YQL_TOKEN_VARIABLE[] = "YQL_TOKEN";
@@ -27,8 +29,10 @@ struct TFqSetupSettings : public NKikimrRun::TServerSettings {
2729

2830
TString YqlToken;
2931
NYql::IPqGateway::TPtr PqGateway;
32+
TIntrusivePtr<NKikimr::NMiniKQL::IMutableFunctionRegistry> FunctionRegistry;
3033
NFq::NConfig::TConfig FqConfig;
3134
NKikimrConfig::TLogConfig LogConfig;
35+
std::optional<NKikimrConfig::TActorSystemConfig> ActorSystemConfig;
3236
};
3337

3438
struct TRunnerOptions {

ydb/tests/tools/fqrun/src/fq_setup.cpp

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,14 @@ class TFqSetup::TImpl {
4444
serverSettings.SetLoggerInitializer(loggerInitializer);
4545
}
4646

47+
void SetFunctionRegistry(NKikimr::Tests::TServerSettings& serverSettings) const {
48+
if (Settings.FunctionRegistry) {
49+
serverSettings.SetFrFactory([this](const NKikimr::NScheme::TTypeRegistry&) {
50+
return Settings.FunctionRegistry.Get();
51+
});
52+
}
53+
}
54+
4755
NKikimr::Tests::TServerSettings GetServerSettings(ui32 grpcPort) {
4856
NKikimr::Tests::TServerSettings serverSettings(PortManager.GetPort());
4957

@@ -52,9 +60,13 @@ class TFqSetup::TImpl {
5260

5361
NKikimrConfig::TAppConfig config;
5462
*config.MutableLogConfig() = Settings.LogConfig;
63+
if (Settings.ActorSystemConfig) {
64+
*config.MutableActorSystemConfig() = *Settings.ActorSystemConfig;
65+
}
5566
serverSettings.SetAppConfig(config);
5667

5768
SetLoggerSettings(serverSettings);
69+
SetFunctionRegistry(serverSettings);
5870

5971
if (Settings.MonitoringEnabled) {
6072
serverSettings.InitKikimrRunConfig();

ydb/tests/tools/fqrun/src/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ PEERDIR(
2020
ydb/library/security
2121
ydb/library/yql/providers/pq/provider
2222
ydb/tests/tools/kqprun/runlib
23+
yql/essentials/minikql
2324
)
2425

2526
YQL_LAST_ABI_VERSION()

ydb/tests/tools/fqrun/ya.make

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,19 @@
11
PROGRAM(fqrun)
22

3+
IF (PROFILE_MEMORY_ALLOCATIONS)
4+
MESSAGE("Enabled profile memory allocations")
5+
ALLOCATOR(LF_DBG)
6+
CFLAGS(-D PROFILE_MEMORY_ALLOCATIONS)
7+
ENDIF()
8+
39
SRCS(
410
fqrun.cpp
511
)
612

713
PEERDIR(
814
library/cpp/colorizer
915
library/cpp/getopt
16+
library/cpp/lfalloc/alloc_profiler
1017
util
1118
ydb/library/yql/providers/pq/gateway/dummy
1219
ydb/tests/tools/fqrun/src

ydb/tests/tools/kqprun/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ For profiling memory allocations build kqprun with ya make flag `-D PROFILE_MEMO
4242
./kqprun -M 32000
4343
```
4444
45-
Monitoring endpoint: https://localhost:32000
45+
Monitoring endpoint: http://localhost:32000
4646
4747
* gRPC endpoint:
4848
```(bash)

0 commit comments

Comments
 (0)