@@ -6,11 +6,9 @@ use crate::CubeError;
 use async_trait::async_trait;
 use datafusion::cube_ext;
 use futures::future::join_all;
-use itertools::Itertools;
 use log::{error, info};
 use regex::Regex;
-use std::collections::BTreeSet;
-use std::collections::HashSet;
+use std::collections::{BTreeSet, HashMap};
 use std::path::{Path, PathBuf};
 use std::str::FromStr;
 use std::sync::Arc;
@@ -57,6 +55,7 @@ pub struct BaseRocksStoreFs {
     name: &'static str,
     minimum_snapshots_count: u64,
     snapshots_lifetime: u64,
+    remote_files_cleanup_batch_size: u64,
 }
 
 impl BaseRocksStoreFs {
@@ -66,11 +65,13 @@ impl BaseRocksStoreFs {
     ) -> Arc<Self> {
         let minimum_snapshots_count = config.minimum_metastore_snapshots_count();
         let snapshots_lifetime = config.metastore_snapshots_lifetime();
+        let remote_files_cleanup_batch_size = config.remote_files_cleanup_batch_size();
         Arc::new(Self {
             remote_fs,
             name: "metastore",
             minimum_snapshots_count,
             snapshots_lifetime,
+            remote_files_cleanup_batch_size,
         })
     }
     pub fn new_for_cachestore(
@@ -79,11 +80,13 @@ impl BaseRocksStoreFs {
     ) -> Arc<Self> {
         let minimum_snapshots_count = config.minimum_cachestore_snapshots_count();
         let snapshots_lifetime = config.cachestore_snapshots_lifetime();
+        let remote_files_cleanup_batch_size = config.remote_files_cleanup_batch_size();
         Arc::new(Self {
             remote_fs,
             name: "cachestore",
             minimum_snapshots_count,
             snapshots_lifetime,
+            remote_files_cleanup_batch_size,
         })
     }
@@ -135,63 +138,89 @@ impl BaseRocksStoreFs {
 
         Ok(upload_results)
     }
+
+    // Exposed for tests
+    pub async fn list_files_by_snapshot(
+        remote_fs: &dyn RemoteFs,
+        name: &str,
+    ) -> Result<HashMap<u128, Vec<String>>, CubeError> {
+        let existing_metastore_files = remote_fs.list(format!("{}-", name)).await?;
+        // Log a debug statement so that we can rule out the filename list itself being too large for memory.
+        log::debug!(
+            "Listed existing {} files, count = {}",
+            name,
+            existing_metastore_files.len()
+        );
+        let mut snapshot_map = HashMap::<u128, Vec<String>>::new();
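+        // Group files by snapshot: the top-level remote directory name embeds the snapshot
+        // timestamp in milliseconds ("{name}-{millis}", plus "-logs" / "-index-logs" variants).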
+        for existing in existing_metastore_files.into_iter() {
+            let path = existing.split("/").nth(0).map(|p| {
+                u128::from_str(
+                    &p.replace(&format!("{}-", name), "")
+                        .replace("-index-logs", "")
+                        .replace("-logs", ""),
+                )
+            });
+            if let Some(Ok(millis)) = path {
+                snapshot_map
+                    .entry(millis)
+                    .or_insert(Vec::new())
+                    .push(existing);
+            }
+        }
+        Ok(snapshot_map)
+    }
+
     pub async fn delete_old_snapshots(&self) -> Result<Vec<String>, CubeError> {
-        let existing_metastore_files = self.remote_fs.list(format!("{}-", self.name)).await?;
-        let candidates = existing_metastore_files
-            .iter()
-            .filter_map(|existing| {
-                let path = existing.split("/").nth(0).map(|p| {
-                    u128::from_str(
-                        &p.replace(&format!("{}-", self.name), "")
-                            .replace("-index-logs", "")
-                            .replace("-logs", ""),
-                    )
-                });
-                if let Some(Ok(millis)) = path {
-                    Some((existing, millis))
-                } else {
-                    None
-                }
-            })
-            .collect::<Vec<_>>();
+        let candidates_map =
+            Self::list_files_by_snapshot(self.remote_fs.as_ref(), &self.name).await?;
 
         let lifetime_ms = (self.snapshots_lifetime as u128) * 1000;
         let min_snapshots_count = self.minimum_snapshots_count as usize;
 
-        let mut snapshots_list = candidates
-            .iter()
-            .map(|(_, ms)| ms.to_owned())
-            .unique()
-            .collect::<Vec<_>>();
-        snapshots_list.sort_unstable_by(|a, b| b.cmp(a));
+        // snapshots_list sorted by oldest first.
+        let mut snapshots_list: Vec<u128> = candidates_map.keys().cloned().collect::<Vec<_>>();
+        snapshots_list.sort_unstable();
 
-        let snapshots_to_delete = snapshots_list
-            .into_iter()
-            .skip(min_snapshots_count)
-            .filter(|ms| {
-                SystemTime::now()
-                    .duration_since(SystemTime::UNIX_EPOCH)
-                    .unwrap()
-                    .as_millis()
-                    - ms
-                    > lifetime_ms
-            })
-            .collect::<HashSet<_>>();
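+        // Always keep the newest min_snapshots_count snapshots, regardless of age.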
+        if snapshots_list.len() <= min_snapshots_count {
+            return Ok(vec![]);
+        }
+        snapshots_list.truncate(snapshots_list.len() - min_snapshots_count);
 
-        if !snapshots_to_delete.is_empty() {
-            let to_delete = candidates
-                .into_iter()
-                .filter_map(|(path, ms)| {
-                    if snapshots_to_delete.contains(&ms) {
-                        Some(path.to_owned())
-                    } else {
-                        None
-                    }
-                })
-                .unique()
-                .collect::<Vec<_>>();
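+        // Of the snapshots still eligible, delete only those older than snapshots_lifetime (in seconds).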
+        let cutoff_time_ms: u128 = SystemTime::now()
+            .duration_since(SystemTime::UNIX_EPOCH)
+            .unwrap()
+            .as_millis()
+            - lifetime_ms;
+
+        while !snapshots_list.is_empty() && *snapshots_list.last().unwrap() >= cutoff_time_ms {
+            snapshots_list.pop();
+        }
+
+        let snapshots_list = snapshots_list;
+
+        if snapshots_list.is_empty() {
+            // Avoid empty join_all, iteration, etc.
+            return Ok(vec![]);
+        }
+
+        let mut to_delete: Vec<String> = Vec::new();
+
+        let mut candidates_map = candidates_map;
+        for ms in snapshots_list {
+            to_delete.append(
+                candidates_map
+                    .get_mut(&ms)
+                    .expect("delete_old_snapshots candidates_map lookup should succeed"),
+            );
+        }
+
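+        // Delete in batches of remote_files_cleanup_batch_size so a large backlog never turns
+        // into an unbounded number of concurrent delete requests.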
+        for batch in to_delete.chunks(
+            self.remote_files_cleanup_batch_size
+                .try_into()
+                .unwrap_or(usize::MAX),
+        ) {
             for v in join_all(
-                to_delete
+                batch
                     .iter()
                     .map(|f| self.remote_fs.delete_file(f.to_string()))
                     .collect::<Vec<_>>(),
@@ -201,11 +230,9 @@ impl BaseRocksStoreFs {
             {
                 v?;
             }
-
-            Ok(to_delete)
-        } else {
-            Ok(vec![])
         }
+
+        Ok(to_delete)
     }
 
     pub async fn is_remote_metadata_exists(&self) -> Result<bool, CubeError> {
@@ -367,10 +394,10 @@ impl MetaStoreFs for BaseRocksStoreFs {
         self.upload_snapsots_files(&remote_path, &checkpoint_path)
             .await?;
 
-        self.delete_old_snapshots().await?;
-
         self.write_metastore_current(&remote_path).await?;
 
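+        // Prune old snapshots only after the current-snapshot marker has been updated.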
+        self.delete_old_snapshots().await?;
+
         Ok(())
     }
     async fn load_metastore_logs(