From 38eac109d18607d6b822e1b1cedbe8ea6e2fdebb Mon Sep 17 00:00:00 2001 From: jayceslesar Date: Fri, 4 Jul 2025 15:45:59 -0400 Subject: [PATCH 1/3] feat: `refs` in `inspect` --- crates/iceberg/src/inspect/metadata_table.rs | 5 + crates/iceberg/src/inspect/mod.rs | 2 + crates/iceberg/src/inspect/refs.rs | 189 +++++++++++++++++++ 3 files changed, 196 insertions(+) create mode 100644 crates/iceberg/src/inspect/refs.rs diff --git a/crates/iceberg/src/inspect/metadata_table.rs b/crates/iceberg/src/inspect/metadata_table.rs index b2dfbb7b12..2bc153a62b 100644 --- a/crates/iceberg/src/inspect/metadata_table.rs +++ b/crates/iceberg/src/inspect/metadata_table.rs @@ -42,6 +42,11 @@ impl<'a> MetadataTable<'a> { pub fn manifests(&self) -> ManifestsTable { ManifestsTable::new(self.0) } + + /// Get the refs table. + pub fn refs(&self) -> crate::inspect::RefsTable { + crate::inspect::RefsTable::new(self.0) + } } #[cfg(test)] diff --git a/crates/iceberg/src/inspect/mod.rs b/crates/iceberg/src/inspect/mod.rs index b64420ea11..f531c5f0af 100644 --- a/crates/iceberg/src/inspect/mod.rs +++ b/crates/iceberg/src/inspect/mod.rs @@ -19,8 +19,10 @@ mod manifests; mod metadata_table; +mod refs; mod snapshots; pub use manifests::ManifestsTable; pub use metadata_table::*; +pub use refs::RefsTable; pub use snapshots::SnapshotsTable; diff --git a/crates/iceberg/src/inspect/refs.rs b/crates/iceberg/src/inspect/refs.rs new file mode 100644 index 0000000000..60219def0f --- /dev/null +++ b/crates/iceberg/src/inspect/refs.rs @@ -0,0 +1,189 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::HashMap; +use std::sync::Arc; + +use arrow_array::RecordBatch; +use arrow_array::builder::{PrimitiveBuilder, StringBuilder}; +use arrow_array::types::{Int32Type, Int64Type}; +use futures::{StreamExt, stream}; + +use crate::Result; +use crate::arrow::schema_to_arrow_schema; +use crate::scan::ArrowRecordBatchStream; +use crate::spec::{NestedField, PrimitiveType, SnapshotReference, SnapshotRetention, Type}; +use crate::table::Table; + +/// Refs table. +pub struct RefsTable<'a> { + table: &'a Table, +} + +impl<'a> RefsTable<'a> { + /// Create a new Refs table instance. + pub fn new(table: &'a Table) -> Self { + Self { table } + } + + /// Returns the iceberg schema of the refs table. + pub fn schema(&self) -> crate::spec::Schema { + let fields = vec![ + NestedField::required(1, "name", Type::Primitive(PrimitiveType::String)), + NestedField::required(2, "type", Type::Primitive(PrimitiveType::String)), + NestedField::required(3, "snapshot_id", Type::Primitive(PrimitiveType::Long)), + NestedField::optional( + 4, + "max_reference_age_in_ms", + Type::Primitive(PrimitiveType::Long), + ), + NestedField::optional( + 5, + "min_snapshots_to_keep", + Type::Primitive(PrimitiveType::Int), + ), + NestedField::optional( + 6, + "max_snapshot_age_in_ms", + Type::Primitive(PrimitiveType::Long), + ), + ]; + crate::spec::Schema::builder() + .with_fields(fields.into_iter().map(|f| f.into())) + .build() + .unwrap() + } + + /// Scans the refs table. + pub async fn scan(&self) -> Result { + let schema = schema_to_arrow_schema(&self.schema())?; + + let mut names = StringBuilder::new(); + let mut ref_type = StringBuilder::new(); + let mut snapshot_id = PrimitiveBuilder::::new(); + let mut max_reference_age_in_ms = PrimitiveBuilder::::new(); + let mut min_keep = PrimitiveBuilder::::new(); + let mut max_snapshot_age_in_ms = PrimitiveBuilder::::new(); + + let refs: &HashMap = &self.table.metadata().refs; + for (name, snapshot_ref) in refs { + names.append_value(name); + snapshot_id.append_value(snapshot_ref.snapshot_id); + + match &snapshot_ref.retention { + SnapshotRetention::Branch { + min_snapshots_to_keep, + max_snapshot_age_ms, + max_ref_age_ms, + } => { + ref_type.append_value("BRANCH"); + max_reference_age_in_ms.append_option(*max_ref_age_ms); + min_keep.append_option(*min_snapshots_to_keep); + max_snapshot_age_in_ms.append_option(*max_snapshot_age_ms); + } + SnapshotRetention::Tag { max_ref_age_ms } => { + ref_type.append_value("TAG"); + max_reference_age_in_ms.append_option(*max_ref_age_ms); + min_keep.append_null(); + max_snapshot_age_in_ms.append_null(); + } + } + } + + let batch = RecordBatch::try_new(Arc::new(schema), vec![ + Arc::new(names.finish()), + Arc::new(ref_type.finish()), + Arc::new(snapshot_id.finish()), + Arc::new(max_reference_age_in_ms.finish()), + Arc::new(min_keep.finish()), + Arc::new(max_snapshot_age_in_ms.finish()), + ])?; + + Ok(stream::iter(vec![Ok(batch)]).boxed()) + } +} + +#[cfg(test)] +mod tests { + use expect_test::expect; + + use crate::inspect::metadata_table::tests::check_record_batches; + use crate::scan::tests::TableTestFixture; + use crate::spec::{SnapshotReference, SnapshotRetention}; + + #[tokio::test] + async fn test_refs_table() { + let table = TableTestFixture::new().table; + + let snapshot_id = table + .metadata() + .snapshots() + .next() + .map(|s| s.snapshot_id()) + .expect("Table should have at least one snapshot"); + + SnapshotReference::new(snapshot_id, SnapshotRetention::Branch { + min_snapshots_to_keep: Some(10), + max_snapshot_age_ms: Some(86400000), + max_ref_age_ms: Some(604800000), + }); + + let batch_stream = table.inspect().refs().scan().await.unwrap(); + + check_record_batches( + batch_stream, + expect![[r#"Field { name: "name", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1"} }, +Field { name: "type", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "2"} }, +Field { name: "snapshot_id", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "3"} }, +Field { name: "max_reference_age_in_ms", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "4"} }, +Field { name: "min_snapshots_to_keep", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "5"} }, +Field { name: "max_snapshot_age_in_ms", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "6"} }"#]], + expect![[r#"name: StringArray +[ + "test", + "main", +], +type: StringArray +[ + "TAG", + "BRANCH", +], +snapshot_id: PrimitiveArray +[ + 3051729675574597004, + 3055729675574597004, +], +max_reference_age_in_ms: PrimitiveArray +[ + 10000000, + null, +], +min_snapshots_to_keep: PrimitiveArray +[ + null, + null, +], +max_snapshot_age_in_ms: PrimitiveArray +[ + null, + null, +]"#]], + &[], // No columns to skip initially + None, // No sort column initially + ).await; + } +} From 24c3d00ed7b43d17f0a5390fca1fe2e899104618 Mon Sep 17 00:00:00 2001 From: jayceslesar Date: Fri, 4 Jul 2025 15:50:42 -0400 Subject: [PATCH 2/3] use correct crate --- crates/iceberg/src/inspect/refs.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/iceberg/src/inspect/refs.rs b/crates/iceberg/src/inspect/refs.rs index 60219def0f..e8e3a8a943 100644 --- a/crates/iceberg/src/inspect/refs.rs +++ b/crates/iceberg/src/inspect/refs.rs @@ -121,9 +121,9 @@ impl<'a> RefsTable<'a> { mod tests { use expect_test::expect; - use crate::inspect::metadata_table::tests::check_record_batches; use crate::scan::tests::TableTestFixture; use crate::spec::{SnapshotReference, SnapshotRetention}; + use crate::test_utils::check_record_batches; #[tokio::test] async fn test_refs_table() { From 6af21d82a1d3c2789fe7af07a2602d4bcfac80ea Mon Sep 17 00:00:00 2001 From: jayceslesar Date: Fri, 4 Jul 2025 15:56:44 -0400 Subject: [PATCH 3/3] fix because I actually read the snapshots table this time --- crates/iceberg/src/inspect/refs.rs | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/crates/iceberg/src/inspect/refs.rs b/crates/iceberg/src/inspect/refs.rs index e8e3a8a943..d5b23233fc 100644 --- a/crates/iceberg/src/inspect/refs.rs +++ b/crates/iceberg/src/inspect/refs.rs @@ -120,6 +120,7 @@ impl<'a> RefsTable<'a> { #[cfg(test)] mod tests { use expect_test::expect; + use futures::TryStreamExt; use crate::scan::tests::TableTestFixture; use crate::spec::{SnapshotReference, SnapshotRetention}; @@ -145,14 +146,16 @@ mod tests { let batch_stream = table.inspect().refs().scan().await.unwrap(); check_record_batches( - batch_stream, - expect![[r#"Field { name: "name", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1"} }, + batch_stream.try_collect::>().await.unwrap(), + expect![[ + r#"Field { name: "name", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1"} }, Field { name: "type", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "2"} }, Field { name: "snapshot_id", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "3"} }, Field { name: "max_reference_age_in_ms", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "4"} }, Field { name: "min_snapshots_to_keep", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "5"} }, -Field { name: "max_snapshot_age_in_ms", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "6"} }"#]], - expect![[r#"name: StringArray +Field { name: "max_snapshot_age_in_ms", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "6"} }"# + ]], + expect![[r#"name: StringArray [ "test", "main", @@ -182,8 +185,8 @@ max_snapshot_age_in_ms: PrimitiveArray null, null, ]"#]], - &[], // No columns to skip initially - None, // No sort column initially - ).await; + &[], // No columns to skip initially + None, // No sort column initially + ); } }