Skip to content

Commit fec4dab

Browse files
authored
refactor: truncate support build pipeline (#15027)
* refactor codes * add truncate pipeline * fix test * fix review comment
1 parent 03a6257 commit fec4dab

File tree

30 files changed

+706
-521
lines changed

30 files changed

+706
-521
lines changed

scripts/benchmark/query/load/tpch10.sh

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ select version();
77
SQL
88

99
for t in customer lineitem nation orders partsupp part region supplier; do
10-
echo "DROP TABLE IF EXISTS $t;" | bendsql
10+
echo "DROP TABLE IF EXISTS $t;" | bendsql
1111
done
1212

1313
cat <<SQL | bendsql
@@ -113,8 +113,8 @@ cat <<SQL | bendsql
113113
SQL
114114

115115
for t in customer lineitem nation orders partsupp part region supplier; do
116-
echo "loading into $t ..."
117-
cat <<SQL | bendsql
116+
echo "loading into $t ..."
117+
cat <<SQL | bendsql
118118
COPY INTO $t FROM 's3://repo.databend.rs/datasets/tpch10/${t}/' connection=(connection_name='repo') pattern ='${t}.*'
119119
file_format=(type='CSV' field_delimiter='|' record_delimiter='\\n' skip_header=0);
120120
ANALYZE TABLE "${t}";

scripts/benchmark/query/load/tpch100.sh

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ select version();
77
SQL
88

99
for t in customer lineitem nation orders partsupp part region supplier; do
10-
echo "DROP TABLE IF EXISTS $t;" | bendsql
10+
echo "DROP TABLE IF EXISTS $t;" | bendsql
1111
done
1212

1313
cat <<SQL | bendsql
@@ -113,8 +113,8 @@ cat <<SQL | bendsql
113113
SQL
114114

115115
for t in nation region; do
116-
echo "loading into $t ..."
117-
cat <<SQL | bendsql
116+
echo "loading into $t ..."
117+
cat <<SQL | bendsql
118118
COPY INTO $t FROM 's3://repo.databend.rs/tpch100/${t}.tbl'
119119
credentials=(access_key_id ='$REPO_ACCESS_KEY_ID' secret_access_key ='$REPO_SECRET_ACCESS_KEY')
120120
file_format=(type='CSV' field_delimiter='|' record_delimiter='\\n' skip_header=1);
@@ -124,8 +124,8 @@ SQL
124124
done
125125

126126
for t in customer lineitem orders partsupp part supplier; do
127-
echo "loading into $t ..."
128-
cat <<SQL | bendsql
127+
echo "loading into $t ..."
128+
cat <<SQL | bendsql
129129
COPY INTO $t FROM 's3://repo.databend.rs/tpch100/${t}/' connection=(connection_name='repo') pattern ='${t}.tbl.*'
130130
file_format=(type='CSV' field_delimiter='|' record_delimiter='\\n' skip_header=1);
131131
ANALYZE TABLE "${t}";

src/query/catalog/src/table.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -255,8 +255,8 @@ pub trait Table: Sync + Send {
255255
}
256256

257257
#[async_backtrace::framed]
258-
async fn truncate(&self, ctx: Arc<dyn TableContext>) -> Result<()> {
259-
let _ = ctx;
258+
async fn truncate(&self, ctx: Arc<dyn TableContext>, pipeline: &mut Pipeline) -> Result<()> {
259+
let (_, _) = (ctx, pipeline);
260260
Ok(())
261261
}
262262

src/query/service/src/interpreters/interpreter_delete.rs

Lines changed: 81 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ use std::collections::HashSet;
1616
use std::collections::VecDeque;
1717
use std::sync::Arc;
1818

19+
use databend_common_base::base::ProgressValues;
1920
use databend_common_catalog::plan::Filters;
2021
use databend_common_catalog::plan::Partitions;
2122
use databend_common_catalog::table::TableExt;
@@ -52,6 +53,7 @@ use databend_common_sql::MetadataRef;
5253
use databend_common_sql::ScalarExpr;
5354
use databend_common_sql::Visibility;
5455
use databend_common_storages_factory::Table;
56+
use databend_common_storages_fuse::operations::TruncateMode;
5557
use databend_common_storages_fuse::FuseTable;
5658
use databend_storages_common_table_meta::meta::TableSnapshot;
5759
use futures_util::TryStreamExt;
@@ -188,7 +190,7 @@ impl Interpreter for DeleteInterpreter {
188190
(None, vec![])
189191
};
190192

191-
let fuse_table = tbl.as_any().downcast_ref::<FuseTable>().ok_or_else(|| {
193+
let fuse_table = FuseTable::try_from_table(tbl.as_ref()).map_err(|_| {
192194
ErrorCode::Unimplemented(format!(
193195
"table {}, engine type {}, does not support DELETE FROM",
194196
tbl.name(),
@@ -197,48 +199,91 @@ impl Interpreter for DeleteInterpreter {
197199
})?;
198200

199201
let mut build_res = PipelineBuildResult::create();
200-
let query_row_id_col = !self.plan.subquery_desc.is_empty();
201-
if let Some(snapshot) = fuse_table
202-
.fast_delete(
203-
self.ctx.clone(),
204-
filters.clone(),
205-
col_indices.clone(),
206-
query_row_id_col,
207-
)
208-
.await?
209-
{
210-
let cluster = self.ctx.get_cluster();
211-
let is_lazy = !cluster.is_empty() && snapshot.segments.len() >= cluster.nodes.len();
212-
let partitions = fuse_table
213-
.mutation_read_partitions(
202+
203+
// check if table is empty
204+
let Some(snapshot) = fuse_table.read_table_snapshot().await? else {
205+
// no snapshot, no deletion
206+
return Ok(build_res);
207+
};
208+
if snapshot.summary.row_count == 0 {
209+
// empty snapshot, no deletion
210+
return Ok(build_res);
211+
}
212+
213+
build_res.main_pipeline.add_lock_guard(lock_guard);
214+
// check if unconditional deletion
215+
let Some(filters) = filters else {
216+
let progress_values = ProgressValues {
217+
rows: snapshot.summary.row_count as usize,
218+
bytes: snapshot.summary.uncompressed_byte_size as usize,
219+
};
220+
self.ctx.get_write_progress().incr(&progress_values);
221+
// deleting the whole table... just a truncate
222+
fuse_table
223+
.do_truncate(
214224
self.ctx.clone(),
215-
snapshot.clone(),
216-
col_indices.clone(),
217-
filters.clone(),
218-
is_lazy,
219-
true,
225+
&mut build_res.main_pipeline,
226+
TruncateMode::Delete,
220227
)
221228
.await?;
229+
return Ok(build_res);
230+
};
222231

223-
// Safe to unwrap, because if filters is None, fast_delete will do truncate and return None.
224-
let filters = filters.unwrap();
225-
let physical_plan = Self::build_physical_plan(
226-
filters,
227-
partitions,
228-
fuse_table.get_table_info().clone(),
229-
col_indices,
230-
snapshot,
231-
catalog_info,
232-
is_distributed,
233-
query_row_id_col,
234-
)?;
235-
236-
build_res =
237-
build_query_pipeline_without_render_result_set(&self.ctx, &physical_plan).await?;
232+
let query_row_id_col = !self.plan.subquery_desc.is_empty();
233+
if col_indices.is_empty() && !query_row_id_col {
234+
// here the situation: filter_expr is not null, but col_indices in empty, which
235+
// indicates the expr being evaluated is unrelated to the value of rows:
236+
// e.g.
237+
// `delete from t where 1 = 1`, `delete from t where now()`,
238+
// or `delete from t where RANDOM()::INT::BOOLEAN`
239+
// if the `filter_expr` is of "constant" nullary :
240+
// for the whole block, whether all of the rows should be kept or dropped,
241+
// we can just return from here, without accessing the block data
242+
if fuse_table.try_eval_const(self.ctx.clone(), &fuse_table.schema(), &filters.filter)? {
243+
let progress_values = ProgressValues {
244+
rows: snapshot.summary.row_count as usize,
245+
bytes: snapshot.summary.uncompressed_byte_size as usize,
246+
};
247+
self.ctx.get_write_progress().incr(&progress_values);
248+
249+
// deleting the whole table... just a truncate
250+
fuse_table
251+
.do_truncate(
252+
self.ctx.clone(),
253+
&mut build_res.main_pipeline,
254+
TruncateMode::Delete,
255+
)
256+
.await?;
257+
return Ok(build_res);
258+
}
238259
}
239260

240-
build_res.main_pipeline.add_lock_guard(lock_guard);
261+
let cluster = self.ctx.get_cluster();
262+
let is_lazy = !cluster.is_empty() && snapshot.segments.len() >= cluster.nodes.len();
263+
let partitions = fuse_table
264+
.mutation_read_partitions(
265+
self.ctx.clone(),
266+
snapshot.clone(),
267+
col_indices.clone(),
268+
Some(filters.clone()),
269+
is_lazy,
270+
true,
271+
)
272+
.await?;
273+
274+
let physical_plan = Self::build_physical_plan(
275+
filters,
276+
partitions,
277+
fuse_table.get_table_info().clone(),
278+
col_indices,
279+
snapshot,
280+
catalog_info,
281+
is_distributed,
282+
query_row_id_col,
283+
)?;
241284

285+
build_res =
286+
build_query_pipeline_without_render_result_set(&self.ctx, &physical_plan).await?;
242287
Ok(build_res)
243288
}
244289
}

src/query/service/src/interpreters/interpreter_table_drop.rs

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use databend_common_management::RoleApi;
2121
use databend_common_meta_app::principal::OwnershipObject;
2222
use databend_common_meta_app::schema::DropTableByIdReq;
2323
use databend_common_sql::plans::DropTablePlan;
24+
use databend_common_storages_fuse::operations::TruncateMode;
2425
use databend_common_storages_fuse::FuseTable;
2526
use databend_common_storages_share::save_share_spec;
2627
use databend_common_storages_stream::stream_table::STREAM_ENGINE;
@@ -126,6 +127,7 @@ impl Interpreter for DropTableInterpreter {
126127
role_api.revoke_ownership(&owner_object).await?;
127128
RoleCacheManager::instance().invalidate_cache(&tenant);
128129

130+
let mut build_res = PipelineBuildResult::create();
129131
// if `plan.all`, truncate, then purge the historical data
130132
if self.plan.all {
131133
// the above `catalog.drop_table` operation changed the table meta version,
@@ -135,10 +137,17 @@ impl Interpreter for DropTableInterpreter {
135137
// if target table if of type FuseTable, purge its historical data
136138
// otherwise, plain truncate
137139
if let Ok(fuse_table) = maybe_fuse_table {
138-
let purge = true;
139-
fuse_table.do_truncate(self.ctx.clone(), purge).await?
140+
fuse_table
141+
.do_truncate(
142+
self.ctx.clone(),
143+
&mut build_res.main_pipeline,
144+
TruncateMode::Purge,
145+
)
146+
.await?
140147
} else {
141-
latest.truncate(self.ctx.clone()).await?
148+
latest
149+
.truncate(self.ctx.clone(), &mut build_res.main_pipeline)
150+
.await?
142151
}
143152
}
144153

@@ -153,6 +162,6 @@ impl Interpreter for DropTableInterpreter {
153162
.await?;
154163
}
155164

156-
Ok(PipelineBuildResult::create())
165+
Ok(build_res)
157166
}
158167
}

src/query/service/src/interpreters/interpreter_table_truncate.rs

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,12 @@ use databend_common_catalog::table::TableExt;
1818
use databend_common_config::GlobalConfig;
1919
use databend_common_exception::Result;
2020
use databend_common_sql::plans::TruncateTablePlan;
21+
use databend_common_storages_fuse::FuseTable;
2122

2223
use crate::api::Packet;
2324
use crate::api::TruncateTablePacket;
2425
use crate::interpreters::Interpreter;
26+
use crate::locks::LockManager;
2527
use crate::pipelines::PipelineBuildResult;
2628
use crate::sessions::QueryContext;
2729
use crate::sessions::TableContext;
@@ -78,6 +80,15 @@ impl Interpreter for TruncateTableInterpreter {
7880
// check mutability
7981
table.check_mutable()?;
8082

83+
// Add table lock.
84+
let maybe_fuse_table = FuseTable::try_from_table(table.as_ref()).is_ok();
85+
let lock_guard = if maybe_fuse_table {
86+
let table_lock = LockManager::create_table_lock(table.get_table_info().clone())?;
87+
table_lock.try_lock(self.ctx.clone()).await?
88+
} else {
89+
None
90+
};
91+
8192
if self.proxy_to_cluster && table.broadcast_truncate_to_cluster() {
8293
let settings = self.ctx.get_settings();
8394
let timeout = settings.get_flight_client_timeout()?;
@@ -96,7 +107,11 @@ impl Interpreter for TruncateTableInterpreter {
96107
}
97108
}
98109

99-
table.truncate(self.ctx.clone()).await?;
100-
Ok(PipelineBuildResult::create())
110+
let mut build_res = PipelineBuildResult::create();
111+
build_res.main_pipeline.add_lock_guard(lock_guard);
112+
table
113+
.truncate(self.ctx.clone(), &mut build_res.main_pipeline)
114+
.await?;
115+
Ok(build_res)
101116
}
102117
}

src/query/service/tests/it/storages/fuse/operations/analyze.rs

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ use std::sync::Arc;
1616

1717
use databend_common_base::base::tokio;
1818
use databend_common_exception::Result;
19-
use databend_common_storages_factory::Table;
2019
use databend_common_storages_fuse::FuseTable;
2120
use databend_common_storages_fuse::TableContext;
2221
use databend_query::test_kits::*;
@@ -71,15 +70,9 @@ async fn test_fuse_snapshot_analyze_and_truncate() -> Result<()> {
7170

7271
// truncate table
7372
{
74-
let ctx = fixture.new_query_ctx().await?;
75-
let catalog = ctx
76-
.get_catalog(fixture.default_catalog_name().as_str())
77-
.await?;
78-
let table = catalog
79-
.get_table(ctx.get_tenant().as_str(), &db, &tbl)
80-
.await?;
81-
let fuse_table = FuseTable::try_from_table(table.as_ref())?;
82-
fuse_table.truncate(ctx).await?;
73+
let qry = format!("Truncate table {}.{}", db, tbl);
74+
let r = fixture.execute_command(&qry).await;
75+
assert!(r.is_ok());
8376
}
8477

8578
// optimize after truncate table, ts file location will become None

0 commit comments

Comments
 (0)