Skip to content

Commit 7c30c8d

Browse files
committed
fix merge
1 parent 43f1374 commit 7c30c8d

File tree

2 files changed

+15
-10
lines changed

2 files changed

+15
-10
lines changed

src/query/sql/src/planner/optimizer/optimizer.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ use crate::optimizer::RuleID;
4646
use crate::optimizer::SExpr;
4747
use crate::optimizer::DEFAULT_REWRITE_RULES;
4848
use crate::planner::query_executor::QueryExecutor;
49+
use crate::plans::Append;
4950
use crate::plans::CopyIntoLocationPlan;
5051
use crate::plans::Exchange;
5152
use crate::plans::Join;
@@ -294,11 +295,20 @@ pub async fn optimize(mut opt_ctx: OptimizerContext, plan: Plan) -> Result<Plan>
294295
overwrite,
295296
forbid_occ_retry,
296297
} => {
298+
let support_distributed_insert = {
299+
let append: Append = s_expr.plan().clone().try_into()?;
300+
let metadata = metadata.read();
301+
metadata
302+
.table(append.table_index)
303+
.table()
304+
.support_distributed_insert()
305+
};
297306
let enable_distributed = opt_ctx.enable_distributed_optimization
298307
&& opt_ctx
299308
.table_ctx
300309
.get_settings()
301-
.get_enable_distributed_copy()?;
310+
.get_enable_distributed_copy()?
311+
&& support_distributed_insert;
302312
info!(
303313
"after optimization enable_distributed_copy? : {}",
304314
enable_distributed

src/query/storages/fuse/src/operations/common/processors/sink_commit.rs

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -137,23 +137,18 @@ where F: SnapshotGenerator + Send + 'static
137137
}
138138

139139
fn is_error_recoverable(&self, e: &ErrorCode) -> bool {
140-
let code = e.code();
141-
// When prev_snapshot_id is some, means it is an alter table column modification or truncate.
142-
// In this case if commit to meta fail and error is TABLE_VERSION_MISMATCHED operation will be aborted.
143-
if self.prev_snapshot_id.is_some() && code == ErrorCode::TABLE_VERSION_MISMATCHED {
140+
if self.forbid_retry {
144141
return false;
145142
}
146143

147-
code == ErrorCode::TABLE_VERSION_MISMATCHED
148-
|| (self.purge && code == ErrorCode::STORAGE_NOT_FOUND)
144+
e.code() == ErrorCode::TABLE_VERSION_MISMATCHED
145+
|| (self.purge && e.code() == ErrorCode::STORAGE_NOT_FOUND)
149146
}
150147

151148
fn no_side_effects_in_meta_store(e: &ErrorCode) -> bool {
152149
// currently, the only error that we know, which indicates there are no side effects
153150
// is TABLE_VERSION_MISMATCHED
154151
e.code() == ErrorCode::TABLE_VERSION_MISMATCHED
155-
fn can_retry(&self, e: &ErrorCode) -> bool {
156-
!self.forbid_retry && FuseTable::is_error_recoverable(e, self.purge)
157152
}
158153

159154
fn read_meta(&mut self) -> Result<Event> {
@@ -448,7 +443,7 @@ where F: SnapshotGenerator + Send + 'static
448443
info!("commit mutation success, targets {:?}", target_descriptions);
449444
self.state = State::Finish;
450445
}
451-
Err(e) if self.can_retry(&e) => {
446+
Err(e) if self.is_error_recoverable(&e) => {
452447
let table_info = self.table.get_table_info();
453448
match self.backoff.next_backoff() {
454449
Some(d) => {

0 commit comments

Comments
 (0)