Skip to content

Commit dc63755

Browse files
authored
Merge pull request #9091 from youngsofun/http
refactor(rest api): drop ctx after query finished.
2 parents 74962ba + dfe82ad commit dc63755

File tree

3 files changed

+53
-54
lines changed

3 files changed

+53
-54
lines changed

src/query/service/src/servers/http/v1/query/execute_state.rs

Lines changed: 21 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,6 @@ pub struct ExecuteRunning {
116116
}
117117

118118
pub struct ExecuteStopped {
119-
pub ctx: Arc<QueryContext>,
120119
pub stats: Progresses,
121120
pub affect: Option<QueryAffect>,
122121
pub reason: Result<()>,
@@ -183,7 +182,6 @@ impl Executor {
183182
.unwrap_or_else(|e| error!("fail to write query_log {:?}", e));
184183
}
185184
guard.state = Stopped(Box::new(ExecuteStopped {
186-
ctx: s.ctx.clone(),
187185
stats: Default::default(),
188186
reason,
189187
stop_time: Instant::now(),
@@ -203,7 +201,6 @@ impl Executor {
203201
}
204202

205203
guard.state = Stopped(Box::new(ExecuteStopped {
206-
ctx: r.ctx.clone(),
207204
stats: Progresses::from_context(&r.ctx),
208205
reason,
209206
stop_time: Instant::now(),
@@ -227,7 +224,7 @@ impl ExecuteState {
227224
session: Arc<Session>,
228225
ctx: Arc<QueryContext>,
229226
block_sender: SizedChannelSender<DataBlock>,
230-
) -> Result<ExecuteRunning> {
227+
) -> Result<()> {
231228
let mut planner = Planner::new(ctx.clone());
232229
let (plan, _, _) = planner.plan_sql(sql).await?;
233230
ctx.attach_query_str(plan.to_string(), sql);
@@ -238,30 +235,31 @@ impl ExecuteState {
238235
ctx: ctx.clone(),
239236
};
240237

238+
info!("http query {}, change state to Running", &ctx.get_id());
239+
Executor::start_to_running(&executor, Running(running_state)).await;
240+
241241
let executor_clone = executor.clone();
242242
let ctx_clone = ctx.clone();
243243
let block_sender_closer = block_sender.closer();
244244

245-
ctx.try_spawn(async move {
246-
let res = execute(interpreter, ctx_clone, block_sender, executor_clone.clone());
247-
match AssertUnwindSafe(res).catch_unwind().await {
248-
Ok(Err(err)) => {
249-
Executor::stop(&executor_clone, Err(err), false).await;
250-
block_sender_closer.close();
251-
}
252-
Err(_) => {
253-
Executor::stop(
254-
&executor_clone,
255-
Err(ErrorCode::PanicError("interpreter panic!")),
256-
false,
257-
)
258-
.await;
259-
block_sender_closer.close();
260-
}
261-
_ => {}
245+
let res = execute(interpreter, ctx_clone, block_sender, executor_clone.clone());
246+
match AssertUnwindSafe(res).catch_unwind().await {
247+
Ok(Err(err)) => {
248+
Executor::stop(&executor_clone, Err(err), false).await;
249+
block_sender_closer.close();
262250
}
263-
})?;
264-
Ok(running_state)
251+
Err(_) => {
252+
Executor::stop(
253+
&executor_clone,
254+
Err(ErrorCode::PanicError("interpreter panic!")),
255+
false,
256+
)
257+
.await;
258+
block_sender_closer.close();
259+
}
260+
_ => {}
261+
}
262+
Ok(())
265263
}
266264
}
267265

src/query/service/src/servers/http/v1/query/http_query.rs

Lines changed: 20 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -239,42 +239,30 @@ impl HttpQuery {
239239
let block_sender_closer = block_sender.closer();
240240
let state_clone = state.clone();
241241
let ctx_clone = ctx.clone();
242+
let ctx_clone2 = ctx.clone();
242243
let sql = request.sql.clone();
243244
let query_id = id.clone();
244245
let query_id_clone = id.clone();
245246
ctx.try_spawn(async move {
246247
let state = state_clone.clone();
247-
let running_state = ExecuteState::try_start_query(
248-
state,
249-
&sql,
250-
session,
251-
ctx_clone.clone(),
252-
block_sender,
253-
)
254-
.await;
255-
match running_state {
256-
Ok(s) => {
257-
tracing::info!("http query {}, change state to Running", &query_id);
258-
Executor::start_to_running(&state_clone, ExecuteState::Running(s)).await;
259-
}
260-
Err(e) => {
261-
InterpreterQueryLog::fail_to_start(ctx_clone.clone(), e.clone());
262-
let state = ExecuteStopped {
263-
ctx: ctx_clone.clone(),
264-
stats: Progresses::default(),
265-
reason: Err(e.clone()),
266-
stop_time: Instant::now(),
267-
affect: ctx_clone.get_affect(),
268-
};
269-
tracing::info!(
270-
"http query {}, change state to Stopped, fail to start {:?}",
271-
&query_id,
272-
e
273-
);
274-
Executor::start_to_stop(&state_clone, ExecuteState::Stopped(Box::new(state)))
275-
.await;
276-
block_sender_closer.close();
277-
}
248+
if let Err(e) =
249+
ExecuteState::try_start_query(state, &sql, session, ctx_clone.clone(), block_sender)
250+
.await
251+
{
252+
InterpreterQueryLog::fail_to_start(ctx_clone.clone(), e.clone());
253+
let state = ExecuteStopped {
254+
stats: Progresses::default(),
255+
reason: Err(e.clone()),
256+
stop_time: Instant::now(),
257+
affect: ctx_clone.get_affect(),
258+
};
259+
tracing::info!(
260+
"http query {}, change state to Stopped, fail to start {:?}",
261+
&query_id,
262+
e
263+
);
264+
Executor::start_to_stop(&state_clone, ExecuteState::Stopped(Box::new(state))).await;
265+
block_sender_closer.close();
278266
}
279267
})?;
280268

@@ -284,6 +272,7 @@ impl HttpQuery {
284272
request.pagination.max_rows_per_page,
285273
block_receiver,
286274
format_settings,
275+
ctx_clone2,
287276
)));
288277
let query = HttpQuery {
289278
id,

src/query/service/src/servers/http/v1/query/page_manager.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ use std::sync::Arc;
1717
use std::time::Instant;
1818

1919
use common_base::base::tokio;
20+
use common_base::base::GlobalIORuntime;
21+
use common_base::base::TrySpawn;
2022
use common_datablocks::DataBlock;
2123
use common_datavalues::DataSchema;
2224
use common_datavalues::DataSchemaRef;
@@ -29,6 +31,7 @@ use tracing::info;
2931
use crate::servers::http::v1::json_block::block_to_json_value;
3032
use crate::servers::http::v1::query::sized_spsc::SizedChannelReceiver;
3133
use crate::servers::http::v1::JsonBlock;
34+
use crate::sessions::QueryContext;
3235

3336
#[derive(Debug, PartialEq, Eq)]
3437
pub enum Wait {
@@ -59,6 +62,7 @@ pub struct PageManager {
5962
row_buffer: VecDeque<Vec<JsonValue>>,
6063
block_receiver: SizedChannelReceiver<DataBlock>,
6164
format_settings: FormatSettings,
65+
query_ctx_ref: Option<Arc<QueryContext>>,
6266
}
6367

6468
impl PageManager {
@@ -67,6 +71,7 @@ impl PageManager {
6771
max_rows_per_page: usize,
6872
block_receiver: SizedChannelReceiver<DataBlock>,
6973
format_settings: FormatSettings,
74+
query_ctx_ref: Arc<QueryContext>,
7075
) -> PageManager {
7176
PageManager {
7277
query_id,
@@ -80,6 +85,7 @@ impl PageManager {
8085
block_receiver,
8186
max_rows_per_page,
8287
format_settings,
88+
query_ctx_ref: Some(query_ctx_ref),
8389
}
8490
}
8591

@@ -194,6 +200,12 @@ impl PageManager {
194200
if !self.block_end {
195201
self.block_end = self.block_receiver.is_empty();
196202
}
203+
if self.block_end {
204+
let ctx = self.query_ctx_ref.take();
205+
GlobalIORuntime::instance().spawn(async move {
206+
drop(ctx);
207+
});
208+
}
197209
let end = self.block_end && self.row_buffer.is_empty();
198210
Ok((block, end))
199211
}

0 commit comments

Comments
 (0)