Skip to content

refactor: Use String to replace Expression in DeletePlan #7716

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Sep 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions src/meta/api/src/schema_api_test_suite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1522,7 +1522,7 @@ impl SchemaApiTestSuite {
let ident = TableIdent::new(tb_id, seq);

let want = TableInfo {
ident: ident.clone(),
ident,
desc: format!("'{}'.'{}'.'{}'", tenant, db_name, tbl_name),
name: tbl_name.into(),
meta: table_meta(created_on),
Expand All @@ -1549,7 +1549,7 @@ impl SchemaApiTestSuite {

let got = mt.get_table((tenant, db_name, tbl_name).into()).await?;
let want = TableInfo {
ident: tb_ident_2.clone(),
ident: tb_ident_2,
desc: format!("'{}'.'{}'.'{}'", tenant, db_name, tbl_name),
name: tbl_name.into(),
meta: table_meta(created_on),
Expand Down Expand Up @@ -1584,7 +1584,7 @@ impl SchemaApiTestSuite {

let got = mt.get_table((tenant, "db1", "tb2").into()).await.unwrap();
let want = TableInfo {
ident: tb_ident_2.clone(),
ident: tb_ident_2,
desc: format!("'{}'.'{}'.'{}'", tenant, db_name, tbl_name),
name: tbl_name.into(),
meta: table_meta(created_on),
Expand Down Expand Up @@ -1784,7 +1784,7 @@ impl SchemaApiTestSuite {
let cur_db = mt.get_database(Self::req_get_db(tenant, db1_name)).await?;
assert!(old_db.ident.seq < cur_db.ident.seq);
let got = mt.get_table((tenant, db1_name, tb2_name).into()).await?;
got.ident.clone()
got.ident
};

info!("--- rename table, ok");
Expand All @@ -1796,7 +1796,7 @@ impl SchemaApiTestSuite {

let got = mt.get_table((tenant, db1_name, tb3_name).into()).await?;
let want = TableInfo {
ident: tb_ident.clone(),
ident: tb_ident,
desc: format!("'{}'.'{}'.'{}'", tenant, db1_name, tb3_name),
name: tb3_name.into(),
meta: table_meta(created_on),
Expand Down Expand Up @@ -1842,7 +1842,7 @@ impl SchemaApiTestSuite {
let got = mt.get_table((tenant, db1_name, tb2_name).into()).await?;
assert_ne!(tb_ident.table_id, got.ident.table_id);
assert_ne!(tb_ident.seq, got.ident.seq);
got.ident.clone()
got.ident
};

info!("--- db1,tb2(no_nil) -> db1,tb3(no_nil), error");
Expand Down Expand Up @@ -2010,7 +2010,7 @@ impl SchemaApiTestSuite {
let ident = TableIdent::new(tb_id, seq);

let want = TableInfo {
ident: ident.clone(),
ident,
desc: format!("'{}'.'{}'.'{}'", tenant, db_name, tbl_name),
name: tbl_name.into(),
meta: table_meta(created_on),
Expand Down Expand Up @@ -2140,7 +2140,7 @@ impl SchemaApiTestSuite {
let ident = TableIdent::new(tb_id, seq);

let want = TableInfo {
ident: ident.clone(),
ident,
desc: format!("'{}'.'{}'.'{}'", tenant, db_name, tbl_name),
name: tbl_name.into(),
meta: table_meta(created_on),
Expand Down Expand Up @@ -3128,7 +3128,7 @@ impl SchemaApiTestSuite {
let ident = TableIdent::new(tb_id, seq);

let want = TableInfo {
ident: ident.clone(),
ident,
desc: format!("'{}'.'{}'.'{}'", tenant, db_name, tbl_name),
name: tbl_name.into(),
meta: table_meta(created_on),
Expand Down
2 changes: 1 addition & 1 deletion src/meta/app/src/schema/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use maplit::hashmap;
use crate::schema::database::DatabaseNameIdent;

/// Globally unique identifier of a version of TableMeta.
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Eq, PartialEq, Default)]
#[derive(serde::Serialize, serde::Deserialize, Clone, Copy, Debug, Eq, PartialEq, Default)]
pub struct TableIdent {
/// Globally unique id to identify a table.
pub table_id: u64,
Expand Down
20 changes: 17 additions & 3 deletions src/query/legacy-planners/src/plan_delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,30 @@ use common_datavalues::DataSchema;
use common_datavalues::DataSchemaRef;
use common_meta_app::schema::TableIdent;

use crate::Expression;
use crate::Projection;

#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq)]
/// # TODO
///
/// From @xuanwo
///
/// Ideally, we need to use `Scalar` in DeletePlan.selection. But we met a
/// cycle deps here. So we have to change `selection` in String first, and
/// change into `Scalar` when our `Planner` has been moved out.
///
/// At this stage, DeletePlan's selection expr will be parsed twice:
///
/// - Parsed during `bind` to get column index and projection index.
/// - Parsed during `execution` to get the correct columns
///
/// It's an ugly but necessary price to pay. Without this, we would sink in
/// hell forever.
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq, Eq)]
pub struct DeletePlan {
pub catalog_name: String,
pub database_name: String,
pub table_name: String,
pub table_id: TableIdent,
pub selection: Option<Expression>,
pub selection: Option<String>,
pub projection: Projection,
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ impl Catalog for ImmutableCatalog {
.get_by_id(&table_id)
.ok_or_else(|| ErrorCode::UnknownTable(format!("Unknown table id: '{}'", table_id)))?;
let ti = table.get_table_info();
Ok((ti.ident.clone(), Arc::new(ti.meta.clone())))
Ok((ti.ident, Arc::new(ti.meta.clone())))
}

async fn get_table(
Expand Down
57 changes: 11 additions & 46 deletions src/query/service/src/sql/planner/binder/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,39 +12,18 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashSet;

use common_ast::ast::Expr;
use common_ast::ast::TableReference;
use common_exception::ErrorCode;
use common_exception::Result;
use common_legacy_planners::DeletePlan;
use common_legacy_planners::Expression;
use common_legacy_planners::Projection;

use crate::sql::binder::Binder;
use crate::sql::binder::ScalarBinder;
use crate::sql::executor::ExpressionBuilderWithoutRenaming;
use crate::sql::plans::Plan;
use crate::sql::statements::query::QueryASTIRVisitor;
use crate::sql::BindContext;

pub struct DeleteCollectPushDowns {}
impl QueryASTIRVisitor<HashSet<String>> for DeleteCollectPushDowns {
fn visit_expr(expr: &mut Expression, require_columns: &mut HashSet<String>) -> Result<()> {
if let Expression::Column(name) = expr {
if !require_columns.contains(name) {
require_columns.insert(name.clone());
}
}

Ok(())
}

fn visit_filter(predicate: &mut Expression, data: &mut HashSet<String>) -> Result<()> {
Self::visit_recursive_expr(predicate, data)
}
}
use crate::sql::ScalarExpr;

impl<'a> Binder {
pub(in crate::sql::planner::binder) async fn bind_delete(
Expand Down Expand Up @@ -88,44 +67,30 @@ impl<'a> Binder {
&[],
);

let mut expression = None;
let mut require_columns: HashSet<String> = HashSet::new();
if let Some(expr) = selection {
let (scalar, _) = scalar_binder.bind(expr).await?;
let eb = ExpressionBuilderWithoutRenaming::create(self.metadata.clone());
let mut pred_expr = eb.build(&scalar)?;
DeleteCollectPushDowns::visit_filter(&mut pred_expr, &mut require_columns)?;
expression = Some(pred_expr);
}

let table = self
.ctx
.get_table(&catalog_name, &database_name, &table_name)
.await?;

let tbl_info = table.get_table_info();
let table_id = tbl_info.ident.clone();
let mut col_indices = vec![];
let schema = tbl_info.meta.schema.as_ref();
for col_name in require_columns {
if let Some((idx, _)) = schema.column_with_name(col_name.as_str()) {
col_indices.push(idx);
} else {
return Err(ErrorCode::UnknownColumn(format!(
"Column [{}] not found",
col_name
)));
}
}
let table_id = tbl_info.ident;

// @todo wait delete migrate to new planner
let col_indices: Vec<usize> = if let Some(expr) = selection {
let (scalar, _) = scalar_binder.bind(expr).await?;
scalar.used_columns().into_iter().collect()
} else {
vec![]
};
let selection = selection.as_ref().map(|expr| format!("({})", expr));
let projection = Projection::Columns(col_indices);

let plan = DeletePlan {
catalog_name,
database_name,
table_name,
table_id,
selection: expression,
selection,
projection,
};
Ok(Plan::Delete(Box::new(plan)))
Expand Down
4 changes: 2 additions & 2 deletions src/query/service/src/sql/statements/statement_delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,12 +79,12 @@ impl AnalyzableStatement for DfDeleteStatement {
let selection = if let Some(predicate) = &self.selection {
let mut pred_expr = analyzer.analyze(predicate).await?;
DeleteCollectPushDowns::visit_filter(&mut pred_expr, &mut require_columns)?;
Some(pred_expr)
Some(predicate.to_string())
} else {
None
};

let table_id = tbl_info.ident.clone();
let table_id = tbl_info.ident;
let mut col_indices = vec![];
let schema = tbl_info.meta.schema.as_ref();
for col_name in require_columns {
Expand Down
10 changes: 9 additions & 1 deletion src/query/storages/fuse/src/operations/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@ use std::sync::Arc;
use common_catalog::table::Table;
use common_catalog::table_context::TableContext;
use common_datavalues::DataSchemaRefExt;
use common_exception::ErrorCode;
use common_exception::Result;
use common_fuse_meta::meta::TableSnapshot;
use common_legacy_parser::ExpressionParser;
use common_legacy_planners::DeletePlan;
use common_legacy_planners::Expression;
use common_legacy_planners::Extras;
Expand Down Expand Up @@ -51,7 +53,13 @@ impl FuseTable {

// check if unconditional deletion
if let Some(filter) = &plan.selection {
self.delete_rows(ctx, &snapshot, filter, plan).await
let expr = ExpressionParser::parse_exprs(filter)?;
if expr.is_empty() {
return Err(ErrorCode::IndexOutOfBounds(
"expression should be valid, but not",
));
}
self.delete_rows(ctx, &snapshot, &expr[0], plan).await
} else {
// deleting the whole table... just a truncate
let purge = false;
Expand Down