@@ -16,11 +16,14 @@ use std::collections::HashSet;
16
16
use std:: sync:: Arc ;
17
17
18
18
use databend_common_catalog:: plan:: DataSourcePlan ;
19
+ use databend_common_catalog:: table_args:: TableArgs ;
19
20
use databend_common_exception:: ErrorCode ;
20
21
use databend_common_exception:: Result ;
22
+ use databend_common_expression:: types:: NumberScalar ;
21
23
use databend_common_expression:: types:: StringType ;
22
24
use databend_common_expression:: DataBlock ;
23
25
use databend_common_expression:: FromData ;
26
+ use databend_common_expression:: Scalar ;
24
27
use databend_common_expression:: TableDataType ;
25
28
use databend_common_expression:: TableField ;
26
29
use databend_common_expression:: TableSchemaRef ;
@@ -33,8 +36,10 @@ use log::info;
33
36
34
37
use crate :: sessions:: TableContext ;
35
38
use crate :: table_functions:: SimpleTableFunc ;
36
- use crate :: table_functions:: TableArgs ;
37
- pub struct FuseVacuumTemporaryTable ;
39
+
40
+ pub struct FuseVacuumTemporaryTable {
41
+ limit : Option < u64 > ,
42
+ }
38
43
39
44
#[ async_trait:: async_trait]
40
45
impl SimpleTableFunc for FuseVacuumTemporaryTable {
@@ -43,7 +48,11 @@ impl SimpleTableFunc for FuseVacuumTemporaryTable {
43
48
}
44
49
45
50
fn table_args ( & self ) -> Option < TableArgs > {
46
- None
51
+ self . limit . map ( |limit| {
52
+ TableArgs :: new_positioned ( vec ! [ databend_common_catalog:: table_args:: u64_literal(
53
+ limit,
54
+ ) ] )
55
+ } )
47
56
}
48
57
49
58
fn schema ( & self ) -> TableSchemaRef {
@@ -62,17 +71,39 @@ impl SimpleTableFunc for FuseVacuumTemporaryTable {
62
71
. await ?;
63
72
let client_session_mgr = UserApiProvider :: instance ( ) . client_session_api ( & ctx. get_tenant ( ) ) ;
64
73
let mut user_session_ids = HashSet :: new ( ) ;
74
+ let mut inactive_user_session_ids = HashSet :: new ( ) ;
65
75
while let Some ( entry) = lister. try_next ( ) . await ? {
76
+ if entry. metadata ( ) . is_dir ( ) {
77
+ continue ;
78
+ }
66
79
let path = entry. path ( ) ;
67
80
let parts: Vec < _ > = path. split ( '/' ) . collect ( ) ;
68
81
if parts. len ( ) < 3 {
69
82
return Err ( ErrorCode :: Internal ( format ! (
70
83
"invalid path for temp table: {path}"
71
84
) ) ) ;
72
85
} ;
73
- user_session_ids. insert ( ( parts[ 1 ] . to_string ( ) , parts[ 2 ] . to_string ( ) ) ) ;
86
+ let user_name = parts[ 1 ] . to_string ( ) ;
87
+ let session_id = parts[ 2 ] . to_string ( ) ;
88
+ if user_session_ids. contains ( & ( user_name. clone ( ) , session_id. clone ( ) ) ) {
89
+ continue ;
90
+ }
91
+ user_session_ids. insert ( ( user_name. clone ( ) , session_id. clone ( ) ) ) ;
92
+ if client_session_mgr
93
+ . get_client_session ( & user_name, & session_id)
94
+ . await ?
95
+ . is_none ( )
96
+ {
97
+ inactive_user_session_ids. insert ( ( user_name, session_id) ) ;
98
+ if inactive_user_session_ids. len ( ) >= self . limit . unwrap_or ( u64:: MAX ) as usize {
99
+ break ;
100
+ }
101
+ }
74
102
}
75
- for ( user_name, session_id) in user_session_ids {
103
+
104
+ let session_num = inactive_user_session_ids. len ( ) ;
105
+
106
+ for ( user_name, session_id) in inactive_user_session_ids {
76
107
if client_session_mgr
77
108
. get_client_session ( & user_name, & session_id)
78
109
. await ?
@@ -86,15 +117,43 @@ impl SimpleTableFunc for FuseVacuumTemporaryTable {
86
117
op. remove_all ( & path) . await ?;
87
118
}
88
119
}
89
- let col: Vec < String > = vec ! [ "Ok" . to_owned( ) ] ;
120
+ let col: Vec < String > = vec ! [ format!(
121
+ "Ok: processed temporary tables from {} inactive sessions" ,
122
+ session_num
123
+ ) ] ;
90
124
91
125
Ok ( Some ( DataBlock :: new_from_columns ( vec ! [
92
126
StringType :: from_data( col) ,
93
127
] ) ) )
94
128
}
95
129
96
- fn create ( _func_name : & str , _table_args : TableArgs ) -> Result < Self >
130
+ fn create ( func_name : & str , table_args : TableArgs ) -> Result < Self >
97
131
where Self : Sized {
98
- Ok ( Self )
132
+ let limit = match table_args. positioned . len ( ) {
133
+ 0 => None ,
134
+ 1 => {
135
+ let args = table_args. expect_all_positioned ( func_name, Some ( 1 ) ) ?;
136
+ let limit_val = match & args[ 0 ] {
137
+ Scalar :: Number ( NumberScalar :: UInt64 ( val) ) => * val,
138
+ Scalar :: Number ( NumberScalar :: UInt32 ( val) ) => * val as u64 ,
139
+ Scalar :: Number ( NumberScalar :: UInt16 ( val) ) => * val as u64 ,
140
+ Scalar :: Number ( NumberScalar :: UInt8 ( val) ) => * val as u64 ,
141
+ Scalar :: String ( val) => val. parse :: < u64 > ( ) ?,
142
+ _ => {
143
+ return Err ( ErrorCode :: BadArguments ( format ! (
144
+ "invalid value {:?} expect to be unsigned integer literal." ,
145
+ args[ 0 ]
146
+ ) ) )
147
+ }
148
+ } ;
149
+ Some ( limit_val)
150
+ }
151
+ _ => {
152
+ return Err ( ErrorCode :: NumberArgumentsNotMatch (
153
+ "Expected 0 or 1 arguments" . to_string ( ) ,
154
+ ) ) ;
155
+ }
156
+ } ;
157
+ Ok ( Self { limit } )
99
158
}
100
159
}
0 commit comments