Skip to content

Commit ff0af2f

Browse files
authored
refactor: list stage use stream to avoid OOM. (#15405)
refactor: list stage use stream to avoid oom.
1 parent d7c9a83 commit ff0af2f

File tree

3 files changed

+84
-49
lines changed

3 files changed

+84
-49
lines changed

src/common/storage/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ pub use parquet_rs::read_parquet_schema_async_rs;
6161
mod stage;
6262
pub use stage::init_stage_operator;
6363
pub use stage::StageFileInfo;
64+
pub use stage::StageFileInfoStream;
6465
pub use stage::StageFileStatus;
6566
pub use stage::StageFilesInfo;
6667
pub use stage::STDIN_FD;

src/common/storage/src/stage.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,8 @@ pub struct StageFilesInfo {
9494
pub pattern: Option<String>,
9595
}
9696

97+
pub type StageFileInfoStream = Pin<Box<dyn Stream<Item = Result<StageFileInfo>> + Send>>;
98+
9799
impl StageFilesInfo {
98100
fn get_pattern(&self) -> Result<Option<Regex>> {
99101
match &self.pattern {
@@ -169,7 +171,7 @@ impl StageFilesInfo {
169171
operator: &Operator,
170172
thread_num: usize,
171173
max_files: Option<usize>,
172-
) -> Result<Pin<Box<dyn Stream<Item = Result<StageFileInfo>> + Send>>> {
174+
) -> Result<StageFileInfoStream> {
173175
if self.path == STDIN_FD {
174176
return Ok(Box::pin(stream::iter(vec![Ok(stdin_stage_info())])));
175177
}
@@ -255,7 +257,7 @@ impl StageFilesInfo {
255257
path: &str,
256258
pattern: Option<Regex>,
257259
max_files: Option<usize>,
258-
) -> Result<Pin<Box<dyn Stream<Item = Result<StageFileInfo>> + Send>>> {
260+
) -> Result<StageFileInfoStream> {
259261
if path == STDIN_FD {
260262
return Ok(Box::pin(stream::once(async { Ok(stdin_stage_info()) })));
261263
}

src/query/service/src/table_functions/list_stage/list_stage_table.rs

Lines changed: 79 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,12 @@ use databend_common_pipeline_core::Pipeline;
4343
use databend_common_pipeline_sources::AsyncSource;
4444
use databend_common_pipeline_sources::AsyncSourcer;
4545
use databend_common_sql::binder::resolve_stage_location;
46+
use databend_common_storage::StageFileInfo;
47+
use databend_common_storage::StageFileInfoStream;
4648
use databend_common_storage::StageFilesInfo;
4749
use databend_common_storages_stage::StageTable;
50+
use futures_util::stream::Chunks;
51+
use futures_util::StreamExt;
4852

4953
use crate::table_functions::list_stage::table_args::ListStageArgsParsed;
5054

@@ -150,8 +154,13 @@ impl TableFunction for ListStageTable {
150154
}
151155
}
152156

157+
enum State {
158+
NotStarted,
159+
Listing(Chunks<StageFileInfoStream>),
160+
Finished,
161+
}
153162
struct ListStagesSource {
154-
is_finished: bool,
163+
state: State,
155164
ctx: Arc<dyn TableContext>,
156165
args_parsed: ListStageArgsParsed,
157166
}
@@ -163,26 +172,13 @@ impl ListStagesSource {
163172
args_parsed: ListStageArgsParsed,
164173
) -> Result<ProcessorPtr> {
165174
AsyncSourcer::create(ctx.clone(), output, ListStagesSource {
166-
is_finished: false,
175+
state: State::NotStarted,
167176
ctx,
168177
args_parsed,
169178
})
170179
}
171-
}
172-
173-
#[async_trait::async_trait]
174-
impl AsyncSource for ListStagesSource {
175-
const NAME: &'static str = LIST_STAGE;
176-
177-
#[async_trait::unboxed_simple]
178-
#[async_backtrace::framed]
179-
async fn generate(&mut self) -> Result<Option<DataBlock>> {
180-
if self.is_finished {
181-
return Ok(None);
182-
}
183-
184-
self.is_finished = true;
185180

181+
async fn do_list(&mut self) -> Result<StageFileInfoStream> {
186182
let (stage_info, path) =
187183
resolve_stage_location(self.ctx.as_ref(), &self.args_parsed.location).await?;
188184
let enable_experimental_rbac_check = self
@@ -209,37 +205,73 @@ impl AsyncSource for ListStagesSource {
209205
files: self.args_parsed.files_info.files.clone(),
210206
pattern: self.args_parsed.files_info.pattern.clone(),
211207
};
208+
let files = files_info.list_stream(&op, thread_num, None).await?;
209+
Ok(files)
210+
}
211+
}
212212

213-
let files = files_info.list(&op, thread_num, None).await?;
214-
215-
let names: Vec<String> = files.iter().map(|file| file.path.to_string()).collect();
216-
217-
let sizes: Vec<u64> = files.iter().map(|file| file.size).collect();
218-
let etags: Vec<Option<String>> = files
219-
.iter()
220-
.map(|file| file.etag.as_ref().map(|f| f.to_string()))
221-
.collect();
222-
let last_modifieds: Vec<String> = files
223-
.iter()
224-
.map(|file| {
225-
file.last_modified
226-
.format("%Y-%m-%d %H:%M:%S.%3f %z")
227-
.to_string()
228-
})
229-
.collect();
230-
let creators: Vec<Option<String>> = files
231-
.iter()
232-
.map(|file| file.creator.as_ref().map(|c| c.display().to_string()))
233-
.collect();
234-
235-
let block = DataBlock::new_from_columns(vec![
236-
StringType::from_data(names),
237-
UInt64Type::from_data(sizes),
238-
StringType::from_opt_data(etags),
239-
StringType::from_data(last_modifieds),
240-
StringType::from_opt_data(creators),
241-
]);
242-
243-
Ok(Some(block))
213+
fn make_block(files: &[StageFileInfo]) -> DataBlock {
214+
let names: Vec<String> = files.iter().map(|file| file.path.to_string()).collect();
215+
216+
let sizes: Vec<u64> = files.iter().map(|file| file.size).collect();
217+
let etags: Vec<Option<String>> = files
218+
.iter()
219+
.map(|file| file.etag.as_ref().map(|f| f.to_string()))
220+
.collect();
221+
let last_modifieds: Vec<String> = files
222+
.iter()
223+
.map(|file| {
224+
file.last_modified
225+
.format("%Y-%m-%d %H:%M:%S.%3f %z")
226+
.to_string()
227+
})
228+
.collect();
229+
let creators: Vec<Option<String>> = files
230+
.iter()
231+
.map(|file| file.creator.as_ref().map(|c| c.display().to_string()))
232+
.collect();
233+
234+
DataBlock::new_from_columns(vec![
235+
StringType::from_data(names),
236+
UInt64Type::from_data(sizes),
237+
StringType::from_opt_data(etags),
238+
StringType::from_data(last_modifieds),
239+
StringType::from_opt_data(creators),
240+
])
241+
}
242+
243+
#[async_trait::async_trait]
244+
impl AsyncSource for ListStagesSource {
245+
const NAME: &'static str = LIST_STAGE;
246+
247+
#[async_trait::unboxed_simple]
248+
#[async_backtrace::framed]
249+
async fn generate(&mut self) -> Result<Option<DataBlock>> {
250+
match &self.state {
251+
State::Finished => {
252+
return Ok(None);
253+
}
254+
State::NotStarted => {
255+
let files = self.do_list().await?;
256+
// most of the time result of list_stage will not be written to another table.
257+
// 10000 is the default "page size" of http handler.
258+
self.state = State::Listing(files.chunks(10000));
259+
}
260+
State::Listing(_) => {}
261+
};
262+
if let State::Listing(chunks) = &mut self.state {
263+
match chunks.next().await {
264+
Some(chunk) => {
265+
let chunk: Result<Vec<StageFileInfo>> = chunk.into_iter().collect();
266+
Ok(Some(make_block(&chunk?)))
267+
}
268+
None => {
269+
self.state = State::Finished;
270+
Ok(None)
271+
}
272+
}
273+
} else {
274+
unreachable!("state should be State::Listing")
275+
}
244276
}
245277
}

0 commit comments

Comments
 (0)