@@ -3,6 +3,7 @@ use std::{collections::HashSet, future::ready, sync::Arc};
33use chrono:: { DateTime , Utc } ;
44use futures:: { StreamExt , TryStreamExt , stream} ;
55use tokio:: pin;
6+ use tracing:: instrument;
67
78use crate :: {
89 Storage , StorageError ,
@@ -152,6 +153,7 @@ pub enum GCError {
152153
153154pub type GCResult < A > = Result < A , GCError > ;
154155
156+ #[ instrument( skip( asset_manager) ) ]
155157pub async fn garbage_collect (
156158 storage : & ( dyn Storage + Send + Sync ) ,
157159 storage_settings : & storage:: Settings ,
@@ -160,9 +162,11 @@ pub async fn garbage_collect(
160162) -> GCResult < GCSummary > {
161163 // TODO: this function could have much more parallelism
162164 if !config. action_needed ( ) {
165+ tracing:: info!( "No action requested" ) ;
163166 return Ok ( GCSummary :: default ( ) ) ;
164167 }
165168
169+ tracing:: info!( "Finding GC roots" ) ;
166170 let all_snaps = pointed_snapshots (
167171 storage,
168172 storage_settings,
@@ -176,15 +180,21 @@ pub async fn garbage_collect(
176180 let mut keep_manifests = HashSet :: new ( ) ;
177181 let mut keep_snapshots = HashSet :: new ( ) ;
178182
183+ tracing:: info!( "Calculating retained objects" ) ;
179184 pin ! ( all_snaps) ;
180185 while let Some ( snap_id) = all_snaps. try_next ( ) . await ? {
181186 let snap = asset_manager. fetch_snapshot ( & snap_id) . await ?;
182- if config. deletes_snapshots ( ) {
183- keep_snapshots . insert ( snap_id) ;
187+ if config. deletes_snapshots ( ) && keep_snapshots . insert ( snap_id . clone ( ) ) {
188+ tracing :: trace! ( "Adding snapshot to keep list: {}" , & snap_id) ;
184189 }
185190
186191 if config. deletes_manifests ( ) {
187- keep_manifests. extend ( snap. manifest_files ( ) . map ( |mf| mf. id ) ) ;
192+ let manifests = snap. manifest_files ( ) . map ( |mf| mf. id ) ;
193+ for mf in manifests {
194+ if keep_manifests. insert ( mf. clone ( ) ) {
195+ tracing:: trace!( "Adding manifest to keep list: {}" , & mf) ;
196+ }
197+ }
188198 }
189199
190200 if config. deletes_chunks ( ) {
@@ -212,7 +222,11 @@ pub async fn garbage_collect(
212222 None
213223 }
214224 } ) ;
215- keep_chunks. extend ( chunk_ids) ;
225+ for chunk in chunk_ids {
226+ if keep_chunks. insert ( chunk. clone ( ) ) {
227+ tracing:: trace!( "Adding chunk to keep list: {}" , & chunk) ;
228+ }
229+ }
216230 }
217231 }
218232 }
@@ -246,6 +260,7 @@ pub async fn garbage_collect(
246260 Ok ( summary)
247261}
248262
263+ #[ instrument( skip( storage, storage_settings, config, keep_ids) , fields( keep_ids. len = keep_ids. len( ) ) ) ]
249264async fn gc_chunks (
250265 storage : & ( dyn Storage + Send + Sync ) ,
251266 storage_settings : & storage:: Settings ,
@@ -269,6 +284,7 @@ async fn gc_chunks(
269284 Ok ( storage. delete_chunks ( storage_settings, to_delete) . await ?)
270285}
271286
287+ #[ instrument( skip( storage, storage_settings, config, keep_ids) , fields( keep_ids. len = keep_ids. len( ) ) ) ]
272288async fn gc_manifests (
273289 storage : & ( dyn Storage + Send + Sync ) ,
274290 storage_settings : & storage:: Settings ,
@@ -294,6 +310,7 @@ async fn gc_manifests(
294310 Ok ( storage. delete_manifests ( storage_settings, to_delete) . await ?)
295311}
296312
313+ #[ instrument( skip( storage, storage_settings, config, keep_ids) , fields( keep_ids. len = keep_ids. len( ) ) ) ]
297314async fn gc_snapshots (
298315 storage : & ( dyn Storage + Send + Sync ) ,
299316 storage_settings : & storage:: Settings ,
@@ -319,6 +336,7 @@ async fn gc_snapshots(
319336 Ok ( storage. delete_snapshots ( storage_settings, to_delete) . await ?)
320337}
321338
339+ #[ instrument( skip( storage, storage_settings, config, keep_ids) , fields( keep_ids. len = keep_ids. len( ) ) ) ]
322340async fn gc_transaction_logs (
323341 storage : & ( dyn Storage + Send + Sync ) ,
324342 storage_settings : & storage:: Settings ,
@@ -373,6 +391,7 @@ pub enum ExpireRefResult {
373391/// ether refs.
374392///
375393/// See: https://github.com/earth-mover/icechunk/blob/main/design-docs/007-basic-expiration.md
394+ #[ instrument( skip( asset_manager) ) ]
376395pub async fn expire_ref (
377396 storage : & ( dyn Storage + Send + Sync ) ,
378397 storage_settings : & storage:: Settings ,
@@ -385,6 +404,8 @@ pub async fn expire_ref(
385404 . await
386405 . map ( |ref_data| ref_data. snapshot ) ?;
387406
407+ tracing:: info!( "Starting expiration at ref {}" , snap_id) ;
408+
388409 // the algorithm works by finding the oldest non expired snap and the root of the repo
389410 // we do that in a single pass through the ancestry
390411 // we keep two "pointer" the last editable_snap and the root, and we keep
@@ -402,15 +423,18 @@ pub async fn expire_ref(
402423 let mut ref_is_expired = false ;
403424 if let Some ( Ok ( info) ) = ancestry. as_mut ( ) . peek ( ) . await {
404425 if info. flushed_at < older_than {
426+ tracing:: debug!( flushed_at = %info. flushed_at, "Ref flagged as expired" ) ;
405427 ref_is_expired = true ;
406428 }
407429 }
408430
409431 while let Some ( parent) = ancestry. try_next ( ) . await ? {
410432 if parent. flushed_at >= older_than {
433+ tracing:: debug!( snap = %parent. id, flushed_at = %parent. flushed_at, "Processing non expired snapshot" ) ;
411434 // we are navigating non-expired snaps, last will be kept in editable_snap
412435 editable_snap = parent. id ;
413436 } else {
437+ tracing:: debug!( snap = %parent. id, flushed_at = %parent. flushed_at, "Processing expired snapshot" ) ;
414438 released. insert ( parent. id . clone ( ) ) ;
415439 root = parent. id ;
416440 }
@@ -429,6 +453,7 @@ pub async fn expire_ref(
429453 {
430454 // Either the reference is the root, or it is pointing to the root as first parent
431455 // Nothing to do
456+ tracing:: info!( "Nothing to expire for this ref" ) ;
432457 return Ok ( ExpireRefResult :: NothingToDo { ref_is_expired } ) ;
433458 }
434459
@@ -439,9 +464,12 @@ pub async fn expire_ref(
439464 // and, we only set a root as parent
440465 assert ! ( root. parent_id( ) . is_none( ) ) ;
441466
467+ tracing:: info!( root = %root. id( ) , editable_snap=%editable_snap. id( ) , "Expiration needed for this ref" ) ;
468+
442469 // TODO: add properties to the snapshot that tell us it was history edited
443470 let new_snapshot = Arc :: new ( root. adopt ( & editable_snap) ?) ;
444471 asset_manager. write_snapshot ( new_snapshot) . await ?;
472+ tracing:: info!( "Snapshot overwritten" ) ;
445473
446474 Ok ( ExpireRefResult :: Done {
447475 released_snapshots : released,
@@ -481,6 +509,7 @@ pub struct ExpireResult {
481509/// ether refs.
482510///
483511/// See: https://github.com/earth-mover/icechunk/blob/main/design-docs/007-basic-expiration.md
512+ #[ instrument( skip( asset_manager) ) ]
484513pub async fn expire (
485514 storage : & ( dyn Storage + Send + Sync ) ,
486515 storage_settings : & storage:: Settings ,
@@ -519,6 +548,7 @@ pub async fn expire(
519548 match & r {
520549 Ref :: Tag ( name) => {
521550 if expired_tags == ExpiredRefAction :: Delete {
551+ tracing:: info!( name, "Deleting expired tag" ) ;
522552 delete_tag ( storage, storage_settings, name. as_str ( ) )
523553 . await
524554 . map_err ( GCError :: Ref ) ?;
@@ -529,6 +559,7 @@ pub async fn expire(
529559 if expired_branches == ExpiredRefAction :: Delete
530560 && name != Ref :: DEFAULT_BRANCH
531561 {
562+ tracing:: info!( name, "Deleting expired branch" ) ;
532563 delete_branch ( storage, storage_settings, name. as_str ( ) )
533564 . await
534565 . map_err ( GCError :: Ref ) ?;
0 commit comments