Skip to content

Commit aea0197

Browse files
committed
Add max_io_requests
1 parent 9d3393f commit aea0197

File tree

1 file changed

+88
-100
lines changed

1 file changed

+88
-100
lines changed

src/query/storages/fuse/fuse/src/operations/mutation/compact/compact_transform.rs

Lines changed: 88 additions & 100 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@ use opendal::Operator;
3737
use super::compact_meta::CompactSourceMeta;
3838
use super::compact_part::CompactTask;
3939
use super::CompactSinkMeta;
40-
use crate::io::try_join_futures;
4140
use crate::io::write_data;
4241
use crate::io::BlockReader;
4342
use crate::io::TableMetaLocationGenerator;
@@ -51,12 +50,6 @@ use crate::pipelines::processors::Processor;
5150
use crate::statistics::reduce_block_statistics;
5251
use crate::statistics::reducers::reduce_block_metas;
5352

54-
struct CompactState {
55-
blocks: Vec<DataBlock>,
56-
stats_of_columns: Vec<Vec<StatisticsOfColumns>>,
57-
trivals: VecDeque<Arc<BlockMeta>>,
58-
}
59-
6053
struct SerializeState {
6154
block_data: Vec<u8>,
6255
block_location: String,
@@ -67,7 +60,11 @@ struct SerializeState {
6760
enum State {
6861
Consume,
6962
ReadBlocks,
70-
CompactBlocks(CompactState),
63+
CompactBlocks {
64+
blocks: Vec<DataBlock>,
65+
stats_of_columns: Vec<Vec<StatisticsOfColumns>>,
66+
trivals: VecDeque<Arc<BlockMeta>>,
67+
},
7168
SerializedBlocks(Vec<SerializeState>),
7269
GenerateSegment,
7370
SerializedSegment {
@@ -89,12 +86,13 @@ pub struct CompactTransform {
8986
scan_progress: Arc<Progress>,
9087
output_data: Option<DataBlock>,
9188

92-
ctx: Arc<dyn TableContext>,
9389
block_reader: Arc<BlockReader>,
9490
location_gen: TableMetaLocationGenerator,
9591
dal: Operator,
9692

9793
// Limit the memory size of the block read.
94+
max_memory: u64,
95+
max_io_requests: usize,
9896
compact_tasks: VecDeque<CompactTask>,
9997
block_metas: Vec<Arc<BlockMeta>>,
10098
order: usize,
@@ -114,104 +112,29 @@ impl CompactTransform {
114112
dal: Operator,
115113
thresholds: BlockCompactThresholds,
116114
) -> Result<ProcessorPtr> {
115+
let settings = ctx.get_settings();
116+
let max_memory_usage = (settings.get_max_memory_usage()? as f64 * 0.95) as u64;
117+
let max_threads = settings.get_max_threads()?;
118+
let max_memory = max_memory_usage / max_threads;
119+
let max_io_requests = settings.get_max_storage_io_requests()? as usize;
117120
Ok(ProcessorPtr::create(Box::new(CompactTransform {
118121
state: State::Consume,
119122
input,
120123
output,
121124
scan_progress,
122125
output_data: None,
123-
ctx,
124126
block_reader,
125127
location_gen,
126128
dal,
129+
max_memory,
130+
max_io_requests,
127131
compact_tasks: VecDeque::new(),
128132
block_metas: Vec::new(),
129133
order: 0,
130134
thresholds,
131135
abort_operation: AbortOperation::default(),
132136
})))
133137
}
134-
135-
async fn read_blocks(&mut self) -> Result<CompactState> {
136-
// block read tasks.
137-
let mut task_futures = Vec::new();
138-
// The no need compact blockmetas.
139-
let mut trivals = VecDeque::new();
140-
let mut memory_usage = 0;
141-
let mut stats_of_columns = Vec::new();
142-
143-
let ctx = self.ctx.clone();
144-
let settings = ctx.get_settings();
145-
let max_memory_usage = (settings.get_max_memory_usage()? as f64 * 0.95) as u64;
146-
let max_threads = settings.get_max_threads()?;
147-
let max_memory = max_memory_usage / max_threads;
148-
while let Some(task) = self.compact_tasks.pop_front() {
149-
let metas = task.get_block_metas();
150-
// Only one block, no need to do a compact.
151-
if metas.len() == 1 {
152-
stats_of_columns.push(vec![]);
153-
trivals.push_back(metas[0].clone());
154-
continue;
155-
}
156-
157-
memory_usage += metas.iter().fold(0, |acc, meta| {
158-
let memory = meta.bloom_filter_index_size + meta.block_size;
159-
acc + memory
160-
});
161-
162-
if memory_usage > max_memory && !task_futures.is_empty() {
163-
self.compact_tasks.push_front(task);
164-
break;
165-
}
166-
167-
let mut meta_stats = Vec::with_capacity(metas.len());
168-
for meta in metas {
169-
let progress_values = ProgressValues {
170-
rows: meta.row_count as usize,
171-
bytes: meta.block_size as usize,
172-
};
173-
self.scan_progress.incr(&progress_values);
174-
175-
meta_stats.push(meta.col_stats.clone());
176-
177-
let block_reader = self.block_reader.clone();
178-
// read block in parallel.
179-
task_futures
180-
.push(async move { block_reader.read_with_block_meta(meta.as_ref()).await });
181-
}
182-
stats_of_columns.push(meta_stats);
183-
}
184-
185-
let blocks = try_join_futures(ctx, task_futures, "deletion-read-blocks-worker".to_owned())
186-
.await?
187-
.into_iter()
188-
.collect::<Result<Vec<_>>>()?;
189-
Ok(CompactState {
190-
blocks,
191-
stats_of_columns,
192-
trivals,
193-
})
194-
}
195-
196-
async fn write_blocks(&mut self, serialize_states: &mut Vec<SerializeState>) -> Result<()> {
197-
let mut handles = Vec::with_capacity(serialize_states.len());
198-
let ctx = self.ctx.clone();
199-
200-
while let Some(state) = serialize_states.pop() {
201-
let dal = self.dal.clone();
202-
handles.push(async move {
203-
// write block data.
204-
write_data(&state.block_data, &dal, &state.block_location).await?;
205-
// write index data.
206-
write_data(&state.index_data, &dal, &state.index_location).await
207-
});
208-
}
209-
try_join_futures(ctx, handles, "deletion-write-blocks-worker".to_owned())
210-
.await?
211-
.into_iter()
212-
.collect::<Result<Vec<_>>>()?;
213-
Ok(())
214-
}
215138
}
216139

217140
#[async_trait::async_trait]
@@ -227,7 +150,7 @@ impl Processor for CompactTransform {
227150
fn event(&mut self) -> Result<Event> {
228151
if matches!(
229152
&self.state,
230-
State::CompactBlocks(_) | State::GenerateSegment { .. } | State::Output { .. }
153+
State::CompactBlocks { .. } | State::GenerateSegment { .. } | State::Output { .. }
231154
) {
232155
return Ok(Event::Sync);
233156
}
@@ -278,18 +201,21 @@ impl Processor for CompactTransform {
278201

279202
fn process(&mut self) -> Result<()> {
280203
match std::mem::replace(&mut self.state, State::Consume) {
281-
State::CompactBlocks(mut compact_state) => {
204+
State::CompactBlocks {
205+
mut blocks,
206+
stats_of_columns,
207+
mut trivals,
208+
} => {
282209
let mut serialize_states = Vec::new();
283-
for stats in compact_state.stats_of_columns {
210+
for stats in stats_of_columns {
284211
let block_num = stats.len();
285212
if block_num == 0 {
286-
self.block_metas
287-
.push(compact_state.trivals.pop_front().unwrap());
213+
self.block_metas.push(trivals.pop_front().unwrap());
288214
continue;
289215
}
290216

291217
// concat blocks.
292-
let compact_blocks: Vec<_> = compact_state.blocks.drain(0..block_num).collect();
218+
let compact_blocks: Vec<_> = blocks.drain(0..block_num).collect();
293219
let new_block = DataBlock::concat_blocks(&compact_blocks)?;
294220

295221
// generate block statistics.
@@ -381,11 +307,73 @@ impl Processor for CompactTransform {
381307
async fn async_process(&mut self) -> Result<()> {
382308
match std::mem::replace(&mut self.state, State::Consume) {
383309
State::ReadBlocks => {
384-
let compact_state = self.read_blocks().await?;
385-
self.state = State::CompactBlocks(compact_state);
310+
// block read tasks.
311+
let mut task_futures = Vec::new();
312+
// The no need compact blockmetas.
313+
let mut trivals = VecDeque::new();
314+
let mut memory_usage = 0;
315+
let mut stats_of_columns = Vec::new();
316+
317+
let block_reader = self.block_reader.as_ref();
318+
while let Some(task) = self.compact_tasks.pop_front() {
319+
let metas = task.get_block_metas();
320+
// Only one block, no need to do a compact.
321+
if metas.len() == 1 {
322+
stats_of_columns.push(vec![]);
323+
trivals.push_back(metas[0].clone());
324+
continue;
325+
}
326+
327+
memory_usage += metas.iter().fold(0, |acc, meta| {
328+
let memory = meta.bloom_filter_index_size + meta.block_size;
329+
acc + memory
330+
});
331+
332+
if (memory_usage > self.max_memory
333+
|| task_futures.len() + metas.len() > self.max_io_requests)
334+
&& !task_futures.is_empty()
335+
{
336+
self.compact_tasks.push_front(task);
337+
break;
338+
}
339+
340+
let mut meta_stats = Vec::with_capacity(metas.len());
341+
for meta in metas {
342+
let progress_values = ProgressValues {
343+
rows: meta.row_count as usize,
344+
bytes: meta.block_size as usize,
345+
};
346+
self.scan_progress.incr(&progress_values);
347+
348+
meta_stats.push(meta.col_stats.clone());
349+
350+
// read block in parallel.
351+
task_futures.push(async move {
352+
block_reader.read_with_block_meta(meta.as_ref()).await
353+
});
354+
}
355+
stats_of_columns.push(meta_stats);
356+
}
357+
358+
let blocks = futures::future::try_join_all(task_futures).await?;
359+
self.state = State::CompactBlocks {
360+
blocks,
361+
stats_of_columns,
362+
trivals,
363+
}
386364
}
387365
State::SerializedBlocks(mut serialize_states) => {
388-
self.write_blocks(&mut serialize_states).await?;
366+
let mut handles = Vec::with_capacity(serialize_states.len());
367+
let dal = &self.dal;
368+
while let Some(state) = serialize_states.pop() {
369+
handles.push(async move {
370+
// write block data.
371+
write_data(&state.block_data, dal, &state.block_location).await?;
372+
// write index data.
373+
write_data(&state.index_data, dal, &state.index_location).await
374+
});
375+
}
376+
futures::future::try_join_all(handles).await?;
389377
if self.compact_tasks.is_empty() {
390378
self.state = State::GenerateSegment;
391379
} else {

0 commit comments

Comments
 (0)