Skip to content

Commit c9d4ca5

Browse files
committed
refactor the by comment
1 parent aa99f36 commit c9d4ca5

File tree

2 files changed

+23
-12
lines changed

2 files changed

+23
-12
lines changed

src/query/service/src/interpreters/interpreter_common.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,26 @@ pub async fn validate_grant_object_exists(
160160
Ok(())
161161
}
162162

163+
pub async fn stat_file(
164+
ctx: &Arc<QueryContext>,
165+
stage: &UserStageInfo,
166+
path: &str,
167+
) -> Result<StageFile> {
168+
let table_ctx: Arc<dyn TableContext> = ctx.clone();
169+
let op = StageSourceHelper::get_op(&table_ctx, stage).await?;
170+
let meta = op.object(path).metadata().await?;
171+
Ok(StageFile {
172+
path: path.to_owned(),
173+
size: meta.content_length(),
174+
md5: meta.content_md5().map(str::to_string),
175+
last_modified: meta
176+
.last_modified()
177+
.map_or(Utc::now(), |t| Utc.timestamp(t.unix_timestamp(), 0)),
178+
creator: None,
179+
etag: meta.etag().map(str::to_string),
180+
})
181+
}
182+
163183
pub async fn list_files(
164184
ctx: &Arc<QueryContext>,
165185
stage: &UserStageInfo,

src/query/service/src/interpreters/interpreter_copy_v2.rs

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ use futures::TryStreamExt;
3232
use regex::Regex;
3333

3434
use super::append2table;
35-
use crate::interpreters::interpreter_common::list_files;
35+
use crate::interpreters::interpreter_common::stat_file;
3636
use crate::interpreters::Interpreter;
3737
use crate::interpreters::SelectInterpreterV2;
3838
use crate::pipelines::PipelineBuildResult;
@@ -79,12 +79,7 @@ impl CopyInterpreterV2 {
7979
// if force is false, copy only the files that unmatch to the meta copied files info.
8080
let resp = catalog.get_table_copied_file_info(req).await?;
8181
for file in files.iter() {
82-
let stage_file = list_files(&self.ctx, &table_info.stage_info, file, "").await?;
83-
if stage_file.is_empty() {
84-
continue;
85-
}
86-
87-
let stage_file = &stage_file[0];
82+
let stage_file = stat_file(&self.ctx, &table_info.stage_info, file).await?;
8883

8984
if let Some(file_info) = resp.file_info.get(file) {
9085
match &file_info.etag {
@@ -120,12 +115,8 @@ impl CopyInterpreterV2 {
120115
} else {
121116
// if force is true, copy all the file.
122117
for file in files.iter() {
123-
let stage_file = list_files(&self.ctx, &table_info.stage_info, file, "").await?;
124-
if stage_file.is_empty() {
125-
continue;
126-
}
118+
let stage_file = stat_file(&self.ctx, &table_info.stage_info, file).await?;
127119

128-
let stage_file = &stage_file[0];
129120
file_map.insert(file.clone(), TableCopiedFileInfo {
130121
etag: stage_file.etag.clone(),
131122
content_length: stage_file.size,

0 commit comments

Comments
 (0)