Skip to content

Commit 37820a8

Browse files
authored
feat(query): support explain insert with insert source (#15357)
* feat(query): support explain insert with insert source * fix explain delete err * refactor explain_insert into plan/insert.rs explain * fix clippy err
1 parent 3e5c251 commit 37820a8

File tree

8 files changed

+281
-30
lines changed

8 files changed

+281
-30
lines changed

src/meta/app/src/principal/user_stage.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -487,6 +487,21 @@ pub struct CopyOptions {
487487
pub detailed_output: bool,
488488
}
489489

490+
impl Display for CopyOptions {
491+
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
492+
write!(f, "OnErrorMode {}", self.on_error)?;
493+
write!(f, "SizeLimit {}", self.size_limit)?;
494+
write!(f, "MaxFiles {}", self.max_files)?;
495+
write!(f, "SplitSize {}", self.split_size)?;
496+
write!(f, "Purge {}", self.purge)?;
497+
write!(f, "DisableVariantCheck {}", self.disable_variant_check)?;
498+
write!(f, "ReturnFailedOnly {}", self.return_failed_only)?;
499+
write!(f, "MaxFileSize {}", self.max_file_size)?;
500+
write!(f, "Single {}", self.single)?;
501+
write!(f, "DetailedOutput {}", self.detailed_output)
502+
}
503+
}
504+
490505
impl CopyOptions {
491506
pub fn apply(&mut self, opts: &BTreeMap<String, String>, ignore_unknown: bool) -> Result<()> {
492507
if opts.is_empty() {

src/query/catalog/src/plan/datasource/datasource_info/stage.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
// limitations under the License.
1414

1515
use std::fmt::Debug;
16+
use std::fmt::Display;
1617
use std::fmt::Formatter;
1718
use std::sync::Arc;
1819

@@ -83,3 +84,15 @@ impl Debug for StageTableInfo {
8384
write!(f, "{:?}", self.stage_info)
8485
}
8586
}
87+
88+
impl Display for StageTableInfo {
89+
// Ignore the schema.
90+
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
91+
write!(f, "StageName {}", self.stage_info.stage_name)?;
92+
write!(f, "StageType {}", self.stage_info.stage_type)?;
93+
write!(f, "StageParam {}", self.stage_info.stage_params.storage)?;
94+
write!(f, "IsTemporary {}", self.stage_info.is_temporary)?;
95+
write!(f, "FileFormatParams {}", self.stage_info.file_format_params)?;
96+
write!(f, "CopyOption {}", self.stage_info.copy_options)
97+
}
98+
}

src/query/service/src/interpreters/interpreter_explain.rs

Lines changed: 13 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@ use databend_common_sql::optimizer::ColumnSet;
3232
use databend_common_sql::plans::FunctionCall;
3333
use databend_common_sql::plans::UpdatePlan;
3434
use databend_common_sql::BindContext;
35-
use databend_common_sql::InsertInputSource;
3635
use databend_common_sql::MetadataRef;
3736
use databend_common_storages_result_cache::gen_result_cache_key;
3837
use databend_common_storages_result_cache::ResultCacheReader;
@@ -97,25 +96,7 @@ impl Interpreter for ExplainInterpreter {
9796
self.explain_query(s_expr, metadata, bind_context, formatted_ast)
9897
.await?
9998
}
100-
Plan::Insert(insert_plan) => {
101-
let mut res = self.explain_plan(&self.plan)?;
102-
if let InsertInputSource::SelectPlan(plan) = &insert_plan.source {
103-
if let Plan::Query {
104-
s_expr,
105-
metadata,
106-
bind_context,
107-
formatted_ast,
108-
..
109-
} = &**plan
110-
{
111-
let query = self
112-
.explain_query(s_expr, metadata, bind_context, formatted_ast)
113-
.await?;
114-
res.extend(query);
115-
}
116-
}
117-
vec![DataBlock::concat(&res)?]
118-
}
99+
Plan::Insert(insert_plan) => insert_plan.explain(self.config.verbose).await?,
119100
Plan::CreateTable(plan) => match &plan.as_select {
120101
Some(box Plan::Query {
121102
s_expr,
@@ -557,6 +538,18 @@ impl ExplainInterpreter {
557538
result.extend(input);
558539
}
559540

541+
if result.is_empty() {
542+
let table_name = format!(
543+
"{}.{}.{}",
544+
delete.catalog_name, delete.database_name, delete.table_name
545+
);
546+
let children = vec![FormatTreeNode::new(format!("table: {table_name}"))];
547+
let formatted_plan = FormatTreeNode::with_children("DeletePlan:".to_string(), children)
548+
.format_pretty()?;
549+
let line_split_result: Vec<&str> = formatted_plan.lines().collect();
550+
let formatted_plan = StringType::from_data(line_split_result);
551+
result.push(DataBlock::new_from_columns(vec![formatted_plan]));
552+
}
560553
Ok(vec![DataBlock::concat(&result)?])
561554
}
562555
}

src/query/sql/src/planner/plans/copy_into_table.rs

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
// limitations under the License.
1414

1515
use std::fmt::Debug;
16+
use std::fmt::Display;
1617
use std::fmt::Formatter;
1718
use std::str::FromStr;
1819
use std::sync::Arc;
@@ -47,6 +48,17 @@ pub enum ValidationMode {
4748
ReturnAllErrors,
4849
}
4950

51+
impl Display for ValidationMode {
52+
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
53+
match self {
54+
ValidationMode::None => write!(f, ""),
55+
ValidationMode::ReturnNRows(v) => write!(f, "RETURN_ROWS={v}"),
56+
ValidationMode::ReturnErrors => write!(f, "RETURN_ERRORS"),
57+
ValidationMode::ReturnAllErrors => write!(f, "RETURN_ALL_ERRORS"),
58+
}
59+
}
60+
}
61+
5062
impl FromStr for ValidationMode {
5163
type Err = String;
5264
fn from_str(s: &str) -> Result<Self, String> {
@@ -75,6 +87,21 @@ pub enum CopyIntoTableMode {
7587
Copy,
7688
}
7789

90+
impl Display for CopyIntoTableMode {
91+
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
92+
match self {
93+
CopyIntoTableMode::Insert { overwrite } => {
94+
if *overwrite {
95+
write!(f, "INSERT OVERWRITE")
96+
} else {
97+
write!(f, "INSERT")
98+
}
99+
}
100+
CopyIntoTableMode::Replace => write!(f, "REPLACE"),
101+
CopyIntoTableMode::Copy => write!(f, "COPY"),
102+
}
103+
}
104+
}
78105
impl CopyIntoTableMode {
79106
pub fn is_overwrite(&self) -> bool {
80107
match self {

src/query/sql/src/planner/plans/insert.rs

Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,11 @@
1414

1515
use std::sync::Arc;
1616

17+
use databend_common_ast::ast::FormatTreeNode;
18+
use databend_common_expression::types::StringType;
19+
use databend_common_expression::DataBlock;
1720
use databend_common_expression::DataSchemaRef;
21+
use databend_common_expression::FromData;
1822
use databend_common_expression::Scalar;
1923
use databend_common_expression::TableSchemaRef;
2024
use databend_common_meta_app::principal::FileFormatParams;
@@ -26,6 +30,7 @@ use serde::Deserialize;
2630
use serde::Serialize;
2731

2832
use super::Plan;
33+
use crate::plans::CopyIntoTablePlan;
2934

3035
#[derive(Clone, Debug, EnumAsInner)]
3136
pub enum InsertInputSource {
@@ -82,6 +87,152 @@ impl Insert {
8287
pub fn has_select_plan(&self) -> bool {
8388
matches!(&self.source, InsertInputSource::SelectPlan(_))
8489
}
90+
91+
#[async_backtrace::framed]
92+
pub async fn explain(
93+
&self,
94+
verbose: bool,
95+
) -> databend_common_exception::Result<Vec<DataBlock>> {
96+
let mut result = vec![];
97+
98+
let Insert {
99+
catalog,
100+
database,
101+
table,
102+
schema,
103+
overwrite,
104+
// table_info only used create table as select.
105+
table_info: _,
106+
source,
107+
} = self;
108+
109+
let table_name = format!("{}.{}.{}", catalog, database, table);
110+
let inserted_columns = schema
111+
.fields
112+
.iter()
113+
.map(|field| format!("{}.{} (#{})", table, field.name, field.column_id))
114+
.collect::<Vec<_>>()
115+
.join(",");
116+
117+
let mut children = vec![
118+
FormatTreeNode::new(format!("table: {table_name}")),
119+
FormatTreeNode::new(format!("inserted columns: [{inserted_columns}]")),
120+
FormatTreeNode::new(format!("overwrite: {overwrite}")),
121+
];
122+
123+
let mut formatted_plan = String::new();
124+
125+
match source {
126+
InsertInputSource::SelectPlan(plan) => {
127+
if let Plan::Query {
128+
s_expr, metadata, ..
129+
} = &**plan
130+
{
131+
let metadata = &*metadata.read();
132+
let sub_tree = s_expr.to_format_tree(metadata, verbose)?;
133+
children.push(sub_tree);
134+
formatted_plan = FormatTreeNode::with_children(
135+
"InsertPlan (subquery):".to_string(),
136+
children,
137+
)
138+
.format_pretty()?;
139+
}
140+
}
141+
InsertInputSource::Values(values) => match values {
142+
InsertValue::Values { .. } => {
143+
formatted_plan =
144+
FormatTreeNode::with_children("InsertPlan (values):".to_string(), children)
145+
.format_pretty()?;
146+
}
147+
InsertValue::RawValues { .. } => {
148+
formatted_plan = FormatTreeNode::with_children(
149+
"InsertPlan (rawvalues):".to_string(),
150+
children,
151+
)
152+
.format_pretty()?;
153+
}
154+
},
155+
InsertInputSource::Stage(plan) => match *plan.clone() {
156+
Plan::CopyIntoTable(copy_plan) => {
157+
let CopyIntoTablePlan {
158+
no_file_to_copy,
159+
from_attachment,
160+
required_values_schema,
161+
required_source_schema,
162+
write_mode,
163+
validation_mode,
164+
force,
165+
stage_table_info,
166+
enable_distributed,
167+
..
168+
} = &*copy_plan;
169+
let required_values_schema = required_values_schema
170+
.fields()
171+
.iter()
172+
.map(|field| field.name().to_string())
173+
.collect::<Vec<_>>()
174+
.join(",");
175+
let required_source_schema = required_source_schema
176+
.fields()
177+
.iter()
178+
.map(|field| field.name().to_string())
179+
.collect::<Vec<_>>()
180+
.join(",");
181+
let stage_node = vec![
182+
FormatTreeNode::new(format!("no_file_to_copy: {no_file_to_copy}")),
183+
FormatTreeNode::new(format!("from_attachment: {from_attachment}")),
184+
FormatTreeNode::new(format!(
185+
"required_values_schema: [{required_values_schema}]"
186+
)),
187+
FormatTreeNode::new(format!(
188+
"required_source_schema: [{required_source_schema}]"
189+
)),
190+
FormatTreeNode::new(format!("write_mode: {write_mode}")),
191+
FormatTreeNode::new(format!("validation_mode: {validation_mode}")),
192+
FormatTreeNode::new(format!("force: {force}")),
193+
FormatTreeNode::new(format!("stage_table_info: {stage_table_info}")),
194+
FormatTreeNode::new(format!("enable_distributed: {enable_distributed}")),
195+
];
196+
children.extend(stage_node);
197+
formatted_plan =
198+
FormatTreeNode::with_children("InsertPlan (stage):".to_string(), children)
199+
.format_pretty()?;
200+
}
201+
_ => unreachable!("plan in InsertInputSource::Stag must be CopyIntoTable"),
202+
},
203+
InsertInputSource::StreamingWithFileFormat {
204+
format,
205+
on_error_mode,
206+
start,
207+
..
208+
} => {
209+
let stage_node = vec![
210+
FormatTreeNode::new(format!("format: {format}")),
211+
FormatTreeNode::new(format!("on_error_mode: {on_error_mode}")),
212+
FormatTreeNode::new(format!("start: {start}")),
213+
];
214+
children.extend(stage_node);
215+
formatted_plan = FormatTreeNode::with_children(
216+
"InsertPlan (StreamingWithFileFormat):".to_string(),
217+
children,
218+
)
219+
.format_pretty()?;
220+
}
221+
InsertInputSource::StreamingWithFormat(_, _, _) => {
222+
// used in clickhouse handler only; will discard soon
223+
formatted_plan = FormatTreeNode::with_children(
224+
"InsertPlan (StreamingWithFormat):".to_string(),
225+
children,
226+
)
227+
.format_pretty()?;
228+
}
229+
};
230+
231+
let line_split_result: Vec<&str> = formatted_plan.lines().collect();
232+
let formatted_plan = StringType::from_data(line_split_result);
233+
result.push(DataBlock::new_from_columns(vec![formatted_plan]));
234+
Ok(vec![DataBlock::concat(&result)?])
235+
}
85236
}
86237

87238
impl std::fmt::Debug for Insert {

tests/sqllogictests/suites/mode/standalone/explain/delete.test

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,13 @@ insert into t1 values(1, 2), (2, 3), (3, 4), (8, 9);
1616
statement ok
1717
insert into t2 values(2, 3), (3, 4);
1818

19+
query T
20+
explain delete from t1
21+
----
22+
DeletePlan:
23+
└── table: default.default.t1
24+
25+
1926
query T
2027
explain delete from t1 where a in (select a from t2);
2128
----

tests/sqllogictests/suites/mode/standalone/explain/explain.test

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1703,16 +1703,17 @@ Filter
17031703
query T
17041704
explain insert into t2 select * from t1;
17051705
----
1706-
Insert
1707-
TableScan
1708-
├── table: default.default.t1
1709-
├── output columns: [a (#0), b (#1), c (#2)]
1710-
├── read rows: 0
1711-
├── read size: 0
1712-
├── partitions total: 0
1713-
├── partitions scanned: 0
1714-
├── push downs: [filters: [], limit: NONE]
1715-
└── estimated rows: 0.00
1706+
InsertPlan (subquery):
1707+
├── table: default.default.t2
1708+
├── inserted columns: [t2.a (#0),t2.b (#1),t2.c (#2)]
1709+
├── overwrite: false
1710+
└── EvalScalar
1711+
├── scalars: [t1.a (#0) AS (#0), t1.b (#1) AS (#1), t1.c (#2) AS (#2)]
1712+
└── Scan
1713+
├── table: default.t1
1714+
├── filters: []
1715+
├── order by: []
1716+
└── limit: NONE
17161717

17171718

17181719
statement ok

0 commit comments

Comments
 (0)