@@ -17,8 +17,6 @@ using namespace NNodeWhiteboard;
17
17
class TJsonQuery : public TViewerPipeClient {
18
18
using TThis = TJsonQuery;
19
19
using TBase = TViewerPipeClient;
20
- TJsonSettings JsonSettings;
21
- ui32 Timeout = 60000 ;
22
20
std::vector<std::vector<Ydb::ResultSet>> ResultSets;
23
21
TString Query;
24
22
TString Action;
@@ -30,6 +28,11 @@ class TJsonQuery : public TViewerPipeClient {
30
28
bool IsBase64Encode = true ;
31
29
int LimitRows = 10000 ;
32
30
int TotalRows = 0 ;
31
+ bool CollectDiagnostics = true ;
32
+ TDuration StatsPeriod;
33
+ TDuration KeepAlive = TDuration::MilliSeconds(10000 );
34
+ TInstant LastSendTime;
35
+ static constexpr TDuration WakeupPeriod = TDuration::Seconds(1 );
33
36
34
37
enum ESchemaType {
35
38
Classic,
@@ -66,10 +69,8 @@ class TJsonQuery : public TViewerPipeClient {
66
69
}
67
70
}
68
71
69
- void ParseCgiParameters (const TCgiParameters& params) {
70
- JsonSettings.EnumAsNumbers = !FromStringWithDefault<bool >(params.Get (" enums" ), false );
71
- JsonSettings.UI64AsString = !FromStringWithDefault<bool >(params.Get (" ui64" ), false );
72
- Timeout = FromStringWithDefault<ui32>(params.Get (" timeout" ), Timeout);
72
+ void InitConfig (const TCgiParameters& params) {
73
+ Timeout = TDuration::MilliSeconds (FromStringWithDefault<ui32>(params.Get (" timeout" ), 60000 )); // override default timeout to 60 seconds
73
74
if (params.Has (" query" )) {
74
75
Query = params.Get (" query" );
75
76
}
@@ -89,6 +90,9 @@ class TJsonQuery : public TViewerPipeClient {
89
90
if (params.Has (" concurrent_results" )) {
90
91
ConcurrentResults = FromStringWithDefault<bool >(params.Get (" concurrent_results" ), ConcurrentResults);
91
92
}
93
+ if (params.Has (" keep_alive" )) {
94
+ KeepAlive = TDuration::MilliSeconds (FromStringWithDefault<ui32>(params.Get (" keep_alive" ), KeepAlive.MilliSeconds ()));
95
+ }
92
96
}
93
97
}
94
98
if (params.Has (" syntax" )) {
@@ -112,18 +116,22 @@ class TJsonQuery : public TViewerPipeClient {
112
116
if (params.Has (" output_chunk_max_size" )) {
113
117
OutputChunkMaxSize = FromStringWithDefault<ui64>(params.Get (" output_chunk_max_size" ), OutputChunkMaxSize);
114
118
}
119
+ CollectDiagnostics = FromStringWithDefault<bool >(params.Get (" collect_diagnostics" ), CollectDiagnostics);
120
+ if (params.Has (" stats_period" )) {
121
+ StatsPeriod = TDuration::MilliSeconds (std::clamp<ui64>(FromStringWithDefault<ui64>(params.Get (" stats_period" ), StatsPeriod.MilliSeconds ()), 1000 , 600000 ));
122
+ }
115
123
}
116
124
117
125
TJsonQuery (IViewer* viewer, NHttp::TEvHttpProxy::TEvHttpIncomingRequest::TPtr& ev)
118
126
: TBase(viewer, ev)
119
127
{
128
+ InitConfig (Params);
120
129
}
121
130
122
131
void Bootstrap () override {
123
132
if (NeedToRedirect ()) {
124
133
return ;
125
134
}
126
- ParseCgiParameters (Params);
127
135
if (Query.empty () && Action != " cancel-query" ) {
128
136
return TBase::ReplyAndPassAway (GetHTTPBADREQUEST (" text/plain" , " Query is empty" ), " EmptyQuery" );
129
137
}
@@ -147,7 +155,11 @@ class TJsonQuery : public TViewerPipeClient {
147
155
}
148
156
Send (HttpEvent->Sender , new NHttp::TEvHttpProxy::TEvSubscribeForCancel (), IEventHandle::FlagTrackDelivery);
149
157
SendKpqProxyRequest ();
150
- Become (&TThis::StateWork, TDuration::MilliSeconds (Timeout), new TEvents::TEvWakeup ());
158
+ Become (&TThis::StateWork);
159
+ if (Timeout || KeepAlive) {
160
+ Schedule (WakeupPeriod, new TEvents::TEvWakeup ());
161
+ }
162
+ LastSendTime = TActivationContext::Now ();
151
163
}
152
164
153
165
void CancelQuery () {
@@ -203,7 +215,7 @@ class TJsonQuery : public TViewerPipeClient {
203
215
hFunc (NKqp::TEvKqpExecuter::TEvStreamData, HandleReply);
204
216
cFunc (NHttp::TEvHttpProxy::EvRequestCancelled, Cancelled);
205
217
hFunc (TEvents::TEvUndelivered, Undelivered);
206
- cFunc (TEvents::TSystem::Wakeup, HandleTimeout );
218
+ cFunc (TEvents::TSystem::Wakeup, HandleWakeup );
207
219
}
208
220
}
209
221
@@ -374,6 +386,10 @@ class TJsonQuery : public TViewerPipeClient {
374
386
if (OutputChunkMaxSize) {
375
387
request.SetOutputChunkMaxSize (OutputChunkMaxSize);
376
388
}
389
+ request.SetCollectDiagnostics (CollectDiagnostics);
390
+ if (StatsPeriod) {
391
+ event->SetProgressStatsPeriod (StatsPeriod);
392
+ }
377
393
ActorIdToProto (SelfId (), event->Record .MutableRequestActorId ());
378
394
QueryResponse = MakeRequest<NKqp::TEvKqp::TEvQueryResponse>(NKqp::MakeKqpProxyID (SelfId ().NodeId ()), event.Release ());
379
395
@@ -656,8 +672,15 @@ class TJsonQuery : public TViewerPipeClient {
656
672
ReplyWithJsonAndPassAway (json);
657
673
}
658
674
659
- void HandleTimeout () {
660
- ReplyWithError (" Timeout executing query" );
675
+ void HandleWakeup () {
676
+ auto now = TActivationContext::Now ();
677
+ if (Timeout && (now - LastSendTime > Timeout)) {
678
+ return ReplyWithError (" Timeout executing query" );
679
+ }
680
+ if (KeepAlive && (now - LastSendTime > KeepAlive)) {
681
+ SendKeepAlive ();
682
+ }
683
+ Schedule (WakeupPeriod, new TEvents::TEvWakeup ());
661
684
}
662
685
663
686
private:
@@ -853,6 +876,7 @@ class TJsonQuery : public TViewerPipeClient {
853
876
data << " --boundary\r\n Content-Type: application/json\r\n Content-Length: " << content.Size () << " \r\n\r\n " << content.Str () << " \r\n " ;
854
877
auto dataChunk = HttpResponse->CreateDataChunk (data);
855
878
Send (HttpEvent->Sender , new NHttp::TEvHttpProxy::TEvHttpOutgoingDataChunk (dataChunk));
879
+ LastSendTime = TActivationContext::Now ();
856
880
}
857
881
858
882
void StreamJsonResponse (const NKikimrKqp::TEvExecuterStreamData& data) {
@@ -876,6 +900,19 @@ class TJsonQuery : public TViewerPipeClient {
876
900
auto dataChunk = HttpResponse->CreateDataChunk (" --boundary--\r\n " );
877
901
dataChunk->SetEndOfData ();
878
902
Send (HttpEvent->Sender , new NHttp::TEvHttpProxy::TEvHttpOutgoingDataChunk (dataChunk));
903
+ LastSendTime = TActivationContext::Now ();
904
+ }
905
+
906
+ void SendKeepAlive () {
907
+ if (Streaming) {
908
+ NJson::TJsonValue json;
909
+ NJson::TJsonValue& jsonMeta = json[" meta" ];
910
+ jsonMeta[" event" ] = " KeepAlive" ;
911
+ StreamJsonResponse (json);
912
+ }
913
+ if (SessionId) {
914
+ PingSession ();
915
+ }
879
916
}
880
917
881
918
void ReplyWithJsonAndPassAway (const NJson::TJsonValue& json, const TString& error = {}) {
@@ -1011,6 +1048,20 @@ class TJsonQuery : public TViewerPipeClient {
1011
1048
description: resource pool in which the query will be executed
1012
1049
type: string
1013
1050
required: false
1051
+ - name: keep_alive
1052
+ in: query
1053
+ description: time of inactivity to send keep-alive in stream (multipart) queries
1054
+ type: integer
1055
+ default: 10000
1056
+ - name: collect_diagnostics
1057
+ in: query
1058
+ description: collect query diagnostics
1059
+ type: boolean
1060
+ default: true
1061
+ - name: stats_period
1062
+ in: query
1063
+ description: time interval for sending periodical query statistics in ms
1064
+ type: integer
1014
1065
requestBody:
1015
1066
description: Executes SQL query
1016
1067
required: false
0 commit comments