Skip to content

Commit c1b09ab

Browse files
authored
chore(storage): remove abort operation (#15557)
* chore: remove abort operation * fix
1 parent 9d97ba7 commit c1b09ab

File tree

15 files changed

+61
-208
lines changed

15 files changed

+61
-208
lines changed

src/query/service/src/pipelines/builders/builder_merge_into.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -272,7 +272,6 @@ impl PipelineBuilder {
272272

273273
// 6. serialize segment
274274
let serialize_segment_transform = TransformSerializeSegment::new(
275-
self.ctx.clone(),
276275
InputPort::create(),
277276
OutputPort::create(),
278277
table,
@@ -400,7 +399,6 @@ impl PipelineBuilder {
400399
table.get_cluster_stats_gen(self.ctx.clone(), 0, block_thresholds, None)?;
401400

402401
let serialize_segment_transform = TransformSerializeSegment::new(
403-
self.ctx.clone(),
404402
InputPort::create(),
405403
OutputPort::create(),
406404
table,

src/query/service/src/pipelines/builders/builder_replace_into.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,6 @@ impl PipelineBuilder {
134134
block_builder.source_schema = table.schema_with_stream();
135135

136136
let serialize_segment_transform = TransformSerializeSegment::new(
137-
self.ctx.clone(),
138137
InputPort::create(),
139138
OutputPort::create(),
140139
table,

src/query/service/src/pipelines/builders/transform_builder.rs

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -126,16 +126,9 @@ impl PipelineBuilder {
126126
table: Arc<dyn Table>,
127127
block_thresholds: BlockThresholds,
128128
) -> Result<impl Fn(Arc<InputPort>, Arc<OutputPort>) -> Result<ProcessorPtr>> {
129-
let ctx = self.ctx.clone();
130129
Ok(move |input, output| {
131130
let fuse_table = FuseTable::try_from_table(table.as_ref())?;
132-
let proc = TransformSerializeSegment::new(
133-
ctx.clone(),
134-
input,
135-
output,
136-
fuse_table,
137-
block_thresholds,
138-
);
131+
let proc = TransformSerializeSegment::new(input, output, fuse_table, block_thresholds);
139132
proc.into_processor()
140133
})
141134
}

src/query/storages/fuse/src/operations/commit.rs

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,6 @@ use opendal::Operator;
5656
use crate::io::MetaWriter;
5757
use crate::io::SegmentsIO;
5858
use crate::io::TableMetaLocationGenerator;
59-
use crate::operations::common::AbortOperation;
6059
use crate::operations::common::AppendGenerator;
6160
use crate::operations::common::CommitSink;
6261
use crate::operations::common::ConflictResolveContext;
@@ -83,8 +82,7 @@ impl FuseTable {
8382
pipeline.try_resize(1)?;
8483

8584
pipeline.add_transform(|input, output| {
86-
let proc =
87-
TransformSerializeSegment::new(ctx.clone(), input, output, self, block_thresholds);
85+
let proc = TransformSerializeSegment::new(input, output, self, block_thresholds);
8886
proc.into_processor()
8987
})?;
9088

@@ -281,7 +279,6 @@ impl FuseTable {
281279
base_snapshot: Arc<TableSnapshot>,
282280
base_segments: &[Location],
283281
base_summary: Statistics,
284-
abort_operation: AbortOperation,
285282
max_retry_elapsed: Option<Duration>,
286283
) -> Result<()> {
287284
let mut retries = 0;
@@ -369,9 +366,6 @@ impl FuseTable {
369366
concurrently_appended_segment_locations =
370367
&latest_snapshot.segments[range_of_newly_append];
371368
} else {
372-
abort_operation
373-
.abort(ctx.clone(), self.operator.clone())
374-
.await?;
375369
metrics_inc_commit_mutation_unresolvable_conflict();
376370
break Err(ErrorCode::UnresolvableConflict(
377371
"segment compact conflict with other operations",
@@ -383,13 +377,10 @@ impl FuseTable {
383377
continue;
384378
}
385379
None => {
386-
// Commit not fulfilled. try to abort the operations.
380+
// Commit not fulfilled, abort.
387381
//
388382
// Note that, here the last error we have seen is TableVersionMismatched,
389383
// otherwise we should have been returned, thus it is safe to abort the operation here.
390-
abort_operation
391-
.abort(ctx.clone(), self.operator.clone())
392-
.await?;
393384
break Err(ErrorCode::StorageOther(format!(
394385
"commit mutation failed after {} retries",
395386
retries

src/query/storages/fuse/src/operations/common/meta/abort_operation.rs

Lines changed: 0 additions & 68 deletions
This file was deleted.

src/query/storages/fuse/src/operations/common/meta/commit_meta.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,15 @@ use databend_common_exception::ErrorCode;
1616
use databend_common_expression::BlockMetaInfo;
1717
use databend_common_expression::BlockMetaInfoDowncast;
1818
use databend_common_expression::DataBlock;
19+
use databend_storages_common_table_meta::meta::Location;
1920

20-
use crate::operations::common::AbortOperation;
2121
use crate::operations::common::ConflictResolveContext;
2222
use crate::operations::common::SnapshotChanges;
2323

2424
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq)]
2525
pub struct CommitMeta {
2626
pub conflict_resolve_context: ConflictResolveContext,
27-
pub abort_operation: AbortOperation,
27+
pub new_segment_locs: Vec<Location>,
2828
pub table_id: u64,
2929
}
3030

@@ -34,19 +34,19 @@ impl CommitMeta {
3434
conflict_resolve_context: ConflictResolveContext::ModifiedSegmentExistsInLatest(
3535
SnapshotChanges::default(),
3636
),
37-
abort_operation: AbortOperation::default(),
37+
new_segment_locs: vec![],
3838
table_id,
3939
}
4040
}
4141

4242
pub fn new(
4343
conflict_resolve_context: ConflictResolveContext,
44-
abort_operation: AbortOperation,
44+
new_segment_locs: Vec<Location>,
4545
table_id: u64,
4646
) -> Self {
4747
CommitMeta {
4848
conflict_resolve_context,
49-
abort_operation,
49+
new_segment_locs,
5050
table_id,
5151
}
5252
}

src/query/storages/fuse/src/operations/common/meta/mod.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,8 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
mod abort_operation;
1615
mod commit_meta;
1716
mod mutation_log;
1817

19-
pub use abort_operation::AbortOperation;
2018
pub use commit_meta::*;
2119
pub use mutation_log::*;

src/query/storages/fuse/src/operations/common/meta/mutation_log.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ use databend_storages_common_table_meta::meta::BlockMeta;
2222
use databend_storages_common_table_meta::meta::FormatVersion;
2323
use databend_storages_common_table_meta::meta::Statistics;
2424

25-
use crate::operations::common::AbortOperation;
2625
use crate::operations::mutation::BlockIndex;
2726
use crate::operations::mutation::CompactExtraInfo;
2827
use crate::operations::mutation::DeletedSegmentInfo;
@@ -38,7 +37,6 @@ pub enum MutationLogEntry {
3837
AppendSegment {
3938
segment_location: String,
4039
format_version: FormatVersion,
41-
abort_operation: AbortOperation,
4240
summary: Statistics,
4341
},
4442
DeletedBlock {

0 commit comments

Comments
 (0)