Skip to content

Commit d230fe0

Browse files
authored
fix: http handler kill not working. (#15558)
* fix: http handler kill not working. * ci: add logic test for http kill.
1 parent 6e4afed commit d230fe0

File tree

5 files changed

+77
-42
lines changed

5 files changed

+77
-42
lines changed

src/query/service/src/servers/http/v1/http_query_handlers.rs

Lines changed: 17 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -234,13 +234,16 @@ async fn query_final_handler(
234234
query_id, query_id
235235
);
236236
let http_query_manager = HttpQueryManager::instance();
237-
match http_query_manager.remove_query(&query_id, RemoveReason::Finished) {
237+
match http_query_manager
238+
.remove_query(
239+
&query_id,
240+
RemoveReason::Finished,
241+
ErrorCode::ClosedQuery("closed by client"),
242+
)
243+
.await
244+
{
238245
Some(query) => {
239246
let mut response = query.get_response_state_only().await;
240-
if query.check_removed().is_none() && !response.state.state.is_stopped() {
241-
query.kill(ErrorCode::ClosedQuery("closed by client")).await;
242-
response = query.get_response_state_only().await;
243-
}
244247
// it is safe to set these 2 fields to None, because client now check for null/None first.
245248
response.session = None;
246249
response.state.affect = None;
@@ -268,15 +271,15 @@ async fn query_cancel_handler(
268271
query_id, query_id
269272
);
270273
let http_query_manager = HttpQueryManager::instance();
271-
match http_query_manager.remove_query(&query_id, RemoveReason::Canceled) {
272-
Some(query) => {
273-
if query.check_removed().is_none() {
274-
query
275-
.kill(ErrorCode::AbortedQuery("canceled by client"))
276-
.await;
277-
}
278-
Ok(StatusCode::OK)
279-
}
274+
match http_query_manager
275+
.remove_query(
276+
&query_id,
277+
RemoveReason::Canceled,
278+
ErrorCode::AbortedQuery("canceled by client"),
279+
)
280+
.await
281+
{
282+
Some(_) => Ok(StatusCode::OK),
280283
None => Err(query_id_not_found(&query_id, &ctx.node_id)),
281284
}
282285
}

src/query/service/src/servers/http/v1/query/http_query.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -338,7 +338,13 @@ impl HttpQuery {
338338
"last query on the session not finished",
339339
));
340340
}
341-
let _ = http_query_manager.remove_query(&query_id, RemoveReason::Canceled);
341+
let _ = http_query_manager
342+
.remove_query(
343+
&query_id,
344+
RemoveReason::Canceled,
345+
ErrorCode::ClosedQuery("closed by next query"),
346+
)
347+
.await;
342348
}
343349
// wait for Arc<QueryContextShared> to drop and detach itself from session
344350
// should not take too long

src/query/service/src/servers/http/v1/query/http_query_manager.rs

Lines changed: 18 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ use databend_common_config::InnerConfig;
2929
use databend_common_exception::ErrorCode;
3030
use databend_common_exception::Result;
3131
use databend_storages_common_txn::TxnManagerRef;
32-
use log::warn;
3332
use parking_lot::Mutex;
3433
use tokio::task;
3534

@@ -150,19 +149,13 @@ impl HttpQueryManager {
150149
"http query {} timeout after {} s",
151150
&query_id_clone, query_result_timeout_secs
152151
);
153-
match self_clone.remove_query(&query_id_clone, RemoveReason::Timeout) {
154-
Some(_) => {
155-
warn!("{msg}");
156-
if let Some(query) = http_query_weak.upgrade() {
157-
if query.check_removed().is_none() {
158-
query.kill(ErrorCode::AbortedQuery(&msg)).await;
159-
}
160-
}
161-
}
162-
None => {
163-
warn!("{msg}, but already evict, too many queries?");
164-
}
165-
};
152+
_ = self_clone
153+
.remove_query(
154+
&query_id_clone,
155+
RemoveReason::Timeout,
156+
ErrorCode::AbortedQuery(&msg),
157+
)
158+
.await;
166159
break;
167160
}
168161
ExpireResult::Sleep(t) => {
@@ -177,26 +170,24 @@ impl HttpQueryManager {
177170
}
178171

179172
#[async_backtrace::framed]
180-
pub(crate) fn remove_query(
173+
pub(crate) async fn remove_query(
181174
self: &Arc<Self>,
182175
query_id: &str,
183176
reason: RemoveReason,
177+
error: ErrorCode,
184178
) -> Option<Arc<HttpQuery>> {
185179
// deref at once to avoid holding DashMap shard guard for too long.
186180
let query = self.queries.get(query_id).map(|q| q.clone());
187-
query.map(|q| {
188-
let not_removed_yet = !q.mark_removed(reason);
189-
let to_evict = not_removed_yet
190-
.then(|| {
191-
let mut queue = self.removed_queries.lock();
192-
queue.push(q.id.to_string())
193-
})
194-
.flatten();
195-
if let Some(qid) = to_evict {
196-
self.queries.remove(&qid);
181+
if let Some(q) = &query {
182+
if q.mark_removed(reason) {
183+
q.kill(error).await;
184+
let mut queue = self.removed_queries.lock();
185+
if let Some(to_evict) = queue.push(q.id.to_string()) {
186+
self.queries.remove(&to_evict);
187+
};
197188
}
198-
q.clone()
199-
})
189+
}
190+
query
200191
}
201192

202193
#[async_backtrace::framed]
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
## query
2+
"Running"
3+
## kill
4+
200
5+
## page
6+
{"error":{"code":"400","message":"query id QID canceled"}}
7+
400
8+
## final
9+
{
10+
"code": 1043,
11+
"message": "canceled by client",
12+
"detail": ""
13+
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
#!/usr/bin/env bash
2+
3+
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
4+
. "$CURDIR"/../../../shell_env.sh
5+
6+
QID="my_query_for_kill_${RANDOM}"
7+
echo "## query"
8+
curl -s -u root: -XPOST "http://localhost:8000/v1/query" -H "x-databend-query-id:${QID}" -H 'Content-Type: application/json' -d '{"sql": "select sleep(0.5) from numbers(15000000);", "pagination": { "wait_time_secs": 6}}' | jq ".state"
9+
echo "## kill"
10+
curl -s -u root: -XGET -w "%{http_code}\n" "http://localhost:8000/v1/query/${QID}/kill"
11+
echo "## page"
12+
curl -s -u root: -XGET -w "\n%{http_code}\n" "http://localhost:8000/v1/query/${QID}/page/0" | sed "s/${QID}/QID/g"
13+
echo "## final"
14+
curl -s -u root: -XGET -w "\n" "http://localhost:8000/v1/query/${QID}/final" | jq ".error"
15+
16+
## todo: this is flaky on ci, may lost the second row, can not reproduce locally for now
17+
#echo "## query_log"
18+
#echo "select exception_code, exception_text, log_type from system.query_log where query_id='${QID}' order by log_type" | $BENDSQL_CLIENT_CONNECT | sed "s/${QID}/QID/g"
19+
#----
20+
### query_log
21+
#0 1
22+
#1043 AbortedQuery. Code: 1043, Text = canceled by client. 4

0 commit comments

Comments
 (0)