Skip to content

Commit e807877

Browse files
committed
Add affected rows for delete/insert
1 parent 228f426 commit e807877

File tree

4 files changed

+55
-17
lines changed

4 files changed

+55
-17
lines changed

src/query/service/src/servers/mysql/mysql_interactive_worker.rs

Lines changed: 5 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -469,4 +469,9 @@ impl ProgressReporter for ContextProgressReporter {
469469
convert_byte_size((progress.bytes as f64) / (seconds)),
470470
)
471471
}
472+
473+
fn affected_rows(&self) -> u64 {
474+
let progress = self.context.get_scan_progress_value();
475+
progress.rows as u64
476+
}
472477
}

src/query/service/src/servers/mysql/writers/query_result_writer.rs

Lines changed: 11 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -38,6 +38,7 @@ use tracing::error;
3838
/// "Read x rows, y MiB in z sec., A million rows/sec., B MiB/sec."
3939
pub trait ProgressReporter {
4040
fn progress_info(&self) -> String;
41+
fn affected_rows(&self) -> u64;
4142
}
4243

4344
pub struct QueryResult {
@@ -122,7 +123,16 @@ impl<'a, W: AsyncWrite + Send + Unpin> DFQueryResultWriter<'a, W> {
122123
}
123124
}
124125

125-
dataset_writer.completed(OkResponse::default()).await?;
126+
let affected_rows = query_result
127+
.extra_info
128+
.map(|r| r.affected_rows())
129+
.unwrap_or_default();
130+
dataset_writer
131+
.completed(OkResponse {
132+
affected_rows,
133+
..Default::default()
134+
})
135+
.await?;
126136
return Ok(());
127137
}
128138

src/query/storages/fuse/fuse/src/operations/delete.rs

Lines changed: 24 additions & 16 deletions
Original file line number | Diff line number | Diff line change
@@ -14,6 +14,7 @@
1414

1515
use std::sync::Arc;
1616

17+
use common_base::base::ProgressValues;
1718
use common_catalog::plan::Expression;
1819
use common_catalog::plan::Partitions;
1920
use common_catalog::plan::PartitionsShuffleKind;
@@ -74,8 +75,14 @@ impl FuseTable {
7475
return Ok(());
7576
}
7677

78+
let scan_progress = ctx.get_scan_progress();
7779
// check if unconditional deletion
7880
if filter.is_none() {
81+
let progress_values = ProgressValues {
82+
rows: snapshot.summary.row_count as usize,
83+
bytes: snapshot.summary.uncompressed_byte_size as usize,
84+
};
85+
scan_progress.incr(&progress_values);
7986
// deleting the whole table... just a truncate
8087
let purge = false;
8188
return self.do_truncate(ctx.clone(), purge).await;
@@ -91,7 +98,19 @@ impl FuseTable {
9198
// if the `filter_expr` is of "constant" nullary :
9299
// for the whole block, whether all of the rows should be kept or dropped,
93100
// we can just return from here, without accessing the block data
94-
return self.delete_eval_const(ctx.clone(), &filter_expr).await;
101+
if self.try_eval_const(&filter_expr)? {
102+
let progress_values = ProgressValues {
103+
rows: snapshot.summary.row_count as usize,
104+
bytes: snapshot.summary.uncompressed_byte_size as usize,
105+
};
106+
scan_progress.incr(&progress_values);
107+
108+
// deleting the whole table... just a truncate
109+
let purge = false;
110+
return self.do_truncate(ctx.clone(), purge).await;
111+
}
112+
// do nothing.
113+
return Ok(());
95114
}
96115

97116
self.try_add_deletion_source(ctx.clone(), &filter_expr, col_indices, &snapshot, pipeline)
@@ -105,32 +124,21 @@ impl FuseTable {
105124
Ok(())
106125
}
107126

108-
async fn delete_eval_const(
109-
&self,
110-
ctx: Arc<dyn TableContext>,
111-
filter: &Expression,
112-
) -> Result<()> {
127+
fn try_eval_const(&self, filter: &Expression) -> Result<bool> {
113128
let func_ctx = FunctionContext::default();
114129

115130
let dummy_field = DataField::new("dummy", NullType::new_impl());
116-
let dummy_schema = Arc::new(DataSchema::new(vec![dummy_field.clone()]));
131+
let dummy_schema = Arc::new(DataSchema::new(vec![dummy_field]));
117132
let dummy_column = DataValue::Null.as_const_column(&NullType::new_impl(), 1)?;
118133
let dummy_data_block = DataBlock::create(dummy_schema.clone(), vec![dummy_column]);
119134

120135
let eval_node = Arc::new(Evaluator::eval_expression(filter, dummy_schema.as_ref())?);
121136
let filter_result = eval_node.eval(&func_ctx, &dummy_data_block)?.vector;
122137
debug_assert!(filter_result.len() == 1);
123138

124-
let filter_result = DataBlock::cast_to_nonull_boolean(&filter_result)?
139+
DataBlock::cast_to_nonull_boolean(&filter_result)?
125140
.get(0)
126-
.as_bool()?;
127-
if filter_result {
128-
// deleting the whole table... just a truncate
129-
let purge = false;
130-
return self.do_truncate(ctx.clone(), purge).await;
131-
}
132-
// do nothing.
133-
Ok(())
141+
.as_bool()
134142
}
135143

136144
async fn try_add_deletion_source(

src/query/storages/fuse/fuse/src/operations/mutation/mutation_sink.rs

Lines changed: 15 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -15,6 +15,8 @@
1515
use std::any::Any;
1616
use std::sync::Arc;
1717

18+
use common_base::base::Progress;
19+
use common_base::base::ProgressValues;
1820
use common_catalog::table::Table;
1921
use common_catalog::table::TableExt;
2022
use common_catalog::table_context::TableContext;
@@ -63,6 +65,7 @@ pub struct MutationSink {
6365
ctx: Arc<dyn TableContext>,
6466
dal: Operator,
6567
location_gen: TableMetaLocationGenerator,
68+
scan_progress: Arc<Progress>,
6669

6770
table: Arc<dyn Table>,
6871
base_snapshot: Arc<TableSnapshot>,
@@ -84,11 +87,13 @@ impl MutationSink {
8487
base_snapshot: Arc<TableSnapshot>,
8588
input: Arc<InputPort>,
8689
) -> Result<ProcessorPtr> {
90+
let scan_progress = ctx.get_scan_progress();
8791
Ok(ProcessorPtr::create(Box::new(MutationSink {
8892
state: State::None,
8993
ctx,
9094
dal: table.get_operator(),
9195
location_gen: table.meta_location_generator.clone(),
96+
scan_progress,
9297
table: Arc::new(table.clone()),
9398
base_snapshot,
9499
merged_segments: vec![],
@@ -154,6 +159,16 @@ impl Processor for MutationSink {
154159
match std::mem::replace(&mut self.state, State::None) {
155160
State::ReadMeta(input_meta) => {
156161
let meta = MutationMeta::from_meta(&input_meta)?;
162+
163+
let affect_rows = self.base_snapshot.summary.row_count - meta.summary.row_count;
164+
let affect_bytes = self.base_snapshot.summary.uncompressed_byte_size
165+
- meta.summary.uncompressed_byte_size;
166+
let progress_values = ProgressValues {
167+
rows: affect_rows as usize,
168+
bytes: affect_bytes as usize,
169+
};
170+
self.scan_progress.incr(&progress_values);
171+
157172
self.merged_segments = meta.segments.clone();
158173
self.merged_statistics = meta.summary.clone();
159174
self.abort_operation = meta.abort_operation.clone();

0 commit comments

Comments (0)