Skip to content

Commit 33c2dd0

Browse files
authored
expire_ref can now edit snapshot pointed by refs (#870)
This is a small deviation from the design document. But there is no objective reason why not to do this. By looking at the tests cases you'll see this expiration algorithm is more aggressive, it frees more snapshots, but we believe it's sound. This is
1 parent 52289bd commit 33c2dd0

File tree

4 files changed

+126
-52
lines changed

4 files changed

+126
-52
lines changed

icechunk/src/format/snapshot.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -313,6 +313,7 @@ impl Snapshot {
313313
message: String,
314314
properties: Option<SnapshotProperties>,
315315
mut manifest_files: Vec<ManifestFileInfo>,
316+
flushed_at: Option<DateTime<Utc>>,
316317
sorted_iter: I,
317318
) -> IcechunkResult<Self>
318319
where
@@ -349,7 +350,7 @@ impl Snapshot {
349350

350351
let message = builder.create_string(&message);
351352
let parent_id = parent_id.map(|oid| generated::ObjectId12::new(&oid.0));
352-
let flushed_at = Utc::now().timestamp_micros() as u64;
353+
let flushed_at = flushed_at.unwrap_or_else(Utc::now).timestamp_micros() as u64;
353354
let id = generated::ObjectId12::new(&id.unwrap_or_else(SnapshotId::random).0);
354355

355356
let nodes: Vec<_> = sorted_iter
@@ -387,6 +388,7 @@ impl Snapshot {
387388
Self::INITIAL_COMMIT_MESSAGE.to_string(),
388389
Some(properties),
389390
Default::default(),
391+
None,
390392
nodes,
391393
)
392394
}
@@ -459,6 +461,7 @@ impl Snapshot {
459461
new_child.message().clone(),
460462
Some(new_child.metadata()?.clone()),
461463
new_child.manifest_files().collect(),
464+
Some(new_child.flushed_at()?),
462465
new_child.iter(),
463466
)
464467
}
@@ -736,6 +739,7 @@ mod tests {
736739
String::default(),
737740
Default::default(),
738741
manifests,
742+
None,
739743
nodes.into_iter().map(Ok::<NodeSnapshot, Infallible>),
740744
)
741745
.unwrap();

icechunk/src/ops/gc.rs

Lines changed: 39 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -344,9 +344,12 @@ async fn gc_transaction_logs(
344344

345345
#[derive(Debug, PartialEq, Eq, Clone)]
346346
pub enum ExpireRefResult {
347-
RefIsExpired,
348347
NothingToDo,
349-
Done { released_snapshots: HashSet<SnapshotId>, edited_snapshot: SnapshotId },
348+
Done {
349+
released_snapshots: HashSet<SnapshotId>,
350+
edited_snapshot: SnapshotId,
351+
ref_is_expired: bool,
352+
},
350353
}
351354

352355
/// Expire snapshots older than a threshold.
@@ -394,10 +397,10 @@ pub async fn expire_ref(
394397

395398
pin!(ancestry);
396399

397-
// If we point to an expired snapshot already, there is nothing to do
400+
let mut ref_is_expired = false;
398401
if let Some(Ok(info)) = ancestry.as_mut().peek().await {
399402
if info.flushed_at < older_than {
400-
return Ok(ExpireRefResult::RefIsExpired);
403+
ref_is_expired = true;
401404
}
402405
}
403406

@@ -434,15 +437,14 @@ pub async fn expire_ref(
434437
// and, we only set a root as parent
435438
assert!(root.parent_id().is_none());
436439

437-
assert!(editable_snap.flushed_at()? >= older_than);
438-
439440
// TODO: add properties to the snapshot that tell us it was history edited
440441
let new_snapshot = Arc::new(root.adopt(&editable_snap)?);
441442
asset_manager.write_snapshot(new_snapshot).await?;
442443

443444
Ok(ExpireRefResult::Done {
444445
released_snapshots: released,
445446
edited_snapshot: editable_snap.id().clone(),
447+
ref_is_expired,
446448
})
447449
}
448450

@@ -500,33 +502,41 @@ pub async fn expire(
500502
})
501503
.try_fold(ExpireResult::default(), |mut result, (r, ref_result)| async move {
502504
match ref_result {
503-
ExpireRefResult::Done { released_snapshots, edited_snapshot } => {
505+
ExpireRefResult::Done {
506+
released_snapshots,
507+
edited_snapshot,
508+
ref_is_expired,
509+
} => {
504510
result.released_snapshots.extend(released_snapshots.into_iter());
505511
result.edited_snapshots.insert(edited_snapshot);
506-
Ok(result)
507-
}
508-
ExpireRefResult::RefIsExpired => match &r {
509-
Ref::Tag(name) => {
510-
if expired_tags == ExpiredRefAction::Delete {
511-
delete_tag(storage, storage_settings, name.as_str())
512-
.await
513-
.map_err(GCError::Ref)?;
514-
result.deleted_refs.insert(r);
512+
if ref_is_expired {
513+
match &r {
514+
Ref::Tag(name) => {
515+
if expired_tags == ExpiredRefAction::Delete {
516+
delete_tag(storage, storage_settings, name.as_str())
517+
.await
518+
.map_err(GCError::Ref)?;
519+
result.deleted_refs.insert(r);
520+
}
521+
}
522+
Ref::Branch(name) => {
523+
if expired_branches == ExpiredRefAction::Delete
524+
&& name != Ref::DEFAULT_BRANCH
525+
{
526+
delete_branch(
527+
storage,
528+
storage_settings,
529+
name.as_str(),
530+
)
531+
.await
532+
.map_err(GCError::Ref)?;
533+
result.deleted_refs.insert(r);
534+
}
535+
}
515536
}
516-
Ok(result)
517537
}
518-
Ref::Branch(name) => {
519-
if expired_branches == ExpiredRefAction::Delete
520-
&& name != Ref::DEFAULT_BRANCH
521-
{
522-
delete_branch(storage, storage_settings, name.as_str())
523-
.await
524-
.map_err(GCError::Ref)?;
525-
result.deleted_refs.insert(r);
526-
}
527-
Ok(result)
528-
}
529-
},
538+
Ok(result)
539+
}
530540
ExpireRefResult::NothingToDo => Ok(result),
531541
}
532542
})

icechunk/src/session.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1576,6 +1576,7 @@ async fn flush(
15761576
message.to_string(),
15771577
Some(properties),
15781578
flush_data.manifest_files.into_iter().collect(),
1579+
None,
15791580
all_nodes.into_iter().map(Ok::<_, Infallible>),
15801581
)?;
15811582

@@ -2080,6 +2081,7 @@ mod tests {
20802081
"message".to_string(),
20812082
None,
20822083
manifests,
2084+
None,
20832085
nodes.iter().cloned().map(Ok::<NodeSnapshot, Infallible>),
20842086
)?);
20852087
asset_manager.write_snapshot(Arc::clone(&snapshot)).await?;

icechunk/tests/test_gc.rs

Lines changed: 80 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -293,14 +293,12 @@ pub async fn test_expire_ref() -> Result<(), Box<dyn std::error::Error>> {
293293
)
294294
.await?
295295
{
296-
ExpireRefResult::RefIsExpired => {
297-
panic!()
298-
}
299296
ExpireRefResult::NothingToDo => {
300297
panic!()
301298
}
302-
ExpireRefResult::Done { released_snapshots, .. } => {
299+
ExpireRefResult::Done { released_snapshots, ref_is_expired, .. } => {
303300
assert_eq!(released_snapshots.len(), 4);
301+
assert!(!ref_is_expired);
304302
}
305303
}
306304

@@ -332,9 +330,9 @@ pub async fn test_expire_ref() -> Result<(), Box<dyn std::error::Error>> {
332330
)
333331
.await?
334332
{
335-
ExpireRefResult::RefIsExpired => panic!(),
336333
ExpireRefResult::NothingToDo => panic!(),
337-
ExpireRefResult::Done { released_snapshots, .. } => {
334+
ExpireRefResult::Done { released_snapshots, ref_is_expired, .. } => {
335+
assert!(!ref_is_expired);
338336
assert_eq!(released_snapshots.len(), 4);
339337
}
340338
}
@@ -367,9 +365,9 @@ pub async fn test_expire_ref() -> Result<(), Box<dyn std::error::Error>> {
367365
)
368366
.await?
369367
{
370-
ExpireRefResult::RefIsExpired => panic!(),
371368
ExpireRefResult::NothingToDo => panic!(),
372-
ExpireRefResult::Done { released_snapshots, .. } => {
369+
ExpireRefResult::Done { released_snapshots, ref_is_expired, .. } => {
370+
assert!(!ref_is_expired);
373371
assert_eq!(released_snapshots.len(), 5);
374372
}
375373
}
@@ -402,9 +400,9 @@ pub async fn test_expire_ref() -> Result<(), Box<dyn std::error::Error>> {
402400
)
403401
.await?
404402
{
405-
ExpireRefResult::RefIsExpired => panic!(),
406403
ExpireRefResult::NothingToDo => panic!(),
407-
ExpireRefResult::Done { released_snapshots, .. } => {
404+
ExpireRefResult::Done { released_snapshots, ref_is_expired, .. } => {
405+
assert!(!ref_is_expired);
408406
assert_eq!(released_snapshots.len(), 5);
409407
}
410408
}
@@ -432,8 +430,8 @@ pub async fn test_expire_ref() -> Result<(), Box<dyn std::error::Error>> {
432430
}
433431

434432
#[tokio::test]
435-
pub async fn test_expire_ref_with_odd_timestamp() -> Result<(), Box<dyn std::error::Error>>
436-
{
433+
pub async fn test_expire_ref_with_odd_timestamps()
434+
-> Result<(), Box<dyn std::error::Error>> {
437435
let storage: Arc<dyn Storage + Send + Sync> = new_in_memory_storage().await?;
438436
let storage_settings = storage.default_settings();
439437
let repo = Repository::create(None, Arc::clone(&storage), HashMap::new()).await?;
@@ -489,15 +487,24 @@ pub async fn test_expire_ref_with_odd_timestamp() -> Result<(), Box<dyn std::err
489487
)
490488
.await?
491489
{
492-
ExpireRefResult::RefIsExpired => {}
490+
ExpireRefResult::Done { ref_is_expired, .. } => {
491+
assert!(ref_is_expired);
492+
// create another repo to avoid caching issues
493+
let repo =
494+
Repository::open(None, Arc::clone(&storage), HashMap::new()).await?;
495+
assert_eq!(
496+
branch_commit_messages(&repo, "main").await,
497+
Vec::from(["third", "Repository initialized"])
498+
);
499+
}
493500
_ => panic!(),
494501
}
495502

496503
// create another repo to avoid caching issues
497504
let repo = Repository::open(None, Arc::clone(&storage), HashMap::new()).await?;
498505
assert_eq!(
499506
branch_commit_messages(&repo, "main").await,
500-
Vec::from(["third", "second", "first", "Repository initialized"])
507+
Vec::from(["third", "Repository initialized"])
501508
);
502509
Ok(())
503510
}
@@ -552,11 +559,11 @@ pub async fn test_expire_and_garbage_collect() -> Result<(), Box<dyn std::error:
552559
);
553560
assert_eq!(
554561
tag_commit_messages(&repo, "tag1").await,
555-
Vec::from(["3", "2", "1", "Repository initialized"])
562+
Vec::from(["3", "Repository initialized"])
556563
);
557564
assert_eq!(
558565
tag_commit_messages(&repo, "tag2").await,
559-
Vec::from(["5", "4", "2", "1", "Repository initialized"])
566+
Vec::from(["5", "Repository initialized"])
560567
);
561568

562569
let now = Utc::now();
@@ -575,10 +582,10 @@ pub async fn test_expire_and_garbage_collect() -> Result<(), Box<dyn std::error:
575582
)
576583
.await?;
577584
// other expired snapshots are pointed by tags
578-
assert_eq!(summary.snapshots_deleted, 2);
585+
assert_eq!(summary.snapshots_deleted, 5);
579586

580-
// the non expired snapshots + the expired but pointed by tags snapshots
581-
assert_eq!(storage.list_snapshots(&storage_settings).await?.count().await, 13);
587+
// the non expired snapshots + the 2 expired but pointed by tags snapshots
588+
assert_eq!(storage.list_snapshots(&storage_settings).await?.count().await, 10);
582589

583590
repo.delete_tag("tag1").await?;
584591

@@ -592,8 +599,8 @@ pub async fn test_expire_and_garbage_collect() -> Result<(), Box<dyn std::error:
592599
// other expired snapshots are pointed by tag2
593600
assert_eq!(summary.snapshots_deleted, 1);
594601

595-
// the non expired snapshots + the expired but pointed by tags snapshots
596-
assert_eq!(storage.list_snapshots(&storage_settings).await?.count().await, 12);
602+
// the non expired snapshots + the 1 pointed by tag2 snapshots
603+
assert_eq!(storage.list_snapshots(&storage_settings).await?.count().await, 9);
597604

598605
repo.delete_tag("tag2").await?;
599606

@@ -605,10 +612,61 @@ pub async fn test_expire_and_garbage_collect() -> Result<(), Box<dyn std::error:
605612
)
606613
.await?;
607614
// tag2 snapshosts are released now
608-
assert_eq!(summary.snapshots_deleted, 4);
615+
assert_eq!(summary.snapshots_deleted, 1);
609616

610617
// only the non expired snapshots left
611618
assert_eq!(storage.list_snapshots(&storage_settings).await?.count().await, 8);
612619

613620
Ok(())
614621
}
622+
623+
#[tokio::test]
624+
/// In this test, we set up a repo as in the design document for expiration.
625+
///
626+
/// We then, expire old snapshots and garbage collect. We verify we end up
627+
/// with what is expected according to the design document.
628+
pub async fn test_expire_and_garbage_collect_deliting_expired_refs()
629+
-> Result<(), Box<dyn std::error::Error>> {
630+
let storage: Arc<dyn Storage + Send + Sync> = new_in_memory_storage().await?;
631+
let storage_settings = storage.default_settings();
632+
let mut repo = Repository::create(None, Arc::clone(&storage), HashMap::new()).await?;
633+
634+
let expire_older_than = make_design_doc_repo(&mut repo).await?;
635+
636+
let asset_manager = Arc::new(AssetManager::new_no_cache(
637+
storage.clone(),
638+
storage_settings.clone(),
639+
1,
640+
));
641+
642+
let result = expire(
643+
storage.as_ref(),
644+
&storage_settings,
645+
asset_manager.clone(),
646+
expire_older_than,
647+
// This is different compared to the previous test
648+
ExpiredRefAction::Delete,
649+
ExpiredRefAction::Delete,
650+
)
651+
.await?;
652+
653+
assert_eq!(result.released_snapshots.len(), 7);
654+
assert_eq!(result.deleted_refs.len(), 2);
655+
656+
let now = Utc::now();
657+
let gc_config = GCConfig::clean_all(now, now, None);
658+
let summary = garbage_collect(
659+
storage.as_ref(),
660+
&storage_settings,
661+
asset_manager.clone(),
662+
&gc_config,
663+
)
664+
.await?;
665+
666+
assert_eq!(summary.snapshots_deleted, 7);
667+
assert_eq!(summary.transaction_logs_deleted, 7);
668+
669+
// only the non expired snapshots left
670+
assert_eq!(storage.list_snapshots(&storage_settings).await?.count().await, 8);
671+
Ok(())
672+
}

0 commit comments

Comments
 (0)