Skip to content

Commit e92af52

Browse files
committed
feat: idempotent-copy file
1 parent fb86e6b commit e92af52

File tree

1 file changed

+44
-34
lines changed

1 file changed

+44
-34
lines changed

src/query/service/src/interpreters/interpreter_copy_v2.rs

Lines changed: 44 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -127,18 +127,18 @@ impl CopyInterpreterV2 {
127127
&self,
128128
catalog_name: &str,
129129
table_id: u64,
130-
copy_stage_files: Option<BTreeMap<String, TableCopiedFileInfo>>,
130+
copy_stage_files: BTreeMap<String, TableCopiedFileInfo>,
131131
) -> Result<()> {
132-
if let Some(copy_stage_files) = copy_stage_files {
133-
if !copy_stage_files.is_empty() {
134-
let req = UpsertTableCopiedFileReq {
135-
table_id,
136-
file_info: copy_stage_files.clone(),
137-
expire_at: None,
138-
};
139-
let catalog = self.ctx.get_catalog(catalog_name)?;
140-
catalog.upsert_table_copied_file_info(req).await?;
141-
}
132+
tracing::info!("upsert_copied_files_info: {:?}", copy_stage_files);
133+
134+
if !copy_stage_files.is_empty() {
135+
let req = UpsertTableCopiedFileReq {
136+
table_id,
137+
file_info: copy_stage_files.clone(),
138+
expire_at: None,
139+
};
140+
let catalog = self.ctx.get_catalog(catalog_name)?;
141+
catalog.upsert_table_copied_file_info(req).await?;
142142
}
143143
Ok(())
144144
}
@@ -416,7 +416,7 @@ impl Interpreter for CopyInterpreterV2 {
416416

417417
tracing::info!("matched files: {:?}, pattern: {}", &files, pattern);
418418

419-
let (table_id, files, copy_stage_files) = match &from.source_info {
419+
match &from.source_info {
420420
SourceInfo::StageSource(table_info) => {
421421
let (table_id, copy_stage_files) = self
422422
.filter_duplicate_files(
@@ -434,30 +434,40 @@ impl Interpreter for CopyInterpreterV2 {
434434
&copy_stage_files.keys(),
435435
);
436436

437-
(
438-
table_id,
439-
copy_stage_files.keys().cloned().collect(),
440-
Some(copy_stage_files),
441-
)
442-
}
443-
_other => (0, files.clone(), None),
444-
};
437+
if copy_stage_files.is_empty() {
438+
return Ok(PipelineBuildResult::create());
439+
}
440+
441+
let result = self
442+
.copy_files_to_table(
443+
catalog_name,
444+
database_name,
445+
table_name,
446+
from,
447+
copy_stage_files.keys().cloned().collect(),
448+
)
449+
.await;
450+
451+
if result.is_ok() {
452+
let _ = self
453+
.upsert_copied_files_info(catalog_name, table_id, copy_stage_files)
454+
.await?;
455+
}
445456

446-
let result = self
447-
.copy_files_to_table(
448-
catalog_name,
449-
database_name,
450-
table_name,
451-
from,
452-
files.clone(),
453-
)
454-
.await;
455-
456-
if result.is_ok() {
457-
let _ = self.upsert_copied_files_info(catalog_name, table_id, copy_stage_files);
457+
result
458+
}
459+
_other => {
460+
return self
461+
.copy_files_to_table(
462+
catalog_name,
463+
database_name,
464+
table_name,
465+
from,
466+
files.clone(),
467+
)
468+
.await;
469+
}
458470
}
459-
result
460-
// Ok(PipelineBuildResult::create())
461471
}
462472
CopyPlanV2::IntoStage {
463473
stage, from, path, ..

0 commit comments

Comments
 (0)