Commit 8a982b0

Merge pull request #8824 from zhyass/feature_cluster
refactor(storage): deletion
2 parents 1fb23a4 + 3ce1dbb commit 8a982b0

37 files changed, with 1663 additions and 640 deletions

src/query/catalog/src/table.rs
Lines changed: 4 additions & 4 deletions

```diff
@@ -34,7 +34,6 @@ use crate::plan::DataSourcePlan;
 use crate::plan::Expression;
 use crate::plan::PartStatistics;
 use crate::plan::Partitions;
-use crate::plan::Projection;
 use crate::plan::PushDownInfo;
 use crate::table::column_stats_provider_impls::DummyColumnStatisticsProvider;
 use crate::table_context::TableContext;
@@ -227,10 +226,11 @@ pub trait Table: Sync + Send {
     async fn delete(
         &self,
         ctx: Arc<dyn TableContext>,
-        projection: &Projection,
-        selection: &Option<String>,
+        filter: Option<Expression>,
+        col_indices: Vec<usize>,
+        pipeline: &mut Pipeline,
     ) -> Result<()> {
-        let (_, _, _) = (ctx, projection, selection);
+        let (_, _, _, _) = (ctx, filter, col_indices, pipeline);
 
         Err(ErrorCode::Unimplemented(format!(
            "table {}, of engine type {}, does not support DELETE FROM",
```

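The signature change above shifts the `Table::delete` contract: instead of receiving the raw `selection` string and a `Projection`, an engine now receives a typed filter `Expression`, the indices of the columns that filter reads, and a `Pipeline` to which it appends its mutation processors. A hedged sketch of an implementation under the new contract (`MyTable` and its two helper methods are hypothetical, invented for illustration; only the trait shape comes from this commit):

```rust
// Hypothetical engine implementing the new `delete` contract.
#[async_trait::async_trait]
impl Table for MyTable {
    async fn delete(
        &self,
        ctx: Arc<dyn TableContext>,
        filter: Option<Expression>,
        col_indices: Vec<usize>,
        pipeline: &mut Pipeline,
    ) -> Result<()> {
        match filter {
            // No WHERE clause: everything is deleted, no scan required.
            None => self.truncate_all(ctx).await,
            // Otherwise, schedule processors that read only `col_indices`,
            // evaluate `expr` per block, and rewrite the affected blocks.
            Some(expr) => self.build_delete_pipeline(ctx, expr, col_indices, pipeline),
        }
    }
}
```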
src/query/service/src/interpreters/interpreter_delete.rs
Lines changed: 28 additions & 6 deletions

```diff
@@ -16,12 +16,17 @@ use std::sync::Arc;
 
 use common_datavalues::DataSchemaRef;
 use common_exception::Result;
+use common_pipeline_core::Pipeline;
+use common_sql::executor::ExpressionBuilderWithoutRenaming;
 use common_sql::plans::DeletePlan;
 
 use crate::interpreters::Interpreter;
+use crate::pipelines::executor::ExecutorSettings;
+use crate::pipelines::executor::PipelineCompleteExecutor;
 use crate::pipelines::PipelineBuildResult;
 use crate::sessions::QueryContext;
 use crate::sessions::TableContext;
+use crate::sql::plans::ScalarExpr;
 
 /// interprets DeletePlan
 pub struct DeleteInterpreter {
@@ -55,12 +60,29 @@ impl Interpreter for DeleteInterpreter {
         let db_name = self.plan.database_name.as_str();
         let tbl_name = self.plan.table_name.as_str();
         let tbl = self.ctx.get_table(catalog_name, db_name, tbl_name).await?;
-        tbl.delete(
-            self.ctx.clone(),
-            &self.plan.projection,
-            &self.plan.selection,
-        )
-        .await?;
+        let (filter, col_indices) = if let Some(scalar) = &self.plan.selection {
+            let eb = ExpressionBuilderWithoutRenaming::create(self.plan.metadata.clone());
+            (
+                Some(eb.build(scalar)?),
+                scalar.used_columns().into_iter().collect(),
+            )
+        } else {
+            (None, vec![])
+        };
+
+        let mut pipeline = Pipeline::create();
+        tbl.delete(self.ctx.clone(), filter, col_indices, &mut pipeline)
+            .await?;
+        if !pipeline.pipes.is_empty() {
+            let settings = self.ctx.get_settings();
+            pipeline.set_max_threads(settings.get_max_threads()? as usize);
+            let executor_settings = ExecutorSettings::try_create(&settings)?;
+            let executor = PipelineCompleteExecutor::try_create(pipeline, executor_settings)?;
+
+            self.ctx.set_executor(Arc::downgrade(&executor.get_inner()));
+            executor.execute()?;
+            drop(executor);
+        }
 
         Ok(PipelineBuildResult::create())
     }
```
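The interpreter now drives execution itself: the engine only schedules work on the pipeline, and an empty pipeline means the engine already finished the delete (for example, a whole-table truncation or a no-op). Distilled from the diff above into a standalone helper for readability (the helper name is ours, not the commit's):

```rust
// Run a mutation pipeline to completion, as DeleteInterpreter::execute does above.
async fn run_to_completion(ctx: Arc<QueryContext>, mut pipeline: Pipeline) -> Result<()> {
    if pipeline.pipes.is_empty() {
        // Nothing scheduled: the engine completed the mutation on its own.
        return Ok(());
    }
    let settings = ctx.get_settings();
    pipeline.set_max_threads(settings.get_max_threads()? as usize);
    let executor_settings = ExecutorSettings::try_create(&settings)?;
    let executor = PipelineCompleteExecutor::try_create(pipeline, executor_settings)?;
    // Register a weak reference so the query can be observed or killed
    // without keeping the executor alive.
    ctx.set_executor(Arc::downgrade(&executor.get_inner()));
    executor.execute()?;
    // Drop explicitly to release the executor's threads before returning.
    drop(executor);
    Ok(())
}
```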

src/query/service/tests/it/storages/fuse/operations/mutation/deletion_mutator.rs
Lines changed: 77 additions & 91 deletions

```diff
@@ -12,117 +12,103 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::collections::hash_map::RandomState;
-use std::collections::HashMap;
-use std::collections::HashSet;
 use std::sync::Arc;
 
 use common_base::base::tokio;
-use common_datablocks::BlockCompactThresholds;
-use common_datablocks::DataBlock;
-use common_datavalues::DataSchema;
-use common_exception::ErrorCode;
 use common_exception::Result;
-use common_storages_table_meta::caches::CacheManager;
-use common_storages_table_meta::meta::BlockMeta;
-use common_storages_table_meta::meta::SegmentInfo;
-use common_storages_table_meta::meta::Statistics;
-use common_storages_table_meta::meta::TableSnapshot;
-use common_storages_table_meta::meta::Versioned;
+use common_sql::executor::ExpressionBuilderWithoutRenaming;
+use common_sql::plans::DeletePlan;
+use common_sql::plans::Plan;
+use common_sql::plans::ScalarExpr;
+use common_sql::Planner;
+use common_storages_factory::Table;
+use common_storages_fuse::FuseTable;
+use databend_query::pipelines::executor::ExecutorSettings;
+use databend_query::pipelines::executor::PipelineCompleteExecutor;
+use databend_query::sessions::QueryContext;
 use databend_query::sessions::TableContext;
-use databend_query::storages::fuse::io::SegmentWriter;
-use databend_query::storages::fuse::io::TableMetaLocationGenerator;
-use databend_query::storages::fuse::operations::DeletionMutator;
-use databend_query::storages::fuse::statistics::ClusterStatsGenerator;
-use uuid::Uuid;
 
+use crate::storages::fuse::table_test_fixture::execute_command;
+use crate::storages::fuse::table_test_fixture::execute_query;
+use crate::storages::fuse::table_test_fixture::expects_ok;
 use crate::storages::fuse::table_test_fixture::TestFixture;
 
-/// [issue#6570](https://github.com/datafuselabs/databend/issues/6570)
-/// During deletion, there might be multiple segments become empty
-
 #[tokio::test]
 async fn test_deletion_mutator_multiple_empty_segments() -> Result<()> {
-    // generates a batch of segments, and delete blocks from them
-    // so that half of the segments will be empty
-
     let fixture = TestFixture::new().await;
     let ctx = fixture.ctx();
-    let location_generator = TableMetaLocationGenerator::with_prefix("_prefix".to_owned());
+    let tbl_name = fixture.default_table_name();
+    let db_name = fixture.default_db_name();
 
-    let segment_info_cache = CacheManager::instance().get_table_segment_cache();
-    let data_accessor = ctx.get_data_operator()?.operator();
-    let seg_writer = SegmentWriter::new(&data_accessor, &location_generator, &segment_info_cache);
+    fixture.create_normal_table().await?;
 
-    let gen_test_seg = || async {
-        // generates test segment, each of them contains only one block
-        // structures are filled with arbitrary values, no effects for this test case
-        let block_id = Uuid::new_v4().simple().to_string();
-        let location = (block_id, DataBlock::VERSION);
-        let test_block_meta = Arc::new(BlockMeta::new(
-            1,
-            1,
-            1,
-            HashMap::default(),
-            HashMap::default(),
-            None,
-            location.clone(),
-            None,
-            0,
-        ));
-        let segment = SegmentInfo::new(vec![test_block_meta], Statistics::default());
-        Ok::<_, ErrorCode>((seg_writer.write_segment(segment).await?, location))
-    };
+    // insert
+    for i in 0..10 {
+        let qry = format!("insert into {}.{}(id) values({})", db_name, tbl_name, i);
+        execute_command(ctx.clone(), qry.as_str()).await?;
+    }
 
-    // generates 100 segments, for each segment, contains one block
-    let mut test_segment_locations = vec![];
-    let mut test_block_locations = vec![];
-    for _ in 0..100 {
-        let (segment_location, block_location) = gen_test_seg().await?;
-        test_segment_locations.push(segment_location);
-        test_block_locations.push(block_location);
+    let catalog = ctx.get_catalog(fixture.default_catalog_name().as_str())?;
+    let table = catalog
+        .get_table(ctx.get_tenant().as_str(), &db_name, &tbl_name)
+        .await?;
+    // delete
+    let query = format!("delete from {}.{} where id=1", db_name, tbl_name);
+    let mut planner = Planner::new(ctx.clone());
+    let (plan, _, _) = planner.plan_sql(&query).await?;
+    if let Plan::Delete(delete) = plan {
+        do_deletion(ctx.clone(), table.clone(), *delete).await?;
    }
 
-    let base_snapshot = TableSnapshot::new(
-        Uuid::new_v4(),
-        &None,
-        None,
-        DataSchema::empty(),
-        Statistics::default(),
-        test_segment_locations.clone(),
-        None,
-        None,
+    // check count
+    let expected = vec![
+        "+---------------+-------+",
+        "| segment_count | count |",
+        "+---------------+-------+",
+        "| 9             | 9     |",
+        "+---------------+-------+",
+    ];
+    let qry = format!(
+        "select segment_count, block_count as count from fuse_snapshot('{}', '{}') limit 1",
+        db_name, tbl_name
    );
+    expects_ok(
+        "check segment and block count",
+        execute_query(fixture.ctx(), qry.as_str()).await,
+        expected,
+    )
+    .await?;
+    Ok(())
+}
 
-    let table_ctx: Arc<dyn TableContext> = ctx as Arc<dyn TableContext>;
-    let mut mutator = DeletionMutator::try_create(
-        table_ctx,
-        data_accessor.clone(),
-        location_generator,
-        Arc::new(base_snapshot),
-        ClusterStatsGenerator::default(),
-        BlockCompactThresholds::default(),
-    )?;
+pub async fn do_deletion(
+    ctx: Arc<QueryContext>,
+    table: Arc<dyn Table>,
+    plan: DeletePlan,
+) -> Result<()> {
+    let (filter, col_indices) = if let Some(scalar) = &plan.selection {
+        let eb = ExpressionBuilderWithoutRenaming::create(plan.metadata.clone());
+        (
+            Some(eb.build(scalar)?),
+            scalar.used_columns().into_iter().collect(),
+        )
+    } else {
+        (None, vec![])
+    };
 
-    // clear half of the segments
-    for (i, _) in test_segment_locations.iter().enumerate().take(100) {
-        if i % 2 == 0 {
-            // empty the segment (segment only contains one block)
-            mutator
-                .replace_with(i, test_block_locations[i].clone(), None, DataBlock::empty())
-                .await?;
-        }
+    let fuse_table = FuseTable::try_from_table(table.as_ref())?;
+    let settings = ctx.get_settings();
+    let mut pipeline = common_pipeline_core::Pipeline::create();
+    fuse_table
+        .delete(ctx.clone(), filter, col_indices, &mut pipeline)
+        .await?;
+    if !pipeline.pipes.is_empty() {
+        pipeline.set_max_threads(settings.get_max_threads()? as usize);
+        let executor_settings = ExecutorSettings::try_create(&settings)?;
+        let executor = PipelineCompleteExecutor::try_create(pipeline, executor_settings)?;
+        ctx.set_executor(Arc::downgrade(&executor.get_inner()));
+        executor.execute()?;
+        drop(executor);
    }
-
-    let (segments, _, _) = mutator.generate_segments().await?;
-
-    // half segments left after deletion
-    assert_eq!(segments.len(), 50);
-
-    // new_segments should be a subset of test_segments in our case (no partial deletion of segment)
-    let new_segments = HashSet::<_, RandomState>::from_iter(segments.into_iter());
-    let test_segments = HashSet::from_iter(test_segment_locations.into_iter());
-    assert!(new_segments.is_subset(&test_segments));
-
    Ok(())
 }
```
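Why the test expects 9 and 9: each single-row `INSERT` commits one block in its own segment, so the ten inserts produce ten segments with one block each. `delete ... where id=1` empties exactly one block, and because that block was the only one in its segment, the segment is dropped outright instead of rewritten, leaving nine segments and nine blocks for `fuse_snapshot` to report. This keeps the point of the original test (segments emptied by a deletion must disappear, per issue#6570) while exercising the new pipeline-based path end to end.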

src/query/service/tests/it/storages/fuse/operations/mutation/mod.rs
Lines changed: 2 additions & 0 deletions

```diff
@@ -16,3 +16,5 @@ mod block_compact_mutator;
 mod deletion_mutator;
 mod recluster_mutator;
 mod segments_compact_mutator;
+
+pub use deletion_mutator::do_deletion;
```

src/query/service/tests/it/storages/fuse/operations/mutation/recluster_mutator.rs
Lines changed: 9 additions & 2 deletions

```diff
@@ -124,8 +124,15 @@ async fn test_recluster_mutator_block_select() -> Result<()> {
         segments_location,
     )
     .await?;
-    let mut blocks_map = BTreeMap::new();
-    blocks_map.insert(0, block_metas);
+    let mut blocks_map: BTreeMap<i32, Vec<(usize, Arc<BlockMeta>)>> = BTreeMap::new();
+    block_metas.iter().for_each(|(idx, b)| {
+        if let Some(stats) = &b.cluster_stats {
+            blocks_map
+                .entry(stats.level)
+                .or_default()
+                .push((idx.0, b.clone()));
+        }
+    });
 
     let mut mutator = ReclusterMutator::try_create(
         ctx,
```
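The recluster test previously lumped every block under level `0`; it now buckets blocks by the `level` recorded in their cluster statistics, which matches the shape `ReclusterMutator` consumes. The grouping idiom is `entry(..).or_default()`, which inserts an empty `Vec` the first time a level is seen; a self-contained sketch of the same pattern:

```rust
use std::collections::BTreeMap;

fn main() {
    // Group items by an integer level, as the test groups block metas
    // by their cluster-statistics level.
    let items = vec![(0, "block-a"), (1, "block-b"), (0, "block-c")];
    let mut by_level: BTreeMap<i32, Vec<&str>> = BTreeMap::new();
    for (level, name) in items {
        by_level.entry(level).or_default().push(name);
    }
    assert_eq!(by_level[&0], vec!["block-a", "block-c"]);
    assert_eq!(by_level[&1], vec!["block-b"]);
}
```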

src/query/service/tests/it/storages/fuse/operations/table_analyze.rs
Lines changed: 2 additions & 3 deletions

```diff
@@ -26,6 +26,7 @@ use databend_query::sql::plans::Plan;
 use databend_query::sql::Planner;
 use futures_util::TryStreamExt;
 
+use crate::storages::fuse::operations::mutation::do_deletion;
 use crate::storages::fuse::table_test_fixture::execute_command;
 use crate::storages::fuse::table_test_fixture::execute_query;
 use crate::storages::fuse::table_test_fixture::TestFixture;
@@ -74,9 +75,7 @@ async fn test_table_modify_column_ndv_statistics() -> Result<()> {
     let mut planner = Planner::new(ctx.clone());
     let (plan, _, _) = planner.plan_sql(query).await?;
     if let Plan::Delete(delete) = plan {
-        table
-            .delete(ctx.clone(), &delete.projection, &delete.selection)
-            .await?;
+        do_deletion(ctx.clone(), table.clone(), *delete).await?;
     }
     execute_command(ctx.clone(), statistics_sql).await?;
 
```
src/query/service/tests/it/storages/fuse/statistics.rs
Lines changed: 12 additions & 13 deletions

```diff
@@ -19,7 +19,7 @@ use common_datablocks::BlockCompactThresholds;
 use common_datablocks::DataBlock;
 use common_datavalues::prelude::*;
 use common_functions::aggregates::eval_aggr;
-use common_functions::scalars::FunctionContext;
+use common_sql::evaluator::ChunkOperator;
 use common_sql::evaluator::Evaluator;
 use common_sql::executor::add;
 use common_sql::executor::col;
@@ -220,14 +220,14 @@ fn test_ft_stats_cluster_stats() -> common_exception::Result<()> {
     ]);
 
     let block_compactor = BlockCompactThresholds::new(1_000_000, 800_000, 100 * 1024 * 1024);
-    let stats_gen = ClusterStatsGenerator::new(0, vec![0], vec![], 0, block_compactor);
+    let stats_gen = ClusterStatsGenerator::new(0, vec![0], vec![], 0, block_compactor, vec![]);
     let (stats, _) = stats_gen.gen_stats_for_append(&blocks)?;
     assert!(stats.is_some());
     let stats = stats.unwrap();
     assert_eq!(vec![DataValue::Int64(1)], stats.min);
     assert_eq!(vec![DataValue::Int64(3)], stats.max);
 
-    let stats_gen = ClusterStatsGenerator::new(1, vec![1], vec![], 0, block_compactor);
+    let stats_gen = ClusterStatsGenerator::new(1, vec![1], vec![], 0, block_compactor, vec![]);
     let (stats, _) = stats_gen.gen_stats_for_append(&blocks)?;
     assert!(stats.is_some());
     let stats = stats.unwrap();
@@ -249,7 +249,7 @@ async fn test_ft_cluster_stats_with_stats() -> common_exception::Result<()> {
     });
 
     let block_compactor = BlockCompactThresholds::new(1_000_000, 800_000, 100 * 1024 * 1024);
-    let stats_gen = ClusterStatsGenerator::new(0, vec![0], vec![], 0, block_compactor);
+    let stats_gen = ClusterStatsGenerator::new(0, vec![0], vec![], 0, block_compactor, vec![]);
     let stats = stats_gen.gen_with_origin_stats(&blocks, origin.clone())?;
     assert!(stats.is_some());
     let stats = stats.unwrap();
@@ -258,22 +258,21 @@ async fn test_ft_cluster_stats_with_stats() -> common_exception::Result<()> {
 
     // add expression executor.
     let expr = add(col("a", i32::to_data_type()), lit(1));
-    let eval_node = Evaluator::eval_expression(&expr, &schema)?;
-    let func_ctx = FunctionContext::default();
-    let result = eval_node.eval(&func_ctx, &blocks)?;
-    let output_schema =
-        DataSchemaRefExt::create(vec![DataField::new("(a + 1)", i64::to_data_type())]);
-    let blocks = DataBlock::create(output_schema, vec![result.vector]);
-
-    let stats_gen = ClusterStatsGenerator::new(0, vec![0], vec![], 0, block_compactor);
+    let field = DataField::new("(a + 1)", i64::to_data_type());
+    let operators = vec![ChunkOperator::Map {
+        eval: Evaluator::eval_expression(&expr, &schema)?,
+        name: field.name().to_string(),
+    }];
+
+    let stats_gen = ClusterStatsGenerator::new(0, vec![1], vec![], 0, block_compactor, operators);
     let stats = stats_gen.gen_with_origin_stats(&blocks, origin.clone())?;
     assert!(stats.is_some());
     let stats = stats.unwrap();
     assert_eq!(vec![DataValue::Int64(2)], stats.min);
     assert_eq!(vec![DataValue::Int64(4)], stats.max);
 
     // different cluster_key_id.
-    let stats_gen = ClusterStatsGenerator::new(1, vec![0], vec![], 0, block_compactor);
+    let stats_gen = ClusterStatsGenerator::new(1, vec![0], vec![], 0, block_compactor, vec![]);
     let stats = stats_gen.gen_with_origin_stats(&blocks, origin)?;
     assert!(stats.is_none());
 
```
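These test updates track an API change: `ClusterStatsGenerator::new` now takes a trailing `Vec<ChunkOperator>`. Where the old test pre-evaluated `a + 1` into a fresh `DataBlock` and pointed the generator at column `0` of that derived block, the generator now carries the expression itself as a `ChunkOperator::Map` and evaluates it internally; the cluster-key index changing to `vec![1]` suggests the mapped column is appended after the original one (our reading of the test, not stated in the commit). Call sites that cluster on plain columns simply pass `vec![]` for the operators.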