Skip to content

Commit aa7960a

Browse files
authored
refactor: make REMOVE STAGE use a stream to avoid OOM. (#15378)
* refactor: make REMOVE STAGE use a stream to avoid OOM. * refactor: change REMOVE_BATCH to 1000 (S3 deletes at most 1000 objects per request). * ci: fix flaky test.
1 parent be942d3 commit aa7960a

File tree

4 files changed

+148
-93
lines changed

4 files changed

+148
-93
lines changed

src/common/storage/src/stage.rs

Lines changed: 122 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
// limitations under the License.
1414

1515
use std::path::Path;
16+
use std::pin::Pin;
17+
use std::sync::Arc;
1618

1719
use chrono::DateTime;
1820
use chrono::Utc;
@@ -22,6 +24,10 @@ use databend_common_exception::Result;
2224
use databend_common_meta_app::principal::StageInfo;
2325
use databend_common_meta_app::principal::StageType;
2426
use databend_common_meta_app::principal::UserIdentity;
27+
use databend_common_meta_app::principal::COPY_MAX_FILES_PER_COMMIT;
28+
use futures::stream;
29+
use futures::Stream;
30+
use futures::StreamExt;
2531
use futures::TryStreamExt;
2632
use opendal::EntryMode;
2733
use opendal::Metadata;
@@ -103,6 +109,34 @@ impl StageFilesInfo {
103109
}
104110
}
105111

112+
#[async_backtrace::framed]
113+
async fn list_files(
114+
&self,
115+
operator: &Operator,
116+
thread_num: usize,
117+
max_files: Option<usize>,
118+
mut files: &[String],
119+
) -> Result<Vec<StageFileInfo>> {
120+
if let Some(m) = max_files {
121+
files = &files[..m]
122+
}
123+
let file_infos = self.stat_concurrent(operator, thread_num, files).await?;
124+
let mut res = Vec::with_capacity(file_infos.len());
125+
126+
for file_info in file_infos {
127+
match file_info {
128+
Ok((path, meta)) if meta.is_dir() => {
129+
return Err(ErrorCode::BadArguments(format!("{path} is not a file")));
130+
}
131+
Ok((path, meta)) => res.push(StageFileInfo::new(path, &meta)),
132+
Err(e) => {
133+
return Err(e);
134+
}
135+
}
136+
}
137+
Ok(res)
138+
}
139+
106140
#[async_backtrace::framed]
107141
pub async fn list(
108142
&self,
@@ -111,31 +145,45 @@ impl StageFilesInfo {
111145
max_files: Option<usize>,
112146
) -> Result<Vec<StageFileInfo>> {
113147
if self.path == STDIN_FD {
114-
return Ok(vec![stdin_stage_info()?]);
148+
return Ok(vec![stdin_stage_info()]);
115149
}
116150

117-
let max_files = max_files.unwrap_or(usize::MAX);
118151
if let Some(files) = &self.files {
119-
let file_infos = self
120-
.stat_concurrent(operator, thread_num, max_files, files)
121-
.await?;
122-
let mut res = Vec::with_capacity(file_infos.len());
152+
self.list_files(operator, thread_num, max_files, files)
153+
.await
154+
} else {
155+
let pattern = self.get_pattern()?;
156+
StageFilesInfo::list_files_with_pattern(
157+
operator,
158+
&self.path,
159+
pattern,
160+
max_files.unwrap_or(COPY_MAX_FILES_PER_COMMIT),
161+
)
162+
.await
163+
}
164+
}
123165

124-
for file_info in file_infos {
125-
match file_info {
126-
Ok((path, meta)) if meta.is_dir() => {
127-
return Err(ErrorCode::BadArguments(format!("{path} is not a file")));
128-
}
129-
Ok((path, meta)) => res.push(StageFileInfo::new(path, &meta)),
130-
Err(e) => {
131-
return Err(e);
132-
}
133-
}
134-
}
135-
Ok(res)
166+
#[async_backtrace::framed]
167+
pub async fn list_stream(
168+
&self,
169+
operator: &Operator,
170+
thread_num: usize,
171+
max_files: Option<usize>,
172+
) -> Result<Pin<Box<dyn Stream<Item = Result<StageFileInfo>> + Send>>> {
173+
if self.path == STDIN_FD {
174+
return Ok(Box::pin(stream::iter(vec![Ok(stdin_stage_info())])));
175+
}
176+
177+
if let Some(files) = &self.files {
178+
let files = self
179+
.list_files(operator, thread_num, max_files, files)
180+
.await?;
181+
let files = files.into_iter().map(Ok);
182+
Ok(Box::pin(stream::iter(files)))
136183
} else {
137184
let pattern = self.get_pattern()?;
138-
StageFilesInfo::list_files_with_pattern(operator, &self.path, pattern, max_files).await
185+
StageFilesInfo::list_files_stream_with_pattern(operator, &self.path, pattern, max_files)
186+
.await
139187
}
140188
}
141189

@@ -195,40 +243,73 @@ impl StageFilesInfo {
195243
pattern: Option<Regex>,
196244
max_files: usize,
197245
) -> Result<Vec<StageFileInfo>> {
246+
Self::list_files_stream_with_pattern(operator, path, pattern, Some(max_files))
247+
.await?
248+
.try_collect::<Vec<_>>()
249+
.await
250+
}
251+
252+
#[async_backtrace::framed]
253+
pub async fn list_files_stream_with_pattern(
254+
operator: &Operator,
255+
path: &str,
256+
pattern: Option<Regex>,
257+
max_files: Option<usize>,
258+
) -> Result<Pin<Box<dyn Stream<Item = Result<StageFileInfo>> + Send>>> {
198259
if path == STDIN_FD {
199-
return Ok(vec![stdin_stage_info()?]);
260+
return Ok(Box::pin(stream::once(async { Ok(stdin_stage_info()) })));
200261
}
201-
let mut files = Vec::new();
202262
let prefix_len = if path == "/" { 0 } else { path.len() };
203263
let prefix_meta = operator.stat(path).await;
204-
match prefix_meta {
264+
let file_exact: Option<Result<StageFileInfo>> = match prefix_meta {
205265
Ok(meta) if meta.is_file() => {
206-
files.push(StageFileInfo::new(path.to_string(), &meta));
266+
let f = StageFileInfo::new(path.to_string(), &meta);
267+
if max_files == Some(1) {
268+
return Ok(Box::pin(stream::once(async { Ok(f) })));
269+
}
270+
Some(Ok(f))
207271
}
208272
Err(e) if e.kind() != opendal::ErrorKind::NotFound => {
209273
return Err(e.into());
210274
}
211-
_ => {}
275+
_ => None,
212276
};
213-
let mut lister = operator
277+
let file_exact_stream = stream::iter(file_exact.clone().into_iter());
278+
279+
let lister = operator
214280
.lister_with(path)
215281
.recursive(true)
216282
.metakey(StageFileInfo::meta_query())
217283
.await?;
218284

219-
if files.len() == max_files {
220-
return Ok(files);
221-
}
222-
while let Some(obj) = lister.try_next().await? {
223-
let meta = obj.metadata();
224-
if check_file(&obj.path()[prefix_len..], meta.mode(), &pattern) {
225-
files.push(StageFileInfo::new(obj.path().to_string(), meta));
226-
if files.len() == max_files {
227-
return Ok(files);
285+
let pattern = Arc::new(pattern);
286+
let files_with_prefix = lister.filter_map(move |result| {
287+
let pattern = pattern.clone();
288+
async move {
289+
match result {
290+
Ok(entry) => {
291+
let meta = entry.metadata();
292+
if check_file(&entry.path()[prefix_len..], meta.mode(), &pattern) {
293+
Some(Ok(StageFileInfo::new(entry.path().to_string(), meta)))
294+
} else {
295+
None
296+
}
297+
}
298+
Err(e) => Some(Err(ErrorCode::from(e))),
228299
}
229300
}
301+
});
302+
if let Some(max_files) = max_files {
303+
if file_exact.is_some() {
304+
Ok(Box::pin(
305+
file_exact_stream.chain(files_with_prefix.take(max_files - 1)),
306+
))
307+
} else {
308+
Ok(Box::pin(files_with_prefix.take(max_files)))
309+
}
310+
} else {
311+
Ok(Box::pin(file_exact_stream.chain(files_with_prefix)))
230312
}
231-
Ok(files)
232313
}
233314

234315
/// Stat files concurrently.
@@ -237,10 +318,9 @@ impl StageFilesInfo {
237318
&self,
238319
operator: &Operator,
239320
thread_num: usize,
240-
max_files: usize,
241321
files: &[String],
242322
) -> Result<Vec<Result<(String, Metadata)>>> {
243-
if max_files == 1 {
323+
if files.len() == 1 {
244324
let Some(file) = files.first() else {
245325
return Ok(vec![]);
246326
};
@@ -254,7 +334,7 @@ impl StageFilesInfo {
254334
}
255335

256336
// This clone is required to make sure we are not referring to `file: &String` in the closure
257-
let tasks = files.iter().take(max_files).cloned().map(|file| {
337+
let tasks = files.iter().cloned().map(|file| {
258338
let full_path = Path::new(&self.path)
259339
.join(file)
260340
.to_string_lossy()
@@ -292,7 +372,7 @@ fn blocking_list_files_with_pattern(
292372
max_files: usize,
293373
) -> Result<Vec<StageFileInfo>> {
294374
if path == STDIN_FD {
295-
return Ok(vec![stdin_stage_info()?]);
375+
return Ok(vec![stdin_stage_info()]);
296376
}
297377
let operator = operator.blocking();
298378
let mut files = Vec::new();
@@ -330,14 +410,14 @@ fn blocking_list_files_with_pattern(
330410

331411
pub const STDIN_FD: &str = "/dev/fd/0";
332412

333-
fn stdin_stage_info() -> Result<StageFileInfo> {
334-
Ok(StageFileInfo {
413+
fn stdin_stage_info() -> StageFileInfo {
414+
StageFileInfo {
335415
path: STDIN_FD.to_string(),
336416
size: u64::MAX,
337417
md5: None,
338418
last_modified: Utc::now(),
339419
etag: None,
340420
status: StageFileStatus::NeedCopy,
341421
creator: None,
342-
})
422+
}
343423
}

src/query/service/src/interpreters/interpreter_user_stage_remove.rs

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,11 @@ use std::sync::Arc;
1717
use databend_common_catalog::table_context::TableContext;
1818
use databend_common_exception::Result;
1919
use databend_common_sql::plans::RemoveStagePlan;
20+
use databend_common_storage::StageFileInfo;
2021
use databend_common_storage::StageFilesInfo;
2122
use databend_common_storages_fuse::io::Files;
2223
use databend_common_storages_stage::StageTable;
24+
use futures_util::StreamExt;
2325
use log::debug;
2426
use log::error;
2527

@@ -68,19 +70,19 @@ impl Interpreter for RemoveUserStageInterpreter {
6870
files: None,
6971
pattern,
7072
};
71-
let files: Vec<String> = files_info
72-
.list(&op, thread_num, None)
73-
.await?
74-
.into_iter()
75-
.map(|file_with_meta| file_with_meta.path)
76-
.collect::<Vec<_>>();
73+
let files = files_info.list_stream(&op, thread_num, None).await?;
7774

7875
let table_ctx: Arc<dyn TableContext> = self.ctx.clone();
7976
let file_op = Files::create(table_ctx, op);
8077

81-
const REMOVE_BATCH: usize = 4000;
82-
for chunk in files.chunks(REMOVE_BATCH) {
83-
if let Err(e) = file_op.remove_file_in_batch(chunk).await {
78+
const REMOVE_BATCH: usize = 1000;
79+
let mut chunks = files.chunks(REMOVE_BATCH);
80+
81+
// s3 can remove at most 1k files in one request
82+
while let Some(chunk) = chunks.next().await {
83+
let chunk: Result<Vec<StageFileInfo>> = chunk.into_iter().collect();
84+
let chunk = chunk?.into_iter().map(|x| x.path).collect::<Vec<_>>();
85+
if let Err(e) = file_op.remove_file_in_batch(&chunk).await {
8486
error!("Failed to delete file: {:?}, error: {}", chunk, e);
8587
}
8688

tests/suites/1_stateful/00_stage/00_0004_copy_with_max_files.result

Lines changed: 12 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,46 +1,19 @@
11
--- force = false, purge = false
2-
f1.csv 2 0 NULL NULL
3-
f3.csv 2 0 NULL NULL
4-
4
5-
remain 3 files
6-
f2.csv 2 0 NULL NULL
7-
6
8-
remain 3 files
9-
6
10-
remain 3 files
2+
copied 2 files with 4 rows, remain 3 files
3+
copied 1 files with 6 rows, remain 3 files
4+
copied 0 files with 6 rows, remain 3 files
115
--- force = false, purge = true
12-
f1.csv 2 0 NULL NULL
13-
f3.csv 2 0 NULL NULL
14-
4
15-
remain 1 files
16-
f2.csv 2 0 NULL NULL
17-
6
18-
remain 0 files
19-
6
20-
remain 0 files
6+
copied 2 files with 4 rows, remain 1 files
7+
copied 1 files with 6 rows, remain 0 files
8+
copied 0 files with 6 rows, remain 0 files
219
--- force = true, purge = false
22-
f1.csv 2 0 NULL NULL
23-
f3.csv 2 0 NULL NULL
24-
4
25-
remain 3 files
26-
f1.csv 2 0 NULL NULL
27-
f3.csv 2 0 NULL NULL
28-
8
29-
remain 3 files
30-
f1.csv 2 0 NULL NULL
31-
f3.csv 2 0 NULL NULL
32-
12
33-
remain 3 files
10+
copied 2 files with 4 rows, remain 3 files
11+
copied 2 files with 8 rows, remain 3 files
12+
copied 2 files with 12 rows, remain 3 files
3413
--- force = true, purge = true
35-
f1.csv 2 0 NULL NULL
36-
f3.csv 2 0 NULL NULL
37-
4
38-
remain 1 files
39-
f2.csv 2 0 NULL NULL
40-
6
41-
remain 0 files
42-
6
43-
remain 0 files
14+
copied 2 files with 4 rows, remain 1 files
15+
copied 1 files with 6 rows, remain 0 files
16+
copied 0 files with 6 rows, remain 0 files
4417
>>>> drop table if exists test_max_files_limit
4518
>>>> create table test_max_files_limit (a int, b int)
4619
>>>> copy into test_max_files_limit from 'fs:///tmp/00_0004_2/' FILE_FORMAT = (type = CSV)

tests/suites/1_stateful/00_stage/00_0004_copy_with_max_files.sh

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,10 +48,10 @@ do
4848
for i in {1..3}
4949
do
5050
table="test_max_files_force_${force}_purge_${purge}"
51-
echo "copy into ${table} from 'fs:///tmp/00_0004/' FILE_FORMAT = (type = CSV) max_files=2 force=${force} purge=${purge}" | $BENDSQL_CLIENT_CONNECT
52-
echo "select count(*) from ${table}" | $BENDSQL_CLIENT_CONNECT
51+
copied=$(echo "copy into ${table} from 'fs:///tmp/00_0004/' FILE_FORMAT = (type = CSV) max_files=2 force=${force} purge=${purge}" | $BENDSQL_CLIENT_CONNECT | wc -l | sed 's/ //g')
52+
copied_rows=$(echo "select count(*) from ${table}" | $BENDSQL_CLIENT_CONNECT)
5353
remain=$(ls -1 /tmp/00_0004/ | wc -l | sed 's/ //g')
54-
echo "remain ${remain} files"
54+
echo "copied ${copied} files with ${copied_rows} rows, remain ${remain} files"
5555
done
5656
done
5757
done

0 commit comments

Comments (0)