@@ -144,7 +144,7 @@ impl QueryResponse {
144
144
( JsonBlock :: empty ( ) , None )
145
145
} else {
146
146
match state. state {
147
- ExecuteStateKind :: Running => match r. data {
147
+ ExecuteStateKind :: Running | ExecuteStateKind :: Starting => match r. data {
148
148
None => ( JsonBlock :: empty ( ) , Some ( make_state_uri ( & id) ) ) ,
149
149
Some ( d) => {
150
150
let uri = match d. next_page_no {
@@ -179,9 +179,14 @@ impl QueryResponse {
179
179
} ;
180
180
let rows = data. data . len ( ) ;
181
181
182
+ let state_kind = match state. state {
183
+ ExecuteStateKind :: Starting => ExecuteStateKind :: Running ,
184
+ _ => state. state ,
185
+ } ;
186
+
182
187
Json ( QueryResponse {
183
188
data : data. into ( ) ,
184
- state : state . state ,
189
+ state : state_kind ,
185
190
schema : state. schema . clone ( ) ,
186
191
session_id : Some ( session_id) ,
187
192
node_id : r. node_id ,
@@ -368,22 +373,30 @@ pub(crate) async fn query_handler(
368
373
match query {
369
374
Ok ( query) => {
370
375
query. update_expire_time ( true ) . await ;
371
- let resp = query
372
- . get_response_page ( 0 )
373
- . await
374
- . map_err ( |err| err. display_with_sql ( & sql) )
375
- . map_err ( |err| poem:: Error :: from_string ( err. message ( ) , StatusCode :: NOT_FOUND ) ) ?;
376
- if matches ! ( resp. state. state, ExecuteStateKind :: Failed ) {
376
+ // tmp workaround to tolerant old clients
377
+ let max_wait_time = std:: cmp:: max ( 10 , req. pagination . wait_time_secs ) ;
378
+ let start = std:: time:: Instant :: now ( ) ;
379
+ let resp = loop {
380
+ let resp = query
381
+ . get_response_page ( 0 )
382
+ . await
383
+ . map_err ( |err| err. display_with_sql ( & sql) )
384
+ . map_err ( |err| poem:: Error :: from_string ( err. message ( ) , StatusCode :: NOT_FOUND ) ) ?;
385
+ if matches ! ( resp. state. state, ExecuteStateKind :: Starting ) && start. elapsed ( ) . as_secs ( ) < max_wait_time as u64 {
386
+ continue ;
387
+ }
388
+ break resp
389
+ } ;
390
+ if matches ! ( resp. state. state, ExecuteStateKind :: Failed ) {
377
391
ctx. set_fail ( ) ;
378
392
}
379
393
let ( rows, next_page) = match & resp. data {
380
394
None => ( 0 , None ) ,
381
395
Some ( p) => ( p. page . data . num_rows ( ) , p. next_page_no ) ,
382
396
} ;
383
- info ! (
384
- "http query initial response to http query_id={}, state={:?}, rows={}, next_page={:?}, sql='{}'" ,
385
- & query. id, & resp. state, rows, next_page, mask_connection_info( & sql)
386
- ) ;
397
+ info ! ( "http query initial response to http query_id={}, state={:?}, rows={}, next_page={:?}, sql='{}'" ,
398
+ & query. id, & resp. state, rows, next_page, mask_connection_info( & sql)
399
+ ) ;
387
400
query. update_expire_time ( false ) . await ;
388
401
Ok ( QueryResponse :: from_internal ( query. id . to_string ( ) , resp, false ) . into_response ( ) )
389
402
}
0 commit comments