Skip to content

Commit 95e571e

Browse files
authored
Timeline - visualization of query state (#7053)
1 parent c11bbbe commit 95e571e

File tree

11 files changed

+1709
-11
lines changed

11 files changed

+1709
-11
lines changed

ydb/core/fq/libs/compute/common/plan2svg.cpp

Lines changed: 1445 additions & 0 deletions
Large diffs are not rendered by default.
Lines changed: 227 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,227 @@
1+
#pragma once
2+
3+
#include <vector>
4+
#include <map>
5+
6+
#include <library/cpp/json/json_reader.h>
7+
#include <library/cpp/json/json_writer.h>
8+
#include <library/cpp/json/yson/json2yson.h>
9+
10+
#include <util/generic/string.h>
11+
#include <util/string/builder.h>
12+
13+
class TStage;
14+
15+
class TSummaryMetric {
16+
17+
public:
18+
ui64 Value = 0;
19+
ui32 Count = 0;
20+
ui64 Min = 0;
21+
ui64 Max = 0;
22+
23+
void Add(ui64 value) {
24+
if (Count) {
25+
Min = std::min(Min, value);
26+
Max = std::max(Max, value);
27+
} else {
28+
Min = value;
29+
Max = value;
30+
}
31+
Value += value;
32+
Count++;
33+
}
34+
35+
ui64 Average() {
36+
return Count ? (Value / Count) : 0;
37+
}
38+
};
39+
40+
struct TAggregation {
41+
ui64 Min = 0;
42+
ui64 Max = 0;
43+
ui64 Avg = 0;
44+
ui64 Sum = 0;
45+
ui32 Count = 0;
46+
47+
bool Load(const NJson::TJsonValue& node);
48+
};
49+
50+
struct TMetricHistory {
51+
std::vector<std::pair<ui64, ui64>> Deriv;
52+
ui64 MaxDeriv = 0;
53+
std::vector<std::pair<ui64, ui64>> Values;
54+
ui64 MaxValue = 0;
55+
ui64 MinTime = 0;
56+
ui64 MaxTime = 0;
57+
58+
void Load(const NJson::TJsonValue& node, ui64 explicitMinTime, ui64 explicitMaxTime);
59+
};
60+
61+
class TSingleMetric {
62+
63+
public:
64+
TSingleMetric(std::shared_ptr<TSummaryMetric> summary, const NJson::TJsonValue& node,
65+
const NJson::TJsonValue* firstMessageNode = nullptr,
66+
const NJson::TJsonValue* lastMessageNode = nullptr,
67+
const NJson::TJsonValue* waitTimeUsNode = nullptr);
68+
69+
std::shared_ptr<TSummaryMetric> Summary;
70+
TAggregation Details;
71+
72+
TMetricHistory History;
73+
TMetricHistory WaitTime;
74+
ui64 MinTime = 0;
75+
ui64 MaxTime = 0;
76+
TAggregation FirstMessage;
77+
TAggregation LastMessage;
78+
};
79+
80+
class TConnection {
81+
82+
public:
83+
TConnection(const TString& nodeType) : NodeType(nodeType) {
84+
}
85+
86+
TString NodeType;
87+
std::shared_ptr<TStage> FromStage;
88+
std::shared_ptr<TSingleMetric> InputBytes;
89+
std::shared_ptr<TSingleMetric> InputRows;
90+
std::vector<std::string> KeyColumns;
91+
bool CteConnection = false;
92+
ui32 CteIndentX = 0;
93+
ui32 CteOffsetY = 0;
94+
const NJson::TJsonValue* StatsNode = nullptr;
95+
};
96+
97+
class TSource {
98+
99+
public:
100+
TSource(const TString& nodeType) : NodeType(nodeType) {
101+
}
102+
103+
TString NodeType;
104+
std::shared_ptr<TSingleMetric> IngressBytes;
105+
std::shared_ptr<TSingleMetric> IngressRows;
106+
std::vector<std::string> Info;
107+
};
108+
109+
class TStage {
110+
111+
public:
112+
TStage(const TString& nodeType) : NodeType(nodeType) {
113+
}
114+
115+
TString NodeType;
116+
std::shared_ptr<TSource> Source;
117+
std::shared_ptr<TSingleMetric> IngressBytes;
118+
std::vector<std::shared_ptr<TConnection>> Connections;
119+
ui32 IndentX = 0;
120+
ui32 IndentY = 0;
121+
ui32 OffsetY = 0;
122+
ui32 Height = 0;
123+
std::shared_ptr<TSingleMetric> CpuTime;
124+
std::shared_ptr<TSingleMetric> MaxMemoryUsage;
125+
std::shared_ptr<TSingleMetric> OutputBytes;
126+
std::shared_ptr<TSingleMetric> OutputRows;
127+
std::vector<std::string> Info;
128+
ui64 BaseTime = 0;
129+
ui32 PlanNodeId = 0;
130+
ui32 PhysicalStageId = 0;
131+
ui32 Tasks = 0;
132+
const NJson::TJsonValue* StatsNode = nullptr;
133+
};
134+
135+
struct TColorPalette {
136+
TColorPalette();
137+
TString StageDark;
138+
TString StageLight;
139+
TString StageText;
140+
TString StageGrid;
141+
TString IngressDark;
142+
TString IngressMedium;
143+
TString IngressLight;
144+
TString InputDark;
145+
TString InputMedium;
146+
TString InputLight;
147+
TString OutputDark;
148+
TString OutputMedium;
149+
TString OutputLight;
150+
TString MemMedium;
151+
TString MemLight;
152+
TString CpuMedium;
153+
TString CpuLight;
154+
TString ConnectionFill;
155+
TString ConnectionLine;
156+
TString ConnectionText;
157+
TString MinMaxLine;
158+
TString TextLight;
159+
};
160+
161+
struct TPlanViewConfig {
162+
TPlanViewConfig();
163+
ui32 HeaderWidth;
164+
ui32 SummaryWidth;
165+
ui32 Width;
166+
TColorPalette Palette;
167+
};
168+
169+
class TPlan {
170+
171+
public:
172+
TPlan(const TString& nodeType, TPlanViewConfig& config) : NodeType(nodeType), Config(config) {
173+
CpuTime = std::make_shared<TSummaryMetric>();
174+
MaxMemoryUsage = std::make_shared<TSummaryMetric>();
175+
OutputBytes = std::make_shared<TSummaryMetric>();
176+
OutputRows = std::make_shared<TSummaryMetric>();
177+
InputBytes = std::make_shared<TSummaryMetric>();
178+
InputRows = std::make_shared<TSummaryMetric>();
179+
IngressBytes = std::make_shared<TSummaryMetric>();
180+
IngressRows = std::make_shared<TSummaryMetric>();
181+
}
182+
183+
void Load(const NJson::TJsonValue& node);
184+
void LoadStage(std::shared_ptr<TStage> stage, const NJson::TJsonValue& node);
185+
void LoadSource(std::shared_ptr<TSource> source, const NJson::TJsonValue& node);
186+
void MarkStageIndent(ui32 indentX, ui32& offsetY, std::shared_ptr<TStage> stage);
187+
void MarkLayout();
188+
void PrintTimeline(TStringBuilder& background, TStringBuilder& canvas, const TString& title, TAggregation& firstMessage, TAggregation& lastMessage, ui32 x, ui32 y, ui32 w, ui32 h, const TString& color);
189+
void PrintWaitTime(TStringBuilder& canvas, std::shared_ptr<TSingleMetric> metric, ui32 x, ui32 y, ui32 w, ui32 h, const TString& fillColor);
190+
void PrintDeriv(TStringBuilder& canvas, std::shared_ptr<TSingleMetric> metric, ui32 x, ui32 y, ui32 w, ui32 h, const TString& title, const TString& lineColor, const TString& fillColor = "");
191+
void PrintValues(TStringBuilder& canvas, std::shared_ptr<TSingleMetric> metric, ui32 x, ui32 y, ui32 w, ui32 h, const TString& title, const TString& lineColor, const TString& fillColor = "");
192+
void PrintSvg(ui64 maxTime, ui32& offsetY, TStringBuilder& background, TStringBuilder& canvas);
193+
TString NodeType;
194+
std::vector<std::shared_ptr<TStage>> Stages;
195+
std::shared_ptr<TSummaryMetric> CpuTime;
196+
std::shared_ptr<TSummaryMetric> MaxMemoryUsage;
197+
std::shared_ptr<TSummaryMetric> OutputBytes;
198+
std::shared_ptr<TSummaryMetric> OutputRows;
199+
std::shared_ptr<TSummaryMetric> InputBytes;
200+
std::shared_ptr<TSummaryMetric> InputRows;
201+
std::shared_ptr<TSummaryMetric> IngressBytes;
202+
std::shared_ptr<TSummaryMetric> IngressRows;
203+
ui64 MaxTime = 1000;
204+
ui64 BaseTime = 0;
205+
ui64 TimeOffset = 0;
206+
ui32 OffsetY = 0;
207+
ui32 Tasks = 0;
208+
std::vector<std::pair<std::string, std::shared_ptr<TConnection>>> CteRefs;
209+
std::map<std::string, std::shared_ptr<TStage>> CteStages;
210+
TPlanViewConfig& Config;
211+
};
212+
213+
class TPlanVisualizer {
214+
215+
public:
216+
217+
void LoadPlans(const TString& plans);
218+
void LoadPlan(const TString& planNodeType, const NJson::TJsonValue& root);
219+
void SetTimeOffsets();
220+
TString PrintSvg();
221+
TString PrintSvgSafe();
222+
223+
std::vector<TPlan> Plans;
224+
ui64 MaxTime = 1000;
225+
ui64 BaseTime = 0;
226+
TPlanViewConfig Config;
227+
};

ydb/core/fq/libs/compute/common/utils.cpp

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
#include "utils.h"
2+
#include "plan2svg.h"
23

34
#include <library/cpp/json/json_reader.h>
45
#include <library/cpp/json/json_writer.h>
@@ -432,18 +433,22 @@ void EnumeratePlans(NYson::TYsonWriter& writer, NJson::TJsonValue& value, ui32&
432433
}
433434
}
434435

435-
TString GetV1StatFromV2Plan(const TString& plan, double* cpuUsage) {
436+
TString GetV1StatFromV2Plan(const TString& plan, double* cpuUsage, TString* timeline) {
436437
TStringStream out;
437438
NYson::TYsonWriter writer(&out);
438439
writer.OnBeginMap();
439440
NJson::TJsonReaderConfig jsonConfig;
440441
NJson::TJsonValue stat;
442+
TPlanVisualizer planViz;
441443
if (NJson::ReadJsonTree(plan, &jsonConfig, &stat)) {
442444
if (auto* topNode = stat.GetValueByPath("Plan")) {
443445
if (auto* subNode = topNode->GetValueByPath("Plans")) {
444446
for (auto plan : subNode->GetArray()) {
445447
if (auto* typeNode = plan.GetValueByPath("Node Type")) {
446448
auto nodeType = typeNode->GetStringSafe();
449+
if (timeline) {
450+
planViz.LoadPlan(nodeType, plan);
451+
}
447452
TTotalStatistics totals;
448453
ui32 stageViewIndex = 0;
449454
writer.OnKeyedItem(nodeType);
@@ -473,6 +478,13 @@ TString GetV1StatFromV2Plan(const TString& plan, double* cpuUsage) {
473478
}
474479
}
475480
}
481+
if (timeline) {
482+
planViz.SetTimeOffsets();
483+
*timeline = planViz.PrintSvgSafe();
484+
// remove json "timeline" field after migration
485+
writer.OnKeyedItem("timeline");
486+
writer.OnStringScalar(*timeline);
487+
}
476488
writer.OnEndMap();
477489
return NJson2Yson::ConvertYson2Json(out.Str());
478490
}
@@ -1147,7 +1159,7 @@ struct TNoneStatProcessor : IPlanStatProcessor {
11471159
return plan;
11481160
}
11491161

1150-
TString GetQueryStat(const TString&, double& cpuUsage) override {
1162+
TString GetQueryStat(const TString&, double& cpuUsage, TString*) override {
11511163
cpuUsage = 0.0;
11521164
return "";
11531165
}
@@ -1180,8 +1192,8 @@ struct TPlanStatProcessor : IPlanStatProcessor {
11801192
return plan;
11811193
}
11821194

1183-
TString GetQueryStat(const TString& plan, double& cpuUsage) override {
1184-
return GetV1StatFromV2Plan(plan, &cpuUsage);
1195+
TString GetQueryStat(const TString& plan, double& cpuUsage, TString* timeline) override {
1196+
return GetV1StatFromV2Plan(plan, &cpuUsage, timeline);
11851197
}
11861198

11871199
TPublicStat GetPublicStat(const TString& stat) override {
@@ -1212,8 +1224,8 @@ struct TProfileStatProcessor : TPlanStatProcessor {
12121224
};
12131225

12141226
struct TProdStatProcessor : TFullStatProcessor {
1215-
TString GetQueryStat(const TString& plan, double& cpuUsage) override {
1216-
return GetPrettyStatistics(GetV1StatFromV2Plan(plan, &cpuUsage));
1227+
TString GetQueryStat(const TString& plan, double& cpuUsage, TString* timeline) override {
1228+
return GetPrettyStatistics(GetV1StatFromV2Plan(plan, &cpuUsage, timeline));
12171229
}
12181230
};
12191231

@@ -1231,7 +1243,7 @@ std::unique_ptr<IPlanStatProcessor> CreateStatProcessor(const TString& statViewN
12311243

12321244
PingTaskRequestBuilder::PingTaskRequestBuilder(const NConfig::TCommonConfig& commonConfig, std::unique_ptr<IPlanStatProcessor>&& processor)
12331245
: Compressor(commonConfig.GetQueryArtifactsCompressionMethod(), commonConfig.GetQueryArtifactsCompressionMinSize())
1234-
, Processor(std::move(processor))
1246+
, Processor(std::move(processor)), ShowQueryTimeline(commonConfig.GetShowQueryTimeline())
12351247
{}
12361248

12371249
Fq::Private::PingTaskRequest PingTaskRequestBuilder::Build(
@@ -1297,9 +1309,13 @@ Fq::Private::PingTaskRequest PingTaskRequestBuilder::Build(const TString& queryP
12971309

12981310
CpuUsage = 0.0;
12991311
try {
1300-
auto stat = Processor->GetQueryStat(plan, CpuUsage);
1312+
TString timeline;
1313+
auto stat = Processor->GetQueryStat(plan, CpuUsage, ShowQueryTimeline ? &timeline : nullptr);
13011314
pingTaskRequest.set_statistics(stat);
13021315
pingTaskRequest.set_dump_raw_statistics(true);
1316+
if (timeline) {
1317+
pingTaskRequest.set_timeline(timeline);
1318+
}
13031319
auto flatStat = Processor->GetFlatStat(plan);
13041320
flatStat["CompilationTimeUs"] = compilationTimeUs;
13051321
flatStat["ComputeTimeUs"] = computeTimeUs;

ydb/core/fq/libs/compute/common/utils.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ inline std::shared_ptr<NYdb::NTable::TTableClient> CreateNewTableClient(const TS
3030
tableSettings);
3131
}
3232

33-
TString GetV1StatFromV2Plan(const TString& plan, double* cpuUsage = nullptr);
33+
TString GetV1StatFromV2Plan(const TString& plan, double* cpuUsage = nullptr, TString* timeline = nullptr);
3434
TString GetV1StatFromV2PlanV2(const TString& plan);
3535
TString GetPrettyStatistics(const TString& statistics);
3636
THashMap<TString, i64> AggregateStats(TStringBuf plan);
@@ -57,7 +57,7 @@ struct IPlanStatProcessor {
5757
virtual NYdb::NQuery::EStatsMode GetStatsMode() = 0;
5858
virtual TString ConvertPlan(const TString& plan) = 0;
5959
virtual TString GetPlanVisualization(const TString& plan) = 0;
60-
virtual TString GetQueryStat(const TString& plan, double& cpuUsage) = 0;
60+
virtual TString GetQueryStat(const TString& plan, double& cpuUsage, TString* timeline) = 0;
6161
virtual TPublicStat GetPublicStat(const TString& stat) = 0;
6262
virtual THashMap<TString, i64> GetFlatStat(TStringBuf plan) = 0;
6363
};
@@ -81,6 +81,7 @@ class PingTaskRequestBuilder {
8181
private:
8282
const TCompressor Compressor;
8383
std::unique_ptr<IPlanStatProcessor> Processor;
84+
bool ShowQueryTimeline = false;
8485
};
8586

8687
TString GetStatViewName(const ::NFq::TRunActorParams& params);

ydb/core/fq/libs/compute/common/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ LIBRARY()
22

33
SRCS(
44
pinger.cpp
5+
plan2svg.cpp
56
run_actor_params.cpp
67
utils.cpp
78
)

ydb/core/fq/libs/config/protos/common.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,4 +29,5 @@ message TCommonConfig {
2929
bool KeepInternalErrors = 13;
3030
bool UseNativeProtocolForClickHouse = 14;
3131
bool DisableSslForGenericDataSources = 15;
32+
bool ShowQueryTimeline = 16;
3233
}

ydb/core/fq/libs/control_plane_storage/internal/task_ping.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -285,6 +285,10 @@ TPingTaskParams ConstructHardPingTask(
285285
internal.set_current_load(request.current_load());
286286
}
287287

288+
if (request.timeline()) {
289+
internal.set_timeline(request.timeline());
290+
}
291+
288292
if (request.flat_stats_size() != 0) {
289293
internal.clear_statistics();
290294
auto stats = DeserializeFlatStats(request.flat_stats());

ydb/core/fq/libs/control_plane_storage/proto/yq_internal.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ message QueryInternal {
5757
NYql.NDqProto.StatusIds.StatusCode pending_status_code = 28;
5858
repeated StatisticsNamedValue statistics = 29;
5959
int32 current_load = 30;
60+
string timeline = 31;
6061
}
6162

6263
message JobInternal {

ydb/core/fq/libs/control_plane_storage/util.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,7 @@ bool DoesPingTaskUpdateQueriesTable(const Fq::Private::PingTaskRequest& request)
189189
|| !request.issues().empty()
190190
|| !request.transient_issues().empty()
191191
|| request.statistics()
192+
|| request.timeline()
192193
|| !request.result_set_meta().empty()
193194
|| request.ast()
194195
|| request.ast_compressed().data()

0 commit comments

Comments
 (0)