Skip to content

Commit 8a7d265

Browse files
committed
Pass append logs by precommit blocks.
1 parent 805b93e commit 8a7d265

File tree

4 files changed

+6
-40
lines changed

4 files changed

+6
-40
lines changed

src/query/service/src/interpreters/interpreter_common.rs

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ use chrono::TimeZone;
1919
use chrono::Utc;
2020
use common_base::base::GlobalIORuntime;
2121
use common_base::base::TrySpawn;
22-
use common_datablocks::DataBlock;
2322
use common_datavalues::DataSchemaRef;
2423
use common_exception::ErrorCode;
2524
use common_exception::Result;
@@ -86,12 +85,12 @@ pub fn append2table(
8685
executor.execute()
8786
}
8887

89-
pub async fn commit2table_with_append_entries(
88+
pub async fn commit2table(
9089
ctx: Arc<QueryContext>,
9190
table: Arc<dyn Table>,
9291
overwrite: bool,
93-
append_entries: Vec<DataBlock>,
9492
) -> Result<()> {
93+
let append_entries = ctx.consume_precommit_blocks();
9594
// We must put the commit operation to global runtime, which will avoid the "dispatch dropped without returning error" in tower
9695
let catalog_name = ctx.get_current_catalog();
9796
let handler = GlobalIORuntime::instance().spawn(async move {
@@ -110,15 +109,6 @@ pub async fn commit2table_with_append_entries(
110109
}
111110
}
112111

113-
pub async fn commit2table(
114-
ctx: Arc<QueryContext>,
115-
table: Arc<dyn Table>,
116-
overwrite: bool,
117-
) -> Result<()> {
118-
let append_entries = ctx.consume_precommit_blocks();
119-
commit2table_with_append_entries(ctx, table, overwrite, append_entries).await
120-
}
121-
122112
pub async fn validate_grant_object_exists(
123113
ctx: &Arc<QueryContext>,
124114
object: &GrantObject,

src/query/service/src/interpreters/interpreter_insert_v2.rs

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ use futures_util::StreamExt;
2727
use parking_lot::Mutex;
2828

2929
use super::commit2table;
30-
use super::commit2table_with_append_entries;
3130
use super::interpreter_common::append2table;
3231
use super::plan_schedulers::build_schedule_pipepline;
3332
use super::ProcessorExecutorStream;
@@ -249,20 +248,13 @@ impl InsertInterpreterV2 {
249248
executor_settings,
250249
)?;
251250

252-
let mut append_entries = vec![];
253251
let mut stream: SendableDataBlockStream =
254252
Box::pin(ProcessorExecutorStream::create(executor)?);
255253
while let Some(block) = stream.next().await {
256-
append_entries.push(block?);
254+
block?;
257255
}
258256

259-
commit2table_with_append_entries(
260-
self.ctx.clone(),
261-
table,
262-
self.plan.overwrite,
263-
append_entries,
264-
)
265-
.await?;
257+
commit2table(self.ctx.clone(), table.clone(), self.plan.overwrite).await?;
266258

267259
Ok(Box::pin(DataBlockStream::create(
268260
self.plan.schema(),

src/query/service/src/interpreters/mod.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,6 @@ pub use interpreter_cluster_key_drop::DropTableClusterKeyInterpreter;
9999
pub use interpreter_clustering_history::InterpreterClusteringHistory;
100100
pub use interpreter_common::append2table;
101101
pub use interpreter_common::commit2table;
102-
pub use interpreter_common::commit2table_with_append_entries;
103102
pub use interpreter_common::fill_missing_columns;
104103
pub use interpreter_common::list_files_from_dal;
105104
pub use interpreter_database_create::CreateDatabaseInterpreter;

src/query/storages/fuse/src/operations/fuse_sink.rs

Lines changed: 2 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,6 @@ enum State {
6464
location: String,
6565
segment: Arc<SegmentInfo>,
6666
},
67-
Output(DataBlock),
6867
Finished,
6968
}
7069

@@ -78,8 +77,7 @@ pub struct FuseTableSink {
7877
accumulator: StatisticsAccumulator,
7978
cluster_stats_gen: ClusterStatsGenerator,
8079

81-
// For distributed insert select.
82-
// We should output the append logs to the exchange sink.
80+
// A dummy output port for distributed insert select to connect Exchange Sink.
8381
output: Option<Arc<OutputPort>>,
8482
}
8583

@@ -132,15 +130,6 @@ impl Processor for FuseTableSink {
132130
return Ok(Event::Async);
133131
}
134132

135-
if let State::Output(block) = std::mem::replace(&mut self.state, State::Finished) {
136-
if let Some(output) = &self.output {
137-
output.push_data(Ok(block));
138-
return Ok(Event::NeedConsume);
139-
} else {
140-
return Err(ErrorCode::LogicalError("No output port"));
141-
}
142-
}
143-
144133
if self.input.is_finished() {
145134
if self.accumulator.summary_row_count != 0 {
146135
self.state = State::GenerateSegment;
@@ -280,11 +269,7 @@ impl Processor for FuseTableSink {
280269
// TODO: dyn operation for table trait
281270
let log_entry = AppendOperationLogEntry::new(location, segment);
282271
let data_block = DataBlock::try_from(log_entry)?;
283-
if self.output.is_some() {
284-
self.state = State::Output(data_block)
285-
} else {
286-
self.ctx.push_precommit_block(data_block);
287-
}
272+
self.ctx.push_precommit_block(data_block);
288273
}
289274
_state => {
290275
return Err(ErrorCode::LogicalError(

0 commit comments

Comments
 (0)