Skip to content

Commit e00d3de

Browse files
authored
Merge pull request #9128 from zhyass/feature_update
fix: too many io requests for read blocks during compact
2 parents 5b67160 + 3b804e5 commit e00d3de

File tree

3 files changed

+46
-22
lines changed

3 files changed

+46
-22
lines changed

src/query/storages/fuse/fuse/src/io/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ pub use read::MetaReaders;
2828
pub use read::SegmentInfoReader;
2929
pub use read::SnapshotHistoryReader;
3030
pub use read::TableSnapshotReader;
31+
pub use segments::try_join_futures;
3132
pub use segments::SegmentsIO;
3233
pub use snapshots::SnapshotsIO;
3334
pub use write::write_block;

src/query/storages/fuse/fuse/src/io/segments.rs

Lines changed: 37 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use std::future::Future;
1516
use std::sync::Arc;
1617

1718
use common_base::base::tokio::sync::Semaphore;
@@ -56,11 +57,7 @@ impl SegmentsIO {
5657
return Ok(vec![]);
5758
}
5859

59-
let ctx = self.ctx.clone();
60-
let max_runtime_threads = ctx.get_settings().get_max_threads()? as usize;
61-
let max_io_requests = ctx.get_settings().get_max_storage_io_requests()? as usize;
62-
63-
// 1.1 combine all the tasks.
60+
// combine all the tasks.
6461
let mut iter = segment_locations.iter();
6562
let tasks = std::iter::from_fn(move || {
6663
if let Some(location) = iter.next() {
@@ -74,22 +71,42 @@ impl SegmentsIO {
7471
}
7572
});
7673

77-
// 1.2 build the runtime.
78-
let semaphore = Semaphore::new(max_io_requests);
79-
let segments_runtime = Arc::new(Runtime::with_worker_threads(
80-
max_runtime_threads,
81-
Some("fuse-req-segments-worker".to_owned()),
82-
)?);
74+
try_join_futures(
75+
self.ctx.clone(),
76+
tasks,
77+
"fuse-req-segments-worker".to_owned(),
78+
)
79+
.await
80+
}
81+
}
8382

84-
// 1.3 spawn all the tasks to the runtime.
85-
let join_handlers = segments_runtime.try_spawn_batch(semaphore, tasks).await?;
83+
pub async fn try_join_futures<Fut>(
84+
ctx: Arc<dyn TableContext>,
85+
futures: impl IntoIterator<Item = Fut>,
86+
thread_name: String,
87+
) -> Result<Vec<Fut::Output>>
88+
where
89+
Fut: Future + Send + 'static,
90+
Fut::Output: Send + 'static,
91+
{
92+
let max_runtime_threads = ctx.get_settings().get_max_threads()? as usize;
93+
let max_io_requests = ctx.get_settings().get_max_storage_io_requests()? as usize;
8694

87-
// 1.4 get all the result.
88-
let joint = future::try_join_all(join_handlers)
89-
.instrument(tracing::debug_span!("read_segments_join_all"))
90-
.await
91-
.map_err(|e| ErrorCode::StorageOther(format!("read segments failure, {}", e)))?;
95+
// 1. build the runtime.
96+
let semaphore = Semaphore::new(max_io_requests);
97+
let segments_runtime = Arc::new(Runtime::with_worker_threads(
98+
max_runtime_threads,
99+
Some(thread_name),
100+
)?);
92101

93-
Ok(joint)
94-
}
102+
// 2. spawn all the tasks to the runtime.
103+
let join_handlers = segments_runtime.try_spawn_batch(semaphore, futures).await?;
104+
105+
// 3. get all the result.
106+
let joint = future::try_join_all(join_handlers)
107+
.instrument(tracing::debug_span!("try_join_futures_all"))
108+
.await
109+
.map_err(|e| ErrorCode::StorageOther(format!("try join futures failure, {}", e)))?;
110+
111+
Ok(joint)
95112
}

src/query/storages/fuse/fuse/src/operations/mutation/compact/compact_transform.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ pub struct CompactTransform {
9292

9393
// Limit the memory size of the block read.
9494
max_memory: u64,
95+
max_io_requests: usize,
9596
compact_tasks: VecDeque<CompactTask>,
9697
block_metas: Vec<Arc<BlockMeta>>,
9798
order: usize,
@@ -112,9 +113,10 @@ impl CompactTransform {
112113
thresholds: BlockCompactThresholds,
113114
) -> Result<ProcessorPtr> {
114115
let settings = ctx.get_settings();
115-
let max_memory_usage = (settings.get_max_memory_usage()? as f64 * 0.95) as u64;
116+
let max_memory_usage = (settings.get_max_memory_usage()? as f64 * 0.8) as u64;
116117
let max_threads = settings.get_max_threads()?;
117118
let max_memory = max_memory_usage / max_threads;
119+
let max_io_requests = settings.get_max_storage_io_requests()? as usize;
118120
Ok(ProcessorPtr::create(Box::new(CompactTransform {
119121
state: State::Consume,
120122
input,
@@ -125,6 +127,7 @@ impl CompactTransform {
125127
location_gen,
126128
dal,
127129
max_memory,
130+
max_io_requests,
128131
compact_tasks: VecDeque::new(),
129132
block_metas: Vec::new(),
130133
order: 0,
@@ -326,7 +329,10 @@ impl Processor for CompactTransform {
326329
acc + memory
327330
});
328331

329-
if memory_usage > self.max_memory {
332+
if (memory_usage > self.max_memory
333+
|| task_futures.len() + metas.len() > self.max_io_requests)
334+
&& !task_futures.is_empty()
335+
{
330336
self.compact_tasks.push_front(task);
331337
break;
332338
}

0 commit comments

Comments
 (0)