Skip to content

Commit ac27ee7

Browse files
zhang2014 and BohuTANG
authored
refactor(query): use scan table physical plan for copy into table from stage (#15016)
* refactor(query): use scan table physical plan for copy into stage * refactor(query): use scan table physical plan for copy into stage * refactor(query): use scan table physical plan for copy into stage * refactor(query): use scan table physical plan for copy into stage * refactor(query): use scan table physical plan for copy into stage * refactor(query): use scan table physical plan for copy into stage * refactor(query): use scan table physical plan for copy into stage * refactor(query): use scan table physical plan for copy into stage --------- Co-authored-by: Bohu <overred.shuttler@gmail.com>
1 parent d97ffa0 commit ac27ee7

File tree

23 files changed

+172
-134
lines changed

23 files changed

+172
-134
lines changed

Cargo.lock

Lines changed: 0 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/query/catalog/src/table.rs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -318,10 +318,6 @@ pub trait Table: Sync + Send {
318318
}
319319
}
320320

321-
fn set_block_thresholds(&self, _thresholds: BlockThresholds) {
322-
unimplemented!()
323-
}
324-
325321
#[async_backtrace::framed]
326322
async fn compact_segments(
327323
&self,

src/query/catalog/src/table_context.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ use databend_common_base::base::ProgressValues;
2727
use databend_common_base::runtime::profile::Profile;
2828
use databend_common_exception::ErrorCode;
2929
use databend_common_exception::Result;
30+
use databend_common_expression::BlockThresholds;
3031
use databend_common_expression::DataBlock;
3132
use databend_common_expression::Expr;
3233
use databend_common_expression::FunctionContext;
@@ -268,4 +269,7 @@ pub trait TableContext: Send + Sync {
268269

269270
fn has_bloom_runtime_filters(&self, id: usize) -> bool;
270271
fn txn_mgr(&self) -> TxnManagerRef;
272+
273+
fn get_read_block_thresholds(&self) -> BlockThresholds;
274+
fn set_read_block_thresholds(&self, _thresholds: BlockThresholds);
271275
}

src/query/service/src/interpreters/interpreter_copy_into_table.rs

Lines changed: 35 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use std::collections::BTreeMap;
1516
use std::sync::Arc;
1617

1718
use databend_common_exception::Result;
@@ -27,6 +28,7 @@ use databend_common_sql::executor::physical_plans::CopyIntoTableSource;
2728
use databend_common_sql::executor::physical_plans::Exchange;
2829
use databend_common_sql::executor::physical_plans::FragmentKind;
2930
use databend_common_sql::executor::physical_plans::Project;
31+
use databend_common_sql::executor::physical_plans::TableScan;
3032
use databend_common_sql::executor::table_read_plan::ToReadDataSourcePlan;
3133
use databend_common_sql::executor::PhysicalPlan;
3234
use databend_common_storage::StageFileInfo;
@@ -94,7 +96,6 @@ impl CopyIntoTableInterpreter {
9496
&self,
9597
plan: &CopyIntoTablePlan,
9698
) -> Result<(PhysicalPlan, Vec<UpdateStreamMetaReq>)> {
97-
let mut next_plan_id = 0;
9899
let to_table = self
99100
.ctx
100101
.get_table(
@@ -109,36 +110,46 @@ impl CopyIntoTableInterpreter {
109110
update_stream_meta_reqs = update_stream_meta;
110111
let query_physical_plan = Box::new(query_interpreter.build_physical_plan().await?);
111112

112-
let current_plan_id = query_physical_plan.get_id();
113-
next_plan_id = current_plan_id + 2;
114113
let result_columns = query_interpreter.get_result_columns();
115114
CopyIntoTableSource::Query(Box::new(PhysicalPlan::Project(
116115
Project::from_columns_binding(
117-
current_plan_id + 1,
116+
0,
118117
query_physical_plan,
119118
result_columns,
120119
query_interpreter.get_ignore_result(),
121120
)?,
122121
)))
123122
} else {
124123
let stage_table = StageTable::try_create(plan.stage_table_info.clone())?;
125-
let read_source_plan = Box::new(
126-
stage_table
127-
.read_plan_with_catalog(
128-
self.ctx.clone(),
129-
plan.catalog_info.catalog_name().to_string(),
130-
None,
131-
None,
132-
false,
133-
false,
134-
)
135-
.await?,
136-
);
137-
CopyIntoTableSource::Stage(read_source_plan)
124+
125+
let data_source_plan = stage_table
126+
.read_plan_with_catalog(
127+
self.ctx.clone(),
128+
plan.catalog_info.catalog_name().to_string(),
129+
None,
130+
None,
131+
false,
132+
false,
133+
)
134+
.await?;
135+
136+
let mut name_mapping = BTreeMap::new();
137+
for (idx, field) in data_source_plan.schema().fields.iter().enumerate() {
138+
name_mapping.insert(field.name.clone(), idx);
139+
}
140+
141+
CopyIntoTableSource::Stage(Box::new(PhysicalPlan::TableScan(TableScan {
142+
plan_id: 0,
143+
name_mapping,
144+
stat_info: None,
145+
table_index: None,
146+
internal_column: None,
147+
source: Box::new(data_source_plan),
148+
})))
138149
};
139150

140151
let mut root = PhysicalPlan::CopyIntoTable(Box::new(CopyIntoTable {
141-
plan_id: next_plan_id,
152+
plan_id: 0,
142153
catalog_info: plan.catalog_info.clone(),
143154
required_values_schema: plan.required_values_schema.clone(),
144155
values_consts: plan.values_consts.clone(),
@@ -151,17 +162,21 @@ impl CopyIntoTableInterpreter {
151162

152163
source,
153164
}));
154-
next_plan_id += 1;
165+
155166
if plan.enable_distributed {
156167
root = PhysicalPlan::Exchange(Exchange {
157-
plan_id: next_plan_id,
168+
plan_id: 0,
158169
input: Box::new(root),
159170
kind: FragmentKind::Merge,
160171
keys: Vec::new(),
161172
allow_adjust_parallelism: true,
162173
ignore_exchange: false,
163174
});
164175
}
176+
177+
let mut next_plan_id = 0;
178+
root.adjust_plan_id(&mut next_plan_id);
179+
165180
Ok((root, update_stream_meta_reqs))
166181
}
167182

src/query/service/src/pipelines/builders/builder_copy_into_table.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@ use databend_common_sql::executor::physical_plans::CopyIntoTable;
3131
use databend_common_sql::executor::physical_plans::CopyIntoTableSource;
3232
use databend_common_sql::plans::CopyIntoTableMode;
3333
use databend_common_storage::StageFileInfo;
34-
use databend_common_storages_stage::StageTable;
3534
use log::debug;
3635
use log::info;
3736

@@ -51,10 +50,11 @@ impl PipelineBuilder {
5150
self.build_pipeline(input)?;
5251
input.output_schema()?
5352
}
54-
CopyIntoTableSource::Stage(source) => {
55-
let stage_table = StageTable::try_create(copy.stage_table_info.clone())?;
56-
stage_table.set_block_thresholds(to_table.get_block_thresholds());
57-
stage_table.read_data(self.ctx.clone(), source, &mut self.main_pipeline, false)?;
53+
CopyIntoTableSource::Stage(input) => {
54+
self.ctx
55+
.set_read_block_thresholds(to_table.get_block_thresholds());
56+
57+
self.build_pipeline(input)?;
5858
copy.required_source_schema.clone()
5959
}
6060
};

src/query/service/src/pipelines/builders/builder_join.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -96,13 +96,13 @@ impl PipelineBuilder {
9696

9797
pub(crate) fn build_join(&mut self, join: &HashJoin) -> Result<()> {
9898
// for merge into target table as build side.
99-
let (merge_into_build_table_index, merge_into_is_distributed) =
99+
let (enable_merge_into_optimization, merge_into_is_distributed) =
100100
self.merge_into_get_optimization_flag(join);
101101

102102
let state = self.build_join_state(
103103
join,
104-
merge_into_build_table_index,
105104
merge_into_is_distributed,
105+
enable_merge_into_optimization,
106106
)?;
107107
self.expand_build_side_pipeline(&join.build, join, state.clone())?;
108108
self.build_join_probe(join, state)
@@ -111,17 +111,17 @@ impl PipelineBuilder {
111111
fn build_join_state(
112112
&mut self,
113113
join: &HashJoin,
114-
merge_into_target_table_index: IndexType,
115114
merge_into_is_distributed: bool,
115+
enable_merge_into_optimization: bool,
116116
) -> Result<Arc<HashJoinState>> {
117117
HashJoinState::try_create(
118118
self.ctx.clone(),
119119
join.build.output_schema()?,
120120
&join.build_projections,
121121
HashJoinDesc::create(join)?,
122122
&join.probe_to_build,
123-
merge_into_target_table_index,
124123
merge_into_is_distributed,
124+
enable_merge_into_optimization,
125125
)
126126
}
127127

src/query/service/src/pipelines/builders/merge_into_join_optimizations.rs

Lines changed: 11 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -14,27 +14,22 @@
1414

1515
use databend_common_sql::executor::physical_plans::HashJoin;
1616
use databend_common_sql::executor::PhysicalPlan;
17-
use databend_common_sql::IndexType;
18-
use databend_common_sql::DUMMY_TABLE_INDEX;
1917
use databend_common_storages_fuse::operations::need_reserve_block_info;
2018

2119
use crate::pipelines::PipelineBuilder;
2220

2321
impl PipelineBuilder {
24-
pub(crate) fn merge_into_get_optimization_flag(&self, join: &HashJoin) -> (IndexType, bool) {
22+
pub(crate) fn merge_into_get_optimization_flag(&self, join: &HashJoin) -> (bool, bool) {
2523
// for merge into target table as build side.
26-
let (merge_into_build_table_index, merge_into_is_distributed) =
27-
if let PhysicalPlan::TableScan(scan) = &*join.build {
28-
let (need_block_info, is_distributed) =
29-
need_reserve_block_info(self.ctx.clone(), scan.table_index);
30-
if need_block_info {
31-
(scan.table_index, is_distributed)
32-
} else {
33-
(DUMMY_TABLE_INDEX, false)
34-
}
35-
} else {
36-
(DUMMY_TABLE_INDEX, false)
37-
};
38-
(merge_into_build_table_index, merge_into_is_distributed)
24+
match &*join.build {
25+
PhysicalPlan::TableScan(scan) => match scan.table_index {
26+
None | Some(databend_common_sql::DUMMY_TABLE_INDEX) => (false, false),
27+
Some(table_index) => match need_reserve_block_info(self.ctx.clone(), table_index) {
28+
(true, is_distributed) => (true, is_distributed),
29+
_ => (false, false),
30+
},
31+
},
32+
_ => (false, false),
33+
}
3934
}
4035
}

src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_state.rs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@ use databend_common_hashtable::HashJoinHashMap;
3535
use databend_common_hashtable::HashtableKeyable;
3636
use databend_common_sql::plans::JoinType;
3737
use databend_common_sql::ColumnSet;
38-
use databend_common_sql::IndexType;
3938
use ethnum::U256;
4039
use parking_lot::RwLock;
4140

@@ -130,8 +129,8 @@ impl HashJoinState {
130129
build_projections: &ColumnSet,
131130
hash_join_desc: HashJoinDesc,
132131
probe_to_build: &[(usize, (bool, bool))],
133-
merge_into_target_table_index: IndexType,
134132
merge_into_is_distributed: bool,
133+
enable_merge_into_optimization: bool,
135134
) -> Result<Arc<HashJoinState>> {
136135
if matches!(
137136
hash_join_desc.join_type,
@@ -161,10 +160,12 @@ impl HashJoinState {
161160
_continue_build_dummy_receiver,
162161
partition_id: AtomicI8::new(-2),
163162
enable_spill,
164-
merge_into_state: MergeIntoState::try_create_merge_into_state(
165-
merge_into_target_table_index,
166-
merge_into_is_distributed,
167-
),
163+
merge_into_state: match enable_merge_into_optimization {
164+
false => None,
165+
true => Some(MergeIntoState::create_merge_into_state(
166+
merge_into_is_distributed,
167+
)),
168+
},
168169
}))
169170
}
170171

src/query/service/src/pipelines/processors/transforms/hash_join/merge_into_hash_join_optimization.rs

Lines changed: 8 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,6 @@ use databend_common_expression::DataBlock;
2525
use databend_common_hashtable::MergeIntoBlockInfoIndex;
2626
use databend_common_hashtable::RowPtr;
2727
use databend_common_sql::plans::JoinType;
28-
use databend_common_sql::IndexType;
29-
use databend_common_sql::DUMMY_TABLE_INDEX;
3028
use databend_common_storages_fuse::operations::BlockMetaIndex;
3129
use log::info;
3230

@@ -59,21 +57,14 @@ pub struct MergeIntoState {
5957
}
6058

6159
impl MergeIntoState {
62-
pub(crate) fn try_create_merge_into_state(
63-
merge_into_target_table_index: IndexType,
64-
merge_into_is_distributed: bool,
65-
) -> Option<SyncUnsafeCell<Self>> {
66-
if merge_into_target_table_index != DUMMY_TABLE_INDEX {
67-
Some(SyncUnsafeCell::new(MergeIntoState {
68-
merge_into_is_distributed,
69-
block_info_index: Default::default(),
70-
matched: Vec::new(),
71-
atomic_pointer: MatchedPtr(std::ptr::null_mut()),
72-
chunk_offsets: Vec::with_capacity(100),
73-
}))
74-
} else {
75-
None
76-
}
60+
pub(crate) fn create_merge_into_state(is_distributed: bool) -> SyncUnsafeCell<Self> {
61+
SyncUnsafeCell::new(MergeIntoState {
62+
merge_into_is_distributed: is_distributed,
63+
block_info_index: Default::default(),
64+
matched: Vec::new(),
65+
atomic_pointer: MatchedPtr(std::ptr::null_mut()),
66+
chunk_offsets: Vec::with_capacity(100),
67+
})
7768
}
7869
}
7970

src/query/service/src/schedulers/fragments/plan_fragment.rs

Lines changed: 5 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -426,16 +426,10 @@ impl PlanFragment {
426426

427427
let mut data_sources = HashMap::new();
428428

429-
let mut collect_data_source = |plan: &PhysicalPlan| match plan {
430-
PhysicalPlan::TableScan(scan) => {
429+
let mut collect_data_source = |plan: &PhysicalPlan| {
430+
if let PhysicalPlan::TableScan(scan) = plan {
431431
data_sources.insert(scan.plan_id, *scan.source.clone());
432432
}
433-
PhysicalPlan::CopyIntoTable(copy) => {
434-
if let Some(stage) = copy.source.as_stage().cloned() {
435-
data_sources.insert(copy.plan_id, *stage);
436-
}
437-
}
438-
_ => {}
439433
};
440434

441435
PhysicalPlan::traverse(
@@ -481,12 +475,10 @@ impl PhysicalPlanReplacer for ReplaceReadSource {
481475
..plan.clone()
482476
})))
483477
}
484-
CopyIntoTableSource::Stage(_) => {
485-
let source = self.sources.remove(&plan.plan_id).ok_or_else(|| {
486-
ErrorCode::Internal("Cannot find data source for copy into plan")
487-
})?;
478+
CopyIntoTableSource::Stage(v) => {
479+
let input = self.replace(v)?;
488480
Ok(PhysicalPlan::CopyIntoTable(Box::new(CopyIntoTable {
489-
source: CopyIntoTableSource::Stage(Box::new(source)),
481+
source: CopyIntoTableSource::Stage(Box::new(input)),
490482
..plan.clone()
491483
})))
492484
}

0 commit comments

Comments
 (0)