Skip to content

Commit b275136

Browse files
committed
add truncate_table in catalog to truncate all the copied files info in meta service
1 parent e92af52 commit b275136

File tree

7 files changed

+47
-1
lines changed

7 files changed

+47
-1
lines changed

โ€Žsrc/query/catalog/src/catalog.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ use common_meta_app::schema::RenameTableReq;
3535
use common_meta_app::schema::TableIdent;
3636
use common_meta_app::schema::TableInfo;
3737
use common_meta_app::schema::TableMeta;
38+
use common_meta_app::schema::TruncateTableReply;
39+
use common_meta_app::schema::TruncateTableReq;
3840
use common_meta_app::schema::UndropDatabaseReply;
3941
use common_meta_app::schema::UndropDatabaseReq;
4042
use common_meta_app::schema::UndropTableReply;
@@ -166,6 +168,8 @@ pub trait Catalog: DynClone + Send + Sync {
166168
req: UpsertTableCopiedFileReq,
167169
) -> Result<UpsertTableCopiedFileReply>;
168170

171+
async fn truncate_table(&self, req: TruncateTableReq) -> Result<TruncateTableReply>;
172+
169173
/// Table function
170174
171175
// Get function by name.

โ€Žsrc/query/service/src/catalogs/default/database_catalog.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ use common_meta_app::schema::RenameTableReq;
3434
use common_meta_app::schema::TableIdent;
3535
use common_meta_app::schema::TableInfo;
3636
use common_meta_app::schema::TableMeta;
37+
use common_meta_app::schema::TruncateTableReply;
38+
use common_meta_app::schema::TruncateTableReq;
3739
use common_meta_app::schema::UndropDatabaseReply;
3840
use common_meta_app::schema::UndropDatabaseReq;
3941
use common_meta_app::schema::UndropTableReply;
@@ -457,6 +459,10 @@ impl Catalog for DatabaseCatalog {
457459
.await
458460
}
459461

462+
async fn truncate_table(&self, req: TruncateTableReq) -> Result<TruncateTableReply> {
463+
self.mutable_catalog.truncate_table(req).await
464+
}
465+
460466
async fn upsert_table_option(
461467
&self,
462468
req: UpsertTableOptionReq,

โ€Žsrc/query/service/src/catalogs/default/immutable_catalog.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ use common_meta_app::schema::RenameTableReq;
3434
use common_meta_app::schema::TableIdent;
3535
use common_meta_app::schema::TableInfo;
3636
use common_meta_app::schema::TableMeta;
37+
use common_meta_app::schema::TruncateTableReply;
38+
use common_meta_app::schema::TruncateTableReq;
3739
use common_meta_app::schema::UndropDatabaseReply;
3840
use common_meta_app::schema::UndropDatabaseReq;
3941
use common_meta_app::schema::UndropTableReply;
@@ -216,6 +218,13 @@ impl Catalog for ImmutableCatalog {
216218
)))
217219
}
218220

221+
async fn truncate_table(&self, req: TruncateTableReq) -> Result<TruncateTableReply> {
222+
Err(ErrorCode::UnImplement(format!(
223+
"truncate_table not allowed for system database {:?}",
224+
req
225+
)))
226+
}
227+
219228
async fn upsert_table_option(
220229
&self,
221230
req: UpsertTableOptionReq,

โ€Žsrc/query/service/src/catalogs/default/mutable_catalog.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ use common_meta_app::schema::RenameTableReq;
4242
use common_meta_app::schema::TableIdent;
4343
use common_meta_app::schema::TableInfo;
4444
use common_meta_app::schema::TableMeta;
45+
use common_meta_app::schema::TruncateTableReply;
46+
use common_meta_app::schema::TruncateTableReq;
4547
use common_meta_app::schema::UndropDatabaseReply;
4648
use common_meta_app::schema::UndropDatabaseReq;
4749
use common_meta_app::schema::UndropTableReply;
@@ -332,6 +334,11 @@ impl Catalog for MutableCatalog {
332334
Ok(res)
333335
}
334336

337+
async fn truncate_table(&self, req: TruncateTableReq) -> Result<TruncateTableReply> {
338+
let res = self.ctx.meta.truncate_table(req).await?;
339+
Ok(res)
340+
}
341+
335342
async fn count_tables(&self, req: CountTablesReq) -> Result<CountTablesReply> {
336343
let res = self.ctx.meta.count_tables(req).await?;
337344
Ok(res)

โ€Žsrc/query/service/src/interpreters/interpreter_copy_v2.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ impl CopyInterpreterV2 {
7676
let mut file_map = BTreeMap::new();
7777

7878
if !force {
79+
// if force is false, copy only the files that unmatch to the meta copied files info.
7980
let resp = catalog.get_table_copied_file_info(req).await?;
8081
for file in files.iter() {
8182
let stage_file = list_files(&self.ctx, &table_info.stage_info, file, "").await?;
@@ -88,12 +89,17 @@ impl CopyInterpreterV2 {
8889
if let Some(file_info) = resp.file_info.get(file) {
8990
// No need to copy the file again if etag is_some and match.
9091
if stage_file.etag.is_some() && stage_file.etag == file_info.etag {
92+
tracing::warn!("ignore copy file {:?} matched by etag", file);
9193
continue;
9294
}
9395

9496
if file_info.content_length == stage_file.size
9597
&& file_info.last_modified == Some(stage_file.last_modified)
9698
{
99+
tracing::warn!(
100+
"ignore copy file {:?} matched by content_length and last_modified",
101+
file
102+
);
97103
continue;
98104
}
99105
}
@@ -106,6 +112,7 @@ impl CopyInterpreterV2 {
106112
});
107113
}
108114
} else {
115+
// if force is true, copy all the file.
109116
for file in files.iter() {
110117
let stage_file = list_files(&self.ctx, &table_info.stage_info, file, "").await?;
111118
if stage_file.is_empty() {

โ€Žsrc/query/storages/fuse/src/operations/truncate.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use common_exception::Result;
1919
use common_fuse_meta::meta::TableSnapshot;
2020
use common_fuse_meta::meta::Versioned;
2121
use common_meta_app::schema::TableStatistics;
22+
use common_meta_app::schema::TruncateTableReq;
2223
use common_meta_app::schema::UpdateTableMetaReq;
2324
use common_meta_types::MatchSeq;
2425
use uuid::Uuid;
@@ -69,13 +70,19 @@ impl FuseTable {
6970

7071
let table_id = self.table_info.ident.table_id;
7172
let table_version = self.table_info.ident.seq;
72-
ctx.get_catalog(catalog_name)?
73+
let catalog = ctx.get_catalog(catalog_name)?;
74+
75+
catalog
7376
.update_table_meta(UpdateTableMetaReq {
7477
table_id,
7578
seq: MatchSeq::Exact(table_version),
7679
new_table_meta,
7780
})
7881
.await?;
82+
83+
catalog
84+
.truncate_table(TruncateTableReq { table_id })
85+
.await?;
7986
}
8087

8188
Ok(())

โ€Žsrc/query/storages/hive/src/hive_catalog.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@ use common_meta_app::schema::RenameTableReq;
4444
use common_meta_app::schema::TableIdent;
4545
use common_meta_app::schema::TableInfo;
4646
use common_meta_app::schema::TableMeta;
47+
use common_meta_app::schema::TruncateTableReply;
48+
use common_meta_app::schema::TruncateTableReq;
4749
use common_meta_app::schema::UndropDatabaseReply;
4850
use common_meta_app::schema::UndropDatabaseReq;
4951
use common_meta_app::schema::UndropTableReply;
@@ -350,6 +352,10 @@ impl Catalog for HiveCatalog {
350352
unimplemented!()
351353
}
352354

355+
async fn truncate_table(&self, _req: TruncateTableReq) -> Result<TruncateTableReply> {
356+
unimplemented!()
357+
}
358+
353359
async fn count_tables(&self, _req: CountTablesReq) -> Result<CountTablesReply> {
354360
unimplemented!()
355361
}

0 commit comments

Comments
ย (0)