Skip to content

Commit 27bf797

Browse files
authored
chore: remove spawn_blocking for merge into and replace into (#14664)
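Remove spawn_blocking for merge into and replace into: the block deserialization and block building tasks are now submitted with `GlobalIORuntime::instance().spawn(query_id, async move { ... })` instead of `spawn_blocking(move || ...)`, and an error from awaiting the spawned task is mapped to `ErrorCode::Internal` with a task-specific message.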
1 parent 98167cb commit 27bf797

File tree

3 files changed: +41 -10 lines changed

src/query/storages/fuse/src/operations/merge_into/mutator/matched_mutator.rs

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -337,8 +337,10 @@ impl MatchedAggregator {
337337
let log_entries = futures::future::try_join_all(mutation_log_handlers)
338338
.await
339339
.map_err(|e| {
340-
ErrorCode::Internal("unexpected, failed to join apply-deletion tasks.")
341-
.add_message_back(e.to_string())
340+
ErrorCode::Internal(
341+
"unexpected, failed to join apply update and delete tasks for merge into.",
342+
)
343+
.add_message_back(e.to_string())
342344
})?;
343345
let mut mutation_logs = Vec::new();
344346
for maybe_log_entry in log_entries {
@@ -377,6 +379,7 @@ impl AggregationContext {
377379
&self.block_reader,
378380
block_meta,
379381
&self.read_settings,
382+
self.ctx.get_id(),
380383
)
381384
.await?;
382385
let origin_num_rows = origin_data_block.num_rows();
@@ -407,8 +410,9 @@ impl AggregationContext {
407410
// and wait (asyncly, which will NOT block the executor thread)
408411
let block_builder = self.block_builder.clone();
409412
let origin_stats = block_meta.cluster_stats.clone();
413+
410414
let serialized = GlobalIORuntime::instance()
411-
.spawn_blocking(move || {
415+
.spawn(self.ctx.get_id(), async move {
412416
block_builder.build(res_block, |block, generator| {
413417
let cluster_stats =
414418
generator.gen_with_origin_stats(&block, origin_stats.clone())?;
@@ -419,7 +423,13 @@ impl AggregationContext {
419423
Ok((cluster_stats, block))
420424
})
421425
})
422-
.await?;
426+
.await
427+
.map_err(|e| {
428+
ErrorCode::Internal(
429+
"unexpected, failed to serialize block when apply update and delete to data block for merge into",
430+
)
431+
.add_message_back(e.to_string())
432+
})??;
423433

424434
// persistent data
425435
let new_block_meta = serialized.block_meta;

src/query/storages/fuse/src/operations/replace_into/mutator/merge_into_mutator.rs

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ struct AggregationContext {
8989
segment_reader: CompactSegmentInfoReader,
9090
block_builder: BlockBuilder,
9191
io_request_semaphore: Arc<Semaphore>,
92+
query_id: String,
9293
}
9394

9495
// Apply MergeIntoOperations to segments
@@ -166,7 +167,7 @@ impl MergeIntoOperationAggregator {
166167
Some(reader)
167168
}
168169
};
169-
170+
let query_id = ctx.get_id();
170171
Ok(Self {
171172
ctx,
172173
deletion_accumulator,
@@ -184,6 +185,7 @@ impl MergeIntoOperationAggregator {
184185
segment_reader,
185186
block_builder,
186187
io_request_semaphore,
188+
query_id,
187189
}),
188190
})
189191
}
@@ -392,6 +394,7 @@ impl AggregationContext {
392394
&self.key_column_reader,
393395
block_meta,
394396
&self.read_settings,
397+
self.query_id.clone(),
395398
)
396399
.await?;
397400

@@ -507,14 +510,20 @@ impl AggregationContext {
507510
let block_builder = self.block_builder.clone();
508511
let origin_stats = block_meta.cluster_stats.clone();
509512
let serialized = GlobalIORuntime::instance()
510-
.spawn_blocking(move || {
513+
.spawn(self.query_id.clone(), async move {
511514
block_builder.build(new_block, |block, generator| {
512515
let cluster_stats =
513516
generator.gen_with_origin_stats(&block, origin_stats.clone())?;
514517
Ok((cluster_stats, block))
515518
})
516519
})
517-
.await?;
520+
.await
521+
.map_err(|e| {
522+
ErrorCode::Internal(
523+
"unexpected, failed to join apply delete tasks for replace into.",
524+
)
525+
.add_message_back(e.to_string())
526+
})??;
518527

519528
// persistent data
520529
let new_block_meta = serialized.block_meta;
@@ -600,7 +609,7 @@ impl AggregationContext {
600609
let block_meta_ptr = block_meta.clone();
601610
let reader = reader.clone();
602611
GlobalIORuntime::instance()
603-
.spawn_blocking(move || {
612+
.spawn(self.query_id.clone(), async move {
604613
let column_chunks = merged_io_read_result.columns_chunks()?;
605614
reader.deserialize_chunks(
606615
block_meta_ptr.location.0.as_str(),
@@ -612,6 +621,12 @@ impl AggregationContext {
612621
)
613622
})
614623
.await
624+
.map_err(|e| {
625+
ErrorCode::Internal(
626+
"unexpected, failed to join aggregation context read block tasks for replace into.",
627+
)
628+
.add_message_back(e.to_string())
629+
})?
615630
}
616631

617632
// return true if the block is pruned, otherwise false

src/query/storages/fuse/src/operations/util.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use databend_common_arrow::parquet::metadata::ThriftFileMetaData;
2222
use databend_common_base::base::tokio::sync::OwnedSemaphorePermit;
2323
use databend_common_base::base::tokio::sync::Semaphore;
2424
use databend_common_base::runtime::GlobalIORuntime;
25+
use databend_common_base::runtime::TrySpawn;
2526
use databend_common_exception::ErrorCode;
2627
use databend_common_exception::Result;
2728
use databend_common_expression::ColumnId;
@@ -199,6 +200,7 @@ pub async fn read_block(
199200
reader: &BlockReader,
200201
block_meta: &BlockMeta,
201202
read_settings: &ReadSettings,
203+
query_id: String,
202204
) -> Result<DataBlock> {
203205
let merged_io_read_result = reader
204206
.read_columns_data_by_merge_io(
@@ -211,11 +213,11 @@ pub async fn read_block(
211213

212214
// deserialize block data
213215
// cpu intensive task, send them to dedicated thread pool
214-
215216
let block_meta_ptr = block_meta.clone();
216217
let reader = reader.clone();
218+
217219
GlobalIORuntime::instance()
218-
.spawn_blocking(move || {
220+
.spawn(query_id, async move {
219221
let column_chunks = merged_io_read_result.columns_chunks()?;
220222
reader.deserialize_chunks(
221223
block_meta_ptr.location.0.as_str(),
@@ -227,4 +229,8 @@ pub async fn read_block(
227229
)
228230
})
229231
.await
232+
.map_err(|e| {
233+
ErrorCode::Internal("unexpected, failed to read block for merge into")
234+
.add_message_back(e.to_string())
235+
})?
230236
}

0 commit comments

Comments
 (0)