@@ -18,13 +18,19 @@ use chrono::Duration;
 use databend_common_catalog::table::TableExt;
 use databend_common_exception::Result;
 use databend_common_expression::types::StringType;
+use databend_common_expression::types::UInt64Type;
 use databend_common_expression::DataBlock;
 use databend_common_expression::FromData;
 use databend_common_license::license::Feature::Vacuum;
 use databend_common_license::license_manager::get_license_manager;
 use databend_common_sql::plans::VacuumTablePlan;
 use databend_common_storages_fuse::FuseTable;
+use databend_common_storages_fuse::FUSE_TBL_BLOCK_PREFIX;
+use databend_common_storages_fuse::FUSE_TBL_SEGMENT_PREFIX;
+use databend_common_storages_fuse::FUSE_TBL_SNAPSHOT_PREFIX;
+use databend_common_storages_fuse::FUSE_TBL_XOR_BLOOM_INDEX_PREFIX;
 use databend_enterprise_vacuum_handler::get_vacuum_handler;
+use opendal::Metakey;
 
 use crate::interpreters::Interpreter;
 use crate::pipelines::PipelineBuildResult;
@@ -36,10 +42,57 @@ pub struct VacuumTableInterpreter {
     plan: VacuumTablePlan,
 }
 
+type FileStat = (u64, u64);
+
+#[derive(Debug, Default)]
+struct Statistics {
+    pub snapshot_files: FileStat,
+    pub segment_files: FileStat,
+    pub block_files: FileStat,
+    pub index_files: FileStat,
+}
+
 impl VacuumTableInterpreter {
     pub fn try_create(ctx: Arc<QueryContext>, plan: VacuumTablePlan) -> Result<Self> {
         Ok(VacuumTableInterpreter { ctx, plan })
     }
+
+    async fn get_statistics(&self, fuse_table: &FuseTable) -> Result<Statistics> {
+        let operator = fuse_table.get_operator();
+        let table_data_prefix = format!("/{}", fuse_table.meta_location_generator().prefix());
+
+        let mut snapshot_files = (0, 0);
+        let mut segment_files = (0, 0);
+        let mut block_files = (0, 0);
+        let mut index_files = (0, 0);
+
+        let prefix_with_stats = vec![
+            (FUSE_TBL_SNAPSHOT_PREFIX, &mut snapshot_files),
+            (FUSE_TBL_SEGMENT_PREFIX, &mut segment_files),
+            (FUSE_TBL_BLOCK_PREFIX, &mut block_files),
+            (FUSE_TBL_XOR_BLOOM_INDEX_PREFIX, &mut index_files),
+        ];
+
+        for (dir_prefix, stat) in prefix_with_stats {
+            for entry in operator
+                .list_with(&format!("{}/{}/", table_data_prefix, dir_prefix))
+                .metakey(Metakey::ContentLength)
+                .await?
+            {
+                if entry.metadata().is_file() {
+                    stat.0 += 1;
+                    stat.1 += entry.metadata().content_length();
+                }
+            }
+        }
+
+        Ok(Statistics {
+            snapshot_files,
+            segment_files,
+            block_files,
+            index_files,
+        })
+    }
 }
 
 #[async_trait::async_trait]
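
Aside on the listing pattern the new `get_statistics` method relies on: below is a minimal standalone sketch, assuming an OpenDAL release from the same era as this patch, where `list_with(...)` is non-recursive and resolves to a `Vec<Entry>` once awaited, and where `Metakey` declares which metadata fields the backend must populate. The in-memory operator, the sample path, and the `tokio` runtime are hypothetical stand-ins for the table's real storage setup.

```rust
use opendal::services::Memory;
use opendal::{Metakey, Operator, Result};

/// Count the files directly under `prefix` and sum their sizes,
/// mirroring the (count, bytes) accumulation in `get_statistics`.
async fn dir_stat(op: &Operator, prefix: &str) -> Result<(u64, u64)> {
    let mut stat = (0u64, 0u64);
    // Request ContentLength only, so the listing stays cheap.
    for entry in op.list_with(prefix).metakey(Metakey::ContentLength).await? {
        if entry.metadata().is_file() {
            stat.0 += 1; // file count
            stat.1 += entry.metadata().content_length(); // total bytes
        }
    }
    Ok(stat)
}

#[tokio::main]
async fn main() -> Result<()> {
    // Hypothetical in-memory operator standing in for the table's backend.
    let op = Operator::new(Memory::default())?.finish();
    op.write("_ss/example_snapshot.mpk", vec![0u8; 128]).await?;
    let (files, bytes) = dir_stat(&op, "_ss/").await?;
    println!("{files} files, {bytes} bytes");
    Ok(())
}
```
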
@@ -83,7 +136,31 @@ impl Interpreter for VacuumTableInterpreter {
             .await?;
 
         match purge_files_opt {
-            None => return Ok(PipelineBuildResult::create()),
+            None => {
+                return {
+                    let stat = self.get_statistics(fuse_table).await?;
+                    let total_files = stat.snapshot_files.0
+                        + stat.segment_files.0
+                        + stat.block_files.0
+                        + stat.index_files.0;
+                    let total_size = stat.snapshot_files.1
+                        + stat.segment_files.1
+                        + stat.block_files.1
+                        + stat.index_files.1;
+                    PipelineBuildResult::from_blocks(vec![DataBlock::new_from_columns(vec![
+                        UInt64Type::from_data(vec![stat.snapshot_files.0]),
+                        UInt64Type::from_data(vec![stat.snapshot_files.1]),
+                        UInt64Type::from_data(vec![stat.segment_files.0]),
+                        UInt64Type::from_data(vec![stat.segment_files.1]),
+                        UInt64Type::from_data(vec![stat.block_files.0]),
+                        UInt64Type::from_data(vec![stat.block_files.1]),
+                        UInt64Type::from_data(vec![stat.index_files.0]),
+                        UInt64Type::from_data(vec![stat.index_files.1]),
+                        UInt64Type::from_data(vec![total_files]),
+                        UInt64Type::from_data(vec![total_size]),
+                    ])])
+                };
+            }
             Some(purge_files) => PipelineBuildResult::from_blocks(vec![
                 DataBlock::new_from_columns(vec![StringType::from_data(purge_files)]),
             ]),
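
The `None` arm now reports storage statistics for a dry-run vacuum instead of returning an empty pipeline. A hedged sketch of the resulting one-row layout, using the same column-building API as the patch; the `stats_block` helper and its parameterization are illustrative, not part of the change:

```rust
use databend_common_expression::types::UInt64Type;
use databend_common_expression::{DataBlock, FromData};

/// Illustrative helper: assemble the ten-column, one-row statistics block
/// in the same order as the patch: a (count, bytes) pair per category in
/// the order snapshot, segment, block, index, followed by grand totals.
fn stats_block(pairs: [(u64, u64); 4]) -> DataBlock {
    let total_files: u64 = pairs.iter().map(|p| p.0).sum();
    let total_size: u64 = pairs.iter().map(|p| p.1).sum();
    let mut columns = Vec::with_capacity(10);
    for (count, bytes) in pairs {
        // Each from_data(vec![v]) call yields a single-row UInt64 column.
        columns.push(UInt64Type::from_data(vec![count]));
        columns.push(UInt64Type::from_data(vec![bytes]));
    }
    columns.push(UInt64Type::from_data(vec![total_files]));
    columns.push(UInt64Type::from_data(vec![total_size]));
    DataBlock::new_from_columns(columns)
}
```

Deriving the totals from the per-category pairs, as both the patch and this sketch do, guarantees the summary columns can never drift from the detailed ones.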