Skip to content

Commit 3b38f5d

Browse files
zhyassdantengsky
andauthored
chore(storage): allow append when change tracking was enabled (#15427)
* allow append when change tracking was enabled * chore: more code comments & minor refactoring --------- Co-authored-by: dantengsky <dantengsky@gmail.com>
1 parent 42f2477 commit 3b38f5d

File tree

3 files changed

+72
-47
lines changed

3 files changed

+72
-47
lines changed

src/query/storages/fuse/src/operations/common/processors/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,13 @@
1313
// limitations under the License.
1414

1515
mod multi_table_insert_commit;
16-
pub use multi_table_insert_commit::CommitMultiTableInsert;
1716
mod sink_commit;
1817
mod transform_merge_commit_meta;
1918
mod transform_mutation_aggregator;
2019
mod transform_serialize_block;
2120
mod transform_serialize_segment;
2221

22+
pub use multi_table_insert_commit::CommitMultiTableInsert;
2323
pub use sink_commit::CommitSink;
2424
pub use transform_merge_commit_meta::TransformMergeCommitMeta;
2525
pub use transform_mutation_aggregator::TableMutationAggregator;

src/query/storages/fuse/src/operations/common/processors/multi_table_insert_commit.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,12 @@ use databend_common_pipeline_sinks::AsyncSink;
3232
use databend_storages_common_table_meta::meta::TableSnapshot;
3333
use databend_storages_common_table_meta::meta::Versioned;
3434

35-
use super::TransformMergeCommitMeta;
36-
use crate::operations::common::CommitMeta;
3735
use crate::operations::AppendGenerator;
36+
use crate::operations::CommitMeta;
3837
use crate::operations::SnapshotGenerator;
38+
use crate::operations::TransformMergeCommitMeta;
3939
use crate::FuseTable;
40+
4041
pub struct CommitMultiTableInsert {
4142
commit_metas: HashMap<u64, CommitMeta>,
4243
tables: HashMap<u64, Arc<dyn Table>>,

src/query/storages/fuse/src/operations/common/processors/sink_commit.rs

Lines changed: 68 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -48,10 +48,11 @@ use log::warn;
4848
use opendal::Operator;
4949

5050
use crate::io::TableMetaLocationGenerator;
51-
use crate::operations::common::AbortOperation;
52-
use crate::operations::common::CommitMeta;
53-
use crate::operations::common::SnapshotGenerator;
5451
use crate::operations::set_backoff;
52+
use crate::operations::AbortOperation;
53+
use crate::operations::AppendGenerator;
54+
use crate::operations::CommitMeta;
55+
use crate::operations::SnapshotGenerator;
5556
use crate::operations::TruncateGenerator;
5657
use crate::operations::TruncateMode;
5758
use crate::FuseTable;
@@ -71,7 +72,7 @@ enum State {
7172
snapshot: TableSnapshot,
7273
table_info: TableInfo,
7374
},
74-
AbortOperation,
75+
AbortOperation(ErrorCode),
7576
Finish,
7677
}
7778

@@ -205,6 +206,13 @@ where F: SnapshotGenerator + Send + 'static
205206
.downcast_ref::<TruncateGenerator>()
206207
.is_some_and(|gen| !matches!(gen.mode(), TruncateMode::Delete))
207208
}
209+
210+
fn is_append_only_txn(&self) -> bool {
211+
self.snapshot_gen
212+
.as_any()
213+
.downcast_ref::<AppendGenerator>()
214+
.is_some()
215+
}
208216
}
209217

210218
#[async_trait::async_trait]
@@ -229,7 +237,7 @@ where F: SnapshotGenerator + Send + 'static
229237
State::FillDefault
230238
| State::TryCommit { .. }
231239
| State::RefreshTable
232-
| State::AbortOperation
240+
| State::AbortOperation(_)
233241
) {
234242
return Ok(Event::Async);
235243
}
@@ -259,33 +267,50 @@ where F: SnapshotGenerator + Send + 'static
259267
cluster_key_meta,
260268
table_info,
261269
} => {
262-
if !self.change_tracking && self.table.change_tracking_enabled() {
263-
// If change tracing is disabled when the txn start, but is enabled when commit,
270+
let change_tracking_enabled_during_commit = {
271+
let no_change_tracking_at_beginning = !self.change_tracking;
272+
// note that `self.table` might be refreshed if commit retried
273+
let change_tracking_enabled_now = self.table.change_tracking_enabled();
274+
275+
no_change_tracking_at_beginning && change_tracking_enabled_now
276+
};
277+
278+
if !self.is_append_only_txn() && change_tracking_enabled_during_commit {
279+
// If change tracking is not enabled when the txn start, but is enabled when committing,
264280
// then the txn should be aborted.
265-
error!("commit mutation failed cause change tracking is enabled when commit");
266-
self.state = State::AbortOperation;
267-
} else {
268-
let schema = self.table.schema().as_ref().clone();
269-
match self.snapshot_gen.generate_new_snapshot(
270-
schema,
271-
cluster_key_meta,
272-
previous,
273-
Some(table_info.ident.seq),
274-
) {
275-
Ok(snapshot) => {
276-
self.state = State::TryCommit {
277-
data: snapshot.to_bytes()?,
278-
snapshot,
279-
table_info,
280-
};
281-
}
282-
Err(e) => {
283-
error!(
284-
"commit mutation failed after {} retries, error: {:?}",
285-
self.retries, e,
286-
);
287-
self.state = State::AbortOperation;
288-
}
281+
// For mutations other than append-only, stream column values (like _origin_block_id)
282+
// must be properly generated. If not, CDC will not function as expected.
283+
self.state = State::AbortOperation(ErrorCode::StorageOther(
284+
"commit failed because change tracking was enabled during the commit process",
285+
));
286+
return Ok(());
287+
}
288+
289+
// now:
290+
// - either current txn IS append-only
291+
// even if this is a conflict txn T (in the meaning of table version) has been
292+
// commited, which has changed the change-tracking state from disabled to enabled,
293+
// merging with transaction T is still safe, since the CDC mechanism allows it.
294+
// - or change-tracking state is NOT changed.
295+
// in this case, we only need standard conflict resolution.
296+
// therefore, we can safely proceed.
297+
298+
let schema = self.table.schema().as_ref().clone();
299+
match self.snapshot_gen.generate_new_snapshot(
300+
schema,
301+
cluster_key_meta,
302+
previous,
303+
Some(table_info.ident.seq),
304+
) {
305+
Ok(snapshot) => {
306+
self.state = State::TryCommit {
307+
data: snapshot.to_bytes()?,
308+
snapshot,
309+
table_info,
310+
};
311+
}
312+
Err(e) => {
313+
self.state = State::AbortOperation(e);
289314
}
290315
}
291316
}
@@ -312,9 +337,10 @@ where F: SnapshotGenerator + Send + 'static
312337
.map_or(true, |previous| previous.snapshot_id != prev_snapshot_id)
313338
});
314339
if snapshot_has_changed {
315-
error!("commit mutation failed cause snapshot has changed when commit");
316340
// if snapshot has changed abort operation
317-
self.state = State::AbortOperation;
341+
self.state = State::AbortOperation(ErrorCode::StorageOther(
342+
"commit failed because the snapshot had changed during the commit process",
343+
));
318344
} else {
319345
self.snapshot_gen
320346
.fill_default_values(schema, &previous)
@@ -333,11 +359,7 @@ where F: SnapshotGenerator + Send + 'static
333359
self.state = State::FillDefault;
334360
}
335361
Err(e) => {
336-
error!(
337-
"commit mutation failed cause get lock failed, error: {:?}",
338-
e
339-
);
340-
self.state = State::AbortOperation;
362+
self.state = State::AbortOperation(e);
341363
}
342364
},
343365
State::TryCommit {
@@ -448,7 +470,7 @@ where F: SnapshotGenerator + Send + 'static
448470
if FuseTable::no_side_effects_in_meta_store(&e) {
449471
// if we are sure that table state inside metastore has not been
450472
// modified by this operation, abort this operation.
451-
self.state = State::AbortOperation;
473+
self.state = State::AbortOperation(e);
452474
} else {
453475
return Err(ErrorCode::OCCRetryFailure(format!(
454476
"can not fulfill the tx after retries({} times, {} ms), aborted. table name {}, identity {}",
@@ -481,18 +503,20 @@ where F: SnapshotGenerator + Send + 'static
481503
table_info: fuse_table.table_info.clone(),
482504
};
483505
}
484-
State::AbortOperation => {
506+
State::AbortOperation(e) => {
485507
let duration = self.start_time.elapsed();
486508
metrics_inc_commit_aborts();
487509
// todo: use histogram when it ready
488510
metrics_inc_commit_milliseconds(duration.as_millis());
489511
let op = self.abort_operation.clone();
490512
op.abort(self.ctx.clone(), self.dal.clone()).await?;
491-
return Err(ErrorCode::StorageOther(format!(
492-
"transaction aborted after {} retries, which took {} ms",
513+
error!(
514+
"transaction aborted after {} retries, which took {} ms, cause: {:?}",
493515
self.retries,
494-
duration.as_millis()
495-
)));
516+
duration.as_millis(),
517+
e
518+
);
519+
return Err(e);
496520
}
497521
_ => return Err(ErrorCode::Internal("It's a bug.")),
498522
}

0 commit comments

Comments
 (0)