Skip to content

Commit a77511c

Browse files
authored
chore: support aborting table navigation (#15549)
* chore: support aborting table navigation * refactor: introduce AbortChecker * add unit test * cleanup
1 parent 50a7f9b commit a77511c

File tree

14 files changed

+160
-36
lines changed

14 files changed

+160
-36
lines changed

src/query/catalog/src/table.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use chrono::DateTime;
2020
use chrono::Utc;
2121
use databend_common_exception::ErrorCode;
2222
use databend_common_exception::Result;
23+
use databend_common_expression::AbortChecker;
2324
use databend_common_expression::BlockThresholds;
2425
use databend_common_expression::ColumnId;
2526
use databend_common_expression::RemoteExpr;
@@ -294,8 +295,13 @@ pub trait Table: Sync + Send {
294295
}
295296

296297
#[async_backtrace::framed]
297-
async fn navigate_to(&self, navigation: &TimeNavigation) -> Result<Arc<dyn Table>> {
298+
async fn navigate_to(
299+
&self,
300+
navigation: &TimeNavigation,
301+
abort_checker: AbortChecker,
302+
) -> Result<Arc<dyn Table>> {
298303
let _ = navigation;
304+
let _ = abort_checker;
299305

300306
Err(ErrorCode::Unimplemented(format!(
301307
"Time travel operation is not supported for the table '{}', which uses the '{}' engine.",

src/query/catalog/src/table_context.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,9 @@ use databend_common_base::base::ProgressValues;
2727
use databend_common_base::runtime::profile::Profile;
2828
use databend_common_exception::ErrorCode;
2929
use databend_common_exception::Result;
30+
use databend_common_expression::AbortChecker;
3031
use databend_common_expression::BlockThresholds;
32+
use databend_common_expression::CheckAbort;
3133
use databend_common_expression::DataBlock;
3234
use databend_common_expression::Expr;
3335
use databend_common_expression::FunctionContext;
@@ -175,6 +177,22 @@ pub trait TableContext: Send + Sync {
175177
fn get_id(&self) -> String;
176178
fn get_current_catalog(&self) -> String;
177179
fn check_aborting(&self) -> Result<()>;
180+
fn get_abort_checker(self: Arc<Self>) -> AbortChecker
181+
where Self: 'static {
182+
struct Checker<S> {
183+
this: S,
184+
}
185+
impl<S: TableContext + ?Sized> CheckAbort for Checker<Arc<S>> {
186+
fn is_aborting(&self) -> bool {
187+
self.this.as_ref().check_aborting().is_err()
188+
}
189+
190+
fn try_check_aborting(&self) -> Result<()> {
191+
self.this.check_aborting()
192+
}
193+
}
194+
Arc::new(Checker { this: self })
195+
}
178196
fn get_error(&self) -> Option<ErrorCode>;
179197
fn push_warning(&self, warning: String);
180198
fn get_current_database(&self) -> String;

src/query/ee/src/stream/handler.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,8 +96,14 @@ impl StreamHandler for RealStreamHandler {
9696
}
9797

9898
let table = FuseTable::try_from_table(table.as_ref())?;
99+
let abort_checker = ctx.get_abort_checker();
99100
let change_desc = table
100-
.get_change_descriptor(plan.append_only, "".to_string(), plan.navigation.as_ref())
101+
.get_change_descriptor(
102+
plan.append_only,
103+
"".to_string(),
104+
plan.navigation.as_ref(),
105+
abort_checker,
106+
)
101107
.await?;
102108
table.check_changes_valid(&plan.table_database, &plan.table_name, change_desc.seq)?;
103109

src/query/expression/src/kernels/sort.rs

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,12 @@ use crate::utils::arrow::column_to_arrow_array;
3737
use crate::Column;
3838
use crate::DataBlock;
3939

40-
pub type Aborting = Arc<Box<dyn Fn() -> bool + Send + Sync + 'static>>;
40+
pub type AbortChecker = Arc<dyn CheckAbort + Send + Sync>;
41+
42+
pub trait CheckAbort {
43+
fn is_aborting(&self) -> bool;
44+
fn try_check_aborting(&self) -> Result<()>;
45+
}
4146

4247
#[derive(Clone)]
4348
pub struct SortColumnDescription {
@@ -162,13 +167,13 @@ impl DataBlock {
162167
blocks: &[DataBlock],
163168
descriptions: &[SortColumnDescription],
164169
limit: Option<usize>,
165-
aborting: Aborting,
170+
abort_checker: AbortChecker,
166171
) -> Result<DataBlock> {
167172
match blocks.len() {
168173
0 => Result::Err(ErrorCode::EmptyData("Can't merge empty blocks")),
169174
1 => Ok(blocks[0].clone()),
170175
2 => {
171-
if aborting() {
176+
if abort_checker.is_aborting() {
172177
return Err(ErrorCode::AbortedQuery(
173178
"Aborted query, because the server is shutting down or the query was killed.",
174179
));
@@ -177,7 +182,7 @@ impl DataBlock {
177182
DataBlock::two_way_merge_sort(blocks, descriptions, limit)
178183
}
179184
_ => {
180-
if aborting() {
185+
if abort_checker.is_aborting() {
181186
return Err(ErrorCode::AbortedQuery(
182187
"Aborted query, because the server is shutting down or the query was killed.",
183188
));
@@ -186,9 +191,9 @@ impl DataBlock {
186191
&blocks[0..blocks.len() / 2],
187192
descriptions,
188193
limit,
189-
aborting.clone(),
194+
abort_checker.clone(),
190195
)?;
191-
if aborting() {
196+
if abort_checker.is_aborting() {
192197
return Err(ErrorCode::AbortedQuery(
193198
"Aborted query, because the server is shutting down or the query was killed.",
194199
));
@@ -197,9 +202,9 @@ impl DataBlock {
197202
&blocks[blocks.len() / 2..blocks.len()],
198203
descriptions,
199204
limit,
200-
aborting.clone(),
205+
abort_checker.clone(),
201206
)?;
202-
if aborting() {
207+
if abort_checker.is_aborting() {
203208
return Err(ErrorCode::AbortedQuery(
204209
"Aborted query, because the server is shutting down or the query was killed.",
205210
));

src/query/expression/tests/it/sort.rs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ use databend_common_exception::Result;
1919
use databend_common_expression::types::decimal::*;
2020
use databend_common_expression::types::number::*;
2121
use databend_common_expression::types::StringType;
22+
use databend_common_expression::AbortChecker;
23+
use databend_common_expression::CheckAbort;
2224
use databend_common_expression::Column;
2325
use databend_common_expression::DataBlock;
2426
use databend_common_expression::FromData;
@@ -298,7 +300,18 @@ fn test_blocks_merge_sort() -> Result<()> {
298300
),
299301
];
300302

301-
let aborting: Arc<Box<dyn Fn() -> bool + Send + Sync + 'static>> = Arc::new(Box::new(|| false));
303+
struct NeverAbort;
304+
impl CheckAbort for NeverAbort {
305+
fn is_aborting(&self) -> bool {
306+
false
307+
}
308+
fn try_check_aborting(&self) -> Result<()> {
309+
Ok(())
310+
}
311+
}
312+
313+
let aborting: AbortChecker = Arc::new(NeverAbort);
314+
302315
for (name, sort_descs, limit, expected) in test_cases {
303316
let res = DataBlock::merge_sort(&blocks, &sort_descs, limit, aborting.clone())?;
304317

src/query/service/src/interpreters/interpreter_table_analyze.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -89,9 +89,12 @@ impl Interpreter for AnalyzeTableInterpreter {
8989

9090
let temporal_str = if let Some(table_statistics) = &table_statistics {
9191
let is_full = table
92-
.navigate_to_point(&NavigationPoint::SnapshotID(
93-
table_statistics.snapshot_id.simple().to_string(),
94-
))
92+
.navigate_to_point(
93+
&NavigationPoint::SnapshotID(
94+
table_statistics.snapshot_id.simple().to_string(),
95+
),
96+
self.ctx.clone().get_abort_checker(),
97+
)
9598
.await
9699
.is_err();
97100

src/query/service/tests/it/storages/fuse/operations/gc.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -267,7 +267,7 @@ async fn test_fuse_purge_older_version() -> Result<()> {
267267
let time_point = now - Duration::hours(12);
268268
let snapshot_loc = fuse_table.snapshot_loc().await?.unwrap();
269269
let table = fuse_table
270-
.navigate_to_time_point(snapshot_loc, time_point)
270+
.navigate_to_time_point(snapshot_loc, time_point, ctx.clone().get_abort_checker())
271271
.await?;
272272
let keep_last_snapshot = true;
273273
table

src/query/service/tests/it/storages/fuse/operations/navigate.rs

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ use std::ops::Sub;
1616
use std::time::Duration;
1717

1818
use databend_common_base::base::tokio;
19+
use databend_common_catalog::table_context::TableContext;
1920
use databend_common_exception::ErrorCode;
2021
use databend_common_exception::Result;
2122
use databend_common_expression::DataBlock;
@@ -102,9 +103,11 @@ async fn test_fuse_navigate() -> Result<()> {
102103
.timestamp
103104
.unwrap()
104105
.sub(chrono::Duration::milliseconds(1));
106+
107+
let ctx = fixture.new_query_ctx().await?;
105108
// navigate from the instant that is just one ms before the timestamp of the latest snapshot
106109
let tbl = fuse_table
107-
.navigate_to_time_point(loc.clone(), instant)
110+
.navigate_to_time_point(loc.clone(), instant, ctx.clone().get_abort_checker())
108111
.await?;
109112

110113
// check we got the snapshot of the first insertion
@@ -117,12 +120,28 @@ async fn test_fuse_navigate() -> Result<()> {
117120
.unwrap()
118121
.sub(chrono::Duration::milliseconds(1));
119122
// navigate from the instant that is just one ms before the timestamp of the last insertion
120-
let res = fuse_table.navigate_to_time_point(loc, instant).await;
123+
let res = fuse_table
124+
.navigate_to_time_point(loc.clone(), instant, ctx.clone().get_abort_checker())
125+
.await;
121126
match res {
122127
Ok(_) => panic!("historical data should not exist"),
123128
Err(e) => assert_eq!(e.code(), ErrorCode::TABLE_HISTORICAL_DATA_NOT_FOUND),
124129
};
125130

131+
// navigation should abort if query killed
132+
ctx.get_current_session()
133+
.force_kill_query(ErrorCode::AbortedQuery("mission aborted"));
134+
let checker = ctx.clone().get_abort_checker();
135+
assert!(checker.is_aborting());
136+
let res = fuse_table
137+
.navigate_to_time_point(loc, instant, ctx.get_abort_checker())
138+
.await;
139+
140+
assert!(res.is_err());
141+
if let Err(e) = res {
142+
assert_eq!(e.code(), ErrorCode::ABORTED_QUERY);
143+
}
144+
126145
Ok(())
127146
}
128147

src/query/sql/src/planner/binder/table.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ use databend_common_expression::is_stream_column;
5858
use databend_common_expression::type_check::check_number;
5959
use databend_common_expression::types::DataType;
6060
use databend_common_expression::types::NumberScalar;
61+
use databend_common_expression::AbortChecker;
6162
use databend_common_expression::ConstantFolder;
6263
use databend_common_expression::DataField;
6364
use databend_common_expression::FunctionContext;
@@ -220,6 +221,7 @@ impl Binder {
220221
database.as_str(),
221222
table_name.as_str(),
222223
navigation.as_ref(),
224+
self.ctx.clone().get_abort_checker(),
223225
)
224226
.await
225227
{
@@ -1342,6 +1344,7 @@ impl Binder {
13421344
database_name: &str,
13431345
table_name: &str,
13441346
navigation: Option<&TimeNavigation>,
1347+
abort_checker: AbortChecker,
13451348
) -> Result<Arc<dyn Table>> {
13461349
// Resolve table with ctx
13471350
// for example: select * from t1 join (select * from t1 as t2 where a > 1 and a < 13);
@@ -1353,7 +1356,7 @@ impl Binder {
13531356
.await?;
13541357

13551358
if let Some(desc) = navigation {
1356-
table_meta = table_meta.navigate_to(desc).await?;
1359+
table_meta = table_meta.navigate_to(desc, abort_checker).await?;
13571360
}
13581361
Ok(table_meta)
13591362
}

src/query/sql/src/planner/dataframe.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,14 @@ impl Dataframe {
8585
let database = "system";
8686
let tenant = query_ctx.get_tenant();
8787
let table_meta: Arc<dyn Table> = binder
88-
.resolve_data_source(tenant.tenant_name(), catalog, database, "one", None)
88+
.resolve_data_source(
89+
tenant.tenant_name(),
90+
catalog,
91+
database,
92+
"one",
93+
None,
94+
query_ctx.clone().get_abort_checker(),
95+
)
8996
.await?;
9097

9198
let table_index = metadata.write().add_table(

0 commit comments

Comments
 (0)