diff --git a/crates/iceberg/src/expr/visitors/manifest_evaluator.rs b/crates/iceberg/src/expr/visitors/manifest_evaluator.rs index 8653b22a0..c6e9016cd 100644 --- a/crates/iceberg/src/expr/visitors/manifest_evaluator.rs +++ b/crates/iceberg/src/expr/visitors/manifest_evaluator.rs @@ -16,6 +16,7 @@ // under the License. use fnv::FnvHashSet; +use serde_bytes::ByteBuf; use crate::expr::visitors::bound_predicate_visitor::{BoundPredicateVisitor, visit}; use crate::expr::{BoundPredicate, BoundReference}; @@ -42,13 +43,13 @@ impl ManifestEvaluator { /// see if this `ManifestFile` could possibly contain data that matches /// the scan's filter. pub(crate) fn eval(&self, manifest_file: &ManifestFile) -> Result { - if manifest_file.partitions.is_empty() { - return Ok(true); + match &manifest_file.partitions { + Some(p) if !p.is_empty() => { + let mut evaluator = ManifestFilterVisitor::new(p); + visit(&mut evaluator, &self.partition_filter) + } + _ => Ok(true), } - - let mut evaluator = ManifestFilterVisitor::new(&manifest_file.partitions); - - visit(&mut evaluator, &self.partition_filter) } } @@ -154,9 +155,19 @@ impl BoundPredicateVisitor for ManifestFilterVisitor<'_> { _predicate: &BoundPredicate, ) -> crate::Result { let field = self.field_summary_for_reference(reference); + match &field.lower_bound { - Some(bound) if datum <= bound => ROWS_CANNOT_MATCH, - Some(_) => ROWS_MIGHT_MATCH, + Some(bound_bytes) => { + let bound = ManifestFilterVisitor::bytes_to_datum( + bound_bytes, + *reference.field().field_type.clone(), + ); + if datum <= &bound { + ROWS_CANNOT_MATCH + } else { + ROWS_MIGHT_MATCH + } + } None => ROWS_CANNOT_MATCH, } } @@ -169,8 +180,17 @@ impl BoundPredicateVisitor for ManifestFilterVisitor<'_> { ) -> crate::Result { let field = self.field_summary_for_reference(reference); match &field.lower_bound { - Some(bound) if datum < bound => ROWS_CANNOT_MATCH, - Some(_) => ROWS_MIGHT_MATCH, + Some(bound_bytes) => { + let bound = ManifestFilterVisitor::bytes_to_datum( + bound_bytes, + *reference.field().field_type.clone(), + ); + if datum < &bound { + ROWS_CANNOT_MATCH + } else { + ROWS_MIGHT_MATCH + } + } None => ROWS_CANNOT_MATCH, } } @@ -183,8 +203,17 @@ impl BoundPredicateVisitor for ManifestFilterVisitor<'_> { ) -> crate::Result { let field = self.field_summary_for_reference(reference); match &field.upper_bound { - Some(bound) if datum >= bound => ROWS_CANNOT_MATCH, - Some(_) => ROWS_MIGHT_MATCH, + Some(bound_bytes) => { + let bound = ManifestFilterVisitor::bytes_to_datum( + bound_bytes, + *reference.field().field_type.clone(), + ); + if datum >= &bound { + ROWS_CANNOT_MATCH + } else { + ROWS_MIGHT_MATCH + } + } None => ROWS_CANNOT_MATCH, } } @@ -197,8 +226,17 @@ impl BoundPredicateVisitor for ManifestFilterVisitor<'_> { ) -> crate::Result { let field = self.field_summary_for_reference(reference); match &field.upper_bound { - Some(bound) if datum > bound => ROWS_CANNOT_MATCH, - Some(_) => ROWS_MIGHT_MATCH, + Some(bound_bytes) => { + let bound = ManifestFilterVisitor::bytes_to_datum( + bound_bytes, + *reference.field().field_type.clone(), + ); + if datum > &bound { + ROWS_CANNOT_MATCH + } else { + ROWS_MIGHT_MATCH + } + } None => ROWS_CANNOT_MATCH, } } @@ -215,14 +253,22 @@ impl BoundPredicateVisitor for ManifestFilterVisitor<'_> { return ROWS_CANNOT_MATCH; } - if let Some(lower_bound) = &field.lower_bound { - if lower_bound > datum { + if let Some(lower_bound_bytes) = &field.lower_bound { + let lower_bound = ManifestFilterVisitor::bytes_to_datum( + lower_bound_bytes, + *reference.field().field_type.clone(), + ); + if &lower_bound > datum { return ROWS_CANNOT_MATCH; } } - if let Some(upper_bound) = &field.upper_bound { - if upper_bound < datum { + if let Some(upper_bound_bytes) = &field.upper_bound { + let upper_bound = ManifestFilterVisitor::bytes_to_datum( + upper_bound_bytes, + *reference.field().field_type.clone(), + ); + if &upper_bound < datum { return ROWS_CANNOT_MATCH; } } @@ -260,23 +306,15 @@ impl BoundPredicateVisitor for ManifestFilterVisitor<'_> { let prefix_len = prefix.len(); if let Some(lower_bound) = &field.lower_bound { - let lower_bound_str = ManifestFilterVisitor::datum_as_str( - lower_bound, - "Cannot perform starts_with on non-string lower bound", - )?; - let min_len = lower_bound_str.len().min(prefix_len); - if prefix.as_bytes().lt(&lower_bound_str.as_bytes()[..min_len]) { + let min_len = lower_bound.len().min(prefix_len); + if prefix.as_bytes().lt(&lower_bound[..min_len]) { return ROWS_CANNOT_MATCH; } } if let Some(upper_bound) = &field.upper_bound { - let upper_bound_str = ManifestFilterVisitor::datum_as_str( - upper_bound, - "Cannot perform starts_with on non-string upper bound", - )?; - let min_len = upper_bound_str.len().min(prefix_len); - if prefix.as_bytes().gt(&upper_bound_str.as_bytes()[..min_len]) { + let min_len = upper_bound.len().min(prefix_len); + if prefix.as_bytes().gt(&upper_bound[..min_len]) { return ROWS_CANNOT_MATCH; } } @@ -305,35 +343,19 @@ impl BoundPredicateVisitor for ManifestFilterVisitor<'_> { // not_starts_with will match unless all values must start with the prefix. This happens when // the lower and upper bounds both start with the prefix. if let Some(lower_bound) = &field.lower_bound { - let lower_bound_str = ManifestFilterVisitor::datum_as_str( - lower_bound, - "Cannot perform not_starts_with on non-string lower bound", - )?; - // if lower is shorter than the prefix then lower doesn't start with the prefix - if prefix_len > lower_bound_str.len() { + if prefix_len > lower_bound.len() { return ROWS_MIGHT_MATCH; } - if prefix - .as_bytes() - .eq(&lower_bound_str.as_bytes()[..prefix_len]) - { + if prefix.as_bytes().eq(&lower_bound[..prefix_len]) { if let Some(upper_bound) = &field.upper_bound { - let upper_bound_str = ManifestFilterVisitor::datum_as_str( - upper_bound, - "Cannot perform not_starts_with on non-string upper bound", - )?; - // if upper is shorter than the prefix then upper can't start with the prefix - if prefix_len > upper_bound_str.len() { + if prefix_len > upper_bound.len() { return ROWS_MIGHT_MATCH; } - if prefix - .as_bytes() - .eq(&upper_bound_str.as_bytes()[..prefix_len]) - { + if prefix.as_bytes().eq(&upper_bound[..prefix_len]) { return ROWS_CANNOT_MATCH; } } @@ -359,13 +381,21 @@ impl BoundPredicateVisitor for ManifestFilterVisitor<'_> { } if let Some(lower_bound) = &field.lower_bound { - if literals.iter().all(|datum| lower_bound > datum) { + let lower_bound = ManifestFilterVisitor::bytes_to_datum( + lower_bound, + *reference.field().clone().field_type, + ); + if literals.iter().all(|datum| &lower_bound > datum) { return ROWS_CANNOT_MATCH; } } if let Some(upper_bound) = &field.upper_bound { - if literals.iter().all(|datum| upper_bound < datum) { + let upper_bound = ManifestFilterVisitor::bytes_to_datum( + upper_bound, + *reference.field().clone().field_type, + ); + if literals.iter().all(|datum| &upper_bound < datum) { return ROWS_CANNOT_MATCH; } } @@ -414,6 +444,11 @@ impl ManifestFilterVisitor<'_> { }; Ok(bound) } + + fn bytes_to_datum(bytes: &ByteBuf, t: Type) -> Datum { + let p = t.as_primitive_type().unwrap(); + Datum::try_from_bytes(bytes, p.clone()).unwrap() + } } #[cfg(test)] @@ -520,8 +555,8 @@ mod test { FieldSummary { contains_null: false, contains_nan: None, - lower_bound: Some(Datum::int(INT_MIN_VALUE)), - upper_bound: Some(Datum::int(INT_MAX_VALUE)), + lower_bound: Some(Datum::int(INT_MIN_VALUE).to_bytes().unwrap()), + upper_bound: Some(Datum::int(INT_MAX_VALUE).to_bytes().unwrap()), }, // all_nulls_missing_nan FieldSummary { @@ -534,22 +569,22 @@ mod test { FieldSummary { contains_null: true, contains_nan: None, - lower_bound: Some(Datum::string(STRING_MIN_VALUE)), - upper_bound: Some(Datum::string(STRING_MAX_VALUE)), + lower_bound: Some(Datum::string(STRING_MIN_VALUE).to_bytes().unwrap()), + upper_bound: Some(Datum::string(STRING_MAX_VALUE).to_bytes().unwrap()), }, // no_nulls FieldSummary { contains_null: false, contains_nan: None, - lower_bound: Some(Datum::string(STRING_MIN_VALUE)), - upper_bound: Some(Datum::string(STRING_MAX_VALUE)), + lower_bound: Some(Datum::string(STRING_MIN_VALUE).to_bytes().unwrap()), + upper_bound: Some(Datum::string(STRING_MAX_VALUE).to_bytes().unwrap()), }, // float FieldSummary { contains_null: true, contains_nan: None, - lower_bound: Some(Datum::float(0.0)), - upper_bound: Some(Datum::float(20.0)), + lower_bound: Some(Datum::float(0.0).to_bytes().unwrap()), + upper_bound: Some(Datum::float(20.0).to_bytes().unwrap()), }, // all_nulls_double FieldSummary { @@ -583,8 +618,8 @@ mod test { FieldSummary { contains_null: false, contains_nan: Some(false), - lower_bound: Some(Datum::float(0.0)), - upper_bound: Some(Datum::float(20.0)), + lower_bound: Some(Datum::float(0.0).to_bytes().unwrap()), + upper_bound: Some(Datum::float(20.0).to_bytes().unwrap()), }, // all_nulls_missing_nan_float FieldSummary { @@ -597,15 +632,15 @@ mod test { FieldSummary { contains_null: true, contains_nan: None, - lower_bound: Some(Datum::string(STRING_MIN_VALUE)), - upper_bound: Some(Datum::string(STRING_MIN_VALUE)), + lower_bound: Some(Datum::string(STRING_MIN_VALUE).to_bytes().unwrap()), + upper_bound: Some(Datum::string(STRING_MIN_VALUE).to_bytes().unwrap()), }, // no_nulls_same_value_a FieldSummary { contains_null: false, contains_nan: None, - lower_bound: Some(Datum::string(STRING_MIN_VALUE)), - upper_bound: Some(Datum::string(STRING_MIN_VALUE)), + lower_bound: Some(Datum::string(STRING_MIN_VALUE).to_bytes().unwrap()), + upper_bound: Some(Datum::string(STRING_MIN_VALUE).to_bytes().unwrap()), }, ] } @@ -625,7 +660,7 @@ mod test { added_rows_count: None, existing_rows_count: None, deleted_rows_count: None, - partitions, + partitions: Some(partitions), key_metadata: vec![], } } diff --git a/crates/iceberg/src/inspect/manifests.rs b/crates/iceberg/src/inspect/manifests.rs index d59d831c7..9fd732eab 100644 --- a/crates/iceberg/src/inspect/manifests.rs +++ b/crates/iceberg/src/inspect/manifests.rs @@ -29,7 +29,7 @@ use futures::{StreamExt, stream}; use crate::Result; use crate::arrow::schema_to_arrow_schema; use crate::scan::ArrowRecordBatchStream; -use crate::spec::{FieldSummary, ListType, NestedField, PrimitiveType, StructType, Type}; +use crate::spec::{Datum, FieldSummary, ListType, NestedField, PrimitiveType, StructType, Type}; use crate::table::Table; /// Manifests table. @@ -181,7 +181,20 @@ impl<'a> ManifestsTable<'a> { .append_value(manifest.existing_files_count.unwrap_or(0) as i32); deleted_delete_files_count .append_value(manifest.deleted_files_count.unwrap_or(0) as i32); - self.append_partition_summaries(&mut partition_summaries, &manifest.partitions); + + let spec = self + .table + .metadata() + .partition_spec_by_id(manifest.partition_spec_id) + .unwrap(); + let spec_struct = spec + .partition_type(self.table.metadata().current_schema()) + .unwrap(); + self.append_partition_summaries( + &mut partition_summaries, + &manifest.partitions.clone().unwrap_or_else(Vec::new), + spec_struct, + ); } } @@ -230,9 +243,10 @@ impl<'a> ManifestsTable<'a> { &self, builder: &mut GenericListBuilder, partitions: &[FieldSummary], + partition_struct: StructType, ) { let partition_summaries_builder = builder.values(); - for summary in partitions { + for (summary, field) in partitions.iter().zip(partition_struct.fields()) { partition_summaries_builder .field_builder::(0) .unwrap() @@ -241,14 +255,23 @@ impl<'a> ManifestsTable<'a> { .field_builder::(1) .unwrap() .append_option(summary.contains_nan); + partition_summaries_builder .field_builder::(2) .unwrap() - .append_option(summary.lower_bound.as_ref().map(|v| v.to_string())); + .append_option(summary.lower_bound.as_ref().map(|v| { + Datum::try_from_bytes(v, field.field_type.as_primitive_type().unwrap().clone()) + .unwrap() + .to_string() + })); partition_summaries_builder .field_builder::(3) .unwrap() - .append_option(summary.upper_bound.as_ref().map(|v| v.to_string())); + .append_option(summary.upper_bound.as_ref().map(|v| { + Datum::try_from_bytes(v, field.field_type.as_primitive_type().unwrap().clone()) + .unwrap() + .to_string() + })); partition_summaries_builder.append(true); } builder.append(true); diff --git a/crates/iceberg/src/spec/manifest/mod.rs b/crates/iceberg/src/spec/manifest/mod.rs index 15342f667..3e3cb03f8 100644 --- a/crates/iceberg/src/spec/manifest/mod.rs +++ b/crates/iceberg/src/spec/manifest/mod.rs @@ -627,14 +627,15 @@ mod tests { writer.add_entry(entry.clone()).unwrap(); } let manifest_file = writer.write_manifest_file().await.unwrap(); - assert_eq!(manifest_file.partitions.len(), 1); + let partitions = manifest_file.partitions.unwrap(); + assert_eq!(partitions.len(), 1); assert_eq!( - manifest_file.partitions[0].lower_bound, - Some(Datum::string("x")) + partitions[0].clone().lower_bound.unwrap(), + Datum::string("x").to_bytes().unwrap() ); assert_eq!( - manifest_file.partitions[0].upper_bound, - Some(Datum::string("x")) + partitions[0].clone().upper_bound.unwrap(), + Datum::string("x").to_bytes().unwrap() ); // read back the manifest file and check the content @@ -1014,20 +1015,40 @@ mod tests { } let res = writer.write_manifest_file().await.unwrap(); - assert_eq!(res.partitions.len(), 3); - assert_eq!(res.partitions[0].lower_bound, Some(Datum::int(1111))); - assert_eq!(res.partitions[0].upper_bound, Some(Datum::int(2021))); - assert!(!res.partitions[0].contains_null); - assert_eq!(res.partitions[0].contains_nan, Some(false)); + let partitions = res.partitions.unwrap(); - assert_eq!(res.partitions[1].lower_bound, Some(Datum::float(1.0))); - assert_eq!(res.partitions[1].upper_bound, Some(Datum::float(15.5))); - assert!(res.partitions[1].contains_null); - assert_eq!(res.partitions[1].contains_nan, Some(true)); + assert_eq!(partitions.len(), 3); + assert_eq!( + partitions[0].clone().lower_bound.unwrap(), + Datum::int(1111).to_bytes().unwrap() + ); + assert_eq!( + partitions[0].clone().upper_bound.unwrap(), + Datum::int(2021).to_bytes().unwrap() + ); + assert!(!partitions[0].clone().contains_null); + assert_eq!(partitions[0].clone().contains_nan, Some(false)); - assert_eq!(res.partitions[2].lower_bound, Some(Datum::double(1.0))); - assert_eq!(res.partitions[2].upper_bound, Some(Datum::double(25.5))); - assert!(!res.partitions[2].contains_null); - assert_eq!(res.partitions[2].contains_nan, Some(false)); + assert_eq!( + partitions[1].clone().lower_bound.unwrap(), + Datum::float(1.0).to_bytes().unwrap() + ); + assert_eq!( + partitions[1].clone().upper_bound.unwrap(), + Datum::float(15.5).to_bytes().unwrap() + ); + assert!(partitions[1].clone().contains_null); + assert_eq!(partitions[1].clone().contains_nan, Some(true)); + + assert_eq!( + partitions[2].clone().lower_bound.unwrap(), + Datum::double(1.0).to_bytes().unwrap() + ); + assert_eq!( + partitions[2].clone().upper_bound.unwrap(), + Datum::double(25.5).to_bytes().unwrap() + ); + assert!(!partitions[2].clone().contains_null); + assert_eq!(partitions[2].clone().contains_nan, Some(false)); } } diff --git a/crates/iceberg/src/spec/manifest/writer.rs b/crates/iceberg/src/spec/manifest/writer.rs index c89c147f9..dc60dc7a6 100644 --- a/crates/iceberg/src/spec/manifest/writer.rs +++ b/crates/iceberg/src/spec/manifest/writer.rs @@ -415,7 +415,7 @@ impl ManifestWriter { added_rows_count: Some(self.added_rows), existing_rows_count: Some(self.existing_rows), deleted_rows_count: Some(self.deleted_rows), - partitions: partition_summary, + partitions: Some(partition_summary), key_metadata: self.key_metadata, }) } @@ -423,59 +423,63 @@ impl ManifestWriter { struct PartitionFieldStats { partition_type: PrimitiveType, - summary: FieldSummary, + + contains_null: bool, + contains_nan: Option, + lower_bound: Option, + upper_bound: Option, } impl PartitionFieldStats { pub(crate) fn new(partition_type: PrimitiveType) -> Self { Self { partition_type, - summary: FieldSummary::default(), + contains_null: false, + contains_nan: Some(false), + upper_bound: None, + lower_bound: None, } } pub(crate) fn update(&mut self, value: Option) -> Result<()> { let Some(value) = value else { - self.summary.contains_null = true; + self.contains_null = true; return Ok(()); }; if !self.partition_type.compatible(&value) { return Err(Error::new( ErrorKind::DataInvalid, - "value is not compatitable with type", + "value is not compatible with type", )); } let value = Datum::new(self.partition_type.clone(), value); if value.is_nan() { - self.summary.contains_nan = Some(true); + self.contains_nan = Some(true); return Ok(()); } - self.summary.lower_bound = Some(self.summary.lower_bound.take().map_or( - value.clone(), - |original| { - if value < original { - value.clone() - } else { - original - } - }, - )); - self.summary.upper_bound = Some(self.summary.upper_bound.take().map_or( - value.clone(), - |original| { - if value > original { value } else { original } - }, - )); + self.lower_bound = Some(self.lower_bound.take().map_or(value.clone(), |original| { + if value < original { + value.clone() + } else { + original + } + })); + self.upper_bound = Some(self.upper_bound.take().map_or(value.clone(), |original| { + if value > original { value } else { original } + })); Ok(()) } - pub(crate) fn finish(mut self) -> FieldSummary { - // Always set contains_nan - self.summary.contains_nan = self.summary.contains_nan.or(Some(false)); - self.summary + pub(crate) fn finish(self) -> FieldSummary { + FieldSummary { + contains_null: self.contains_null, + contains_nan: self.contains_nan, + upper_bound: self.upper_bound.map(|v| v.to_bytes().unwrap()), + lower_bound: self.lower_bound.map(|v| v.to_bytes().unwrap()), + } } } diff --git a/crates/iceberg/src/spec/manifest_list.rs b/crates/iceberg/src/spec/manifest_list.rs index c29713cef..93bd739c0 100644 --- a/crates/iceberg/src/spec/manifest_list.rs +++ b/crates/iceberg/src/spec/manifest_list.rs @@ -23,10 +23,12 @@ use std::str::FromStr; use apache_avro::types::Value; use apache_avro::{Reader, Writer, from_value}; use bytes::Bytes; +pub use serde_bytes::ByteBuf; +use serde_derive::{Deserialize, Serialize}; use self::_const_schema::{MANIFEST_LIST_AVRO_SCHEMA_V1, MANIFEST_LIST_AVRO_SCHEMA_V2}; use self::_serde::{ManifestFileV1, ManifestFileV2}; -use super::{Datum, FormatVersion, Manifest, StructType}; +use super::{FormatVersion, Manifest}; use crate::error::Result; use crate::io::{FileIO, OutputFile}; use crate::{Error, ErrorKind}; @@ -55,21 +57,17 @@ pub struct ManifestList { impl ManifestList { /// Parse manifest list from bytes. - pub fn parse_with_version( - bs: &[u8], - version: FormatVersion, - partition_type_provider: impl Fn(i32) -> Result>, - ) -> Result { + pub fn parse_with_version(bs: &[u8], version: FormatVersion) -> Result { match version { FormatVersion::V1 => { let reader = Reader::with_schema(&MANIFEST_LIST_AVRO_SCHEMA_V1, bs)?; let values = Value::Array(reader.collect::, _>>()?); - from_value::<_serde::ManifestListV1>(&values)?.try_into(partition_type_provider) + from_value::<_serde::ManifestListV1>(&values)?.try_into() } FormatVersion::V2 => { let reader = Reader::new(bs)?; let values = Value::Array(reader.collect::, _>>()?); - from_value::<_serde::ManifestListV2>(&values)?.try_into(partition_type_provider) + from_value::<_serde::ManifestListV2>(&values)?.try_into() } } } @@ -577,7 +575,7 @@ pub struct ManifestFile { /// A list of field summaries for each partition field in the spec. Each /// field in the list corresponds to a field in the manifest file’s /// partition spec. - pub partitions: Vec, + pub partitions: Option>, /// field: 519 /// /// Implementation-specific key metadata for encryption @@ -673,7 +671,7 @@ impl ManifestFile { /// Field summary for partition field in the spec. /// /// Each field in the list corresponds to a field in the manifest file’s partition spec. -#[derive(Debug, PartialEq, Eq, Clone, Default, Hash)] +#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, Default, Hash)] pub struct FieldSummary { /// field: 509 /// @@ -687,11 +685,11 @@ pub struct FieldSummary { /// field: 510 /// The minimum value for the field in the manifests /// partitions. - pub lower_bound: Option, + pub lower_bound: Option, /// field: 511 /// The maximum value for the field in the manifests /// partitions. - pub upper_bound: Option, + pub upper_bound: Option, } /// This is a helper module that defines types to help with serialization/deserialization. @@ -705,7 +703,7 @@ pub(super) mod _serde { use super::ManifestFile; use crate::Error; use crate::error::Result; - use crate::spec::{Datum, PrimitiveType, StructType}; + use crate::spec::FieldSummary; #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)] #[serde(transparent)] @@ -721,27 +719,12 @@ pub(super) mod _serde { impl ManifestListV2 { /// Converts the [ManifestListV2] into a [ManifestList]. - /// The convert of [entries] need the partition_type info so use this function instead of [std::TryFrom] trait. - pub fn try_into( - self, - partition_type_provider: impl Fn(i32) -> Result>, - ) -> Result { + pub fn try_into(self) -> Result { Ok(super::ManifestList { entries: self .entries .into_iter() - .map(|v| { - let partition_spec_id = v.partition_spec_id; - let manifest_path = v.manifest_path.clone(); - v.try_into(partition_type_provider(partition_spec_id)?.as_ref()) - .map_err(|err| { - err.with_context("manifest file path", manifest_path) - .with_context( - "partition spec id", - partition_spec_id.to_string(), - ) - }) - }) + .map(|v| v.try_into()) .collect::>>()?, }) } @@ -755,7 +738,7 @@ pub(super) mod _serde { entries: value .entries .into_iter() - .map(TryInto::try_into) + .map(|v| v.try_into()) .collect::, _>>()?, }) } @@ -763,27 +746,12 @@ pub(super) mod _serde { impl ManifestListV1 { /// Converts the [ManifestListV1] into a [ManifestList]. - /// The convert of [entries] need the partition_type info so use this function instead of [std::TryFrom] trait. - pub fn try_into( - self, - partition_type_provider: impl Fn(i32) -> Result>, - ) -> Result { + pub fn try_into(self) -> Result { Ok(super::ManifestList { entries: self .entries .into_iter() - .map(|v| { - let partition_spec_id = v.partition_spec_id; - let manifest_path = v.manifest_path.clone(); - v.try_into(partition_type_provider(partition_spec_id)?.as_ref()) - .map_err(|err| { - err.with_context("manifest file path", manifest_path) - .with_context( - "partition spec id", - partition_spec_id.to_string(), - ) - }) - }) + .map(|v| v.try_into()) .collect::>>()?, }) } @@ -797,7 +765,7 @@ pub(super) mod _serde { entries: value .entries .into_iter() - .map(TryInto::try_into) + .map(|v| v.try_into()) .collect::, _>>()?, }) } @@ -844,83 +812,9 @@ pub(super) mod _serde { pub key_metadata: Option, } - #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)] - pub(super) struct FieldSummary { - pub contains_null: bool, - pub contains_nan: Option, - pub lower_bound: Option, - pub upper_bound: Option, - } - - impl FieldSummary { - /// Converts the [FieldSummary] into a [super::FieldSummary]. - /// [lower_bound] and [upper_bound] are converted into [Literal]s need the type info so use - /// this function instead of [std::TryFrom] trait. - pub(crate) fn try_into(self, r#type: &PrimitiveType) -> Result { - Ok(super::FieldSummary { - contains_null: self.contains_null, - contains_nan: self.contains_nan, - lower_bound: self - .lower_bound - .as_ref() - .map(|v| Datum::try_from_bytes(v, r#type.clone())) - .transpose() - .map_err(|err| err.with_context("type", format!("{:?}", r#type)))?, - upper_bound: self - .upper_bound - .as_ref() - .map(|v| Datum::try_from_bytes(v, r#type.clone())) - .transpose() - .map_err(|err| err.with_context("type", format!("{:?}", r#type)))?, - }) - } - } - - fn try_convert_to_field_summary( - partitions: Option>, - partition_type: Option<&StructType>, - ) -> Result> { - if let Some(partitions) = partitions { - if let Some(partition_type) = partition_type { - let partition_types = partition_type.fields(); - if partitions.len() != partition_types.len() { - return Err(Error::new( - crate::ErrorKind::DataInvalid, - format!( - "Invalid partition spec. Expected {} fields, got {}", - partition_types.len(), - partitions.len() - ), - )); - } - partitions - .into_iter() - .zip(partition_types) - .map(|(v, field)| { - v.try_into(field.field_type.as_primitive_type().ok_or_else(|| { - Error::new( - crate::ErrorKind::DataInvalid, - "Invalid partition spec. Field type is not primitive", - ) - })?) - }) - .collect::>>() - } else { - Err(Error::new( - crate::ErrorKind::DataInvalid, - "Invalid partition spec. Partition type is required", - )) - } - } else { - Ok(Vec::new()) - } - } - impl ManifestFileV2 { /// Converts the [ManifestFileV2] into a [ManifestFile]. - /// The convert of [partitions] need the partition_type info so use this function instead of [std::TryFrom] trait. - pub fn try_into(self, partition_type: Option<&StructType>) -> Result { - let partitions = try_convert_to_field_summary(self.partitions, partition_type)?; + pub fn try_into(self) -> Result { Ok(ManifestFile { manifest_path: self.manifest_path, manifest_length: self.manifest_length, @@ -935,17 +829,15 @@ pub(super) mod _serde { added_rows_count: Some(self.added_rows_count.try_into()?), existing_rows_count: Some(self.existing_rows_count.try_into()?), deleted_rows_count: Some(self.deleted_rows_count.try_into()?), - partitions, + partitions: self.partitions, key_metadata: self.key_metadata.map(|b| b.into_vec()).unwrap_or_default(), }) } } impl ManifestFileV1 { - /// Converts the [MManifestFileV1] into a [ManifestFile]. - /// The convert of [partitions] need the partition_type info so use this function instead of [std::TryFrom] trait. - pub fn try_into(self, partition_type: Option<&StructType>) -> Result { - let partitions = try_convert_to_field_summary(self.partitions, partition_type)?; + /// Converts the [ManifestFileV1] into a [ManifestFile]. + pub fn try_into(self) -> Result { Ok(ManifestFile { manifest_path: self.manifest_path, manifest_length: self.manifest_length, @@ -969,7 +861,7 @@ pub(super) mod _serde { .map(TryInto::try_into) .transpose()?, deleted_rows_count: self.deleted_rows_count.map(TryInto::try_into).transpose()?, - partitions, + partitions: self.partitions, key_metadata: self.key_metadata.map(|b| b.into_vec()).unwrap_or_default(), // as ref: https://iceberg.apache.org/spec/#partitioning // use 0 when reading v1 manifest lists @@ -980,27 +872,6 @@ pub(super) mod _serde { } } - fn convert_to_serde_field_summary( - partitions: Vec, - ) -> Result>> { - if partitions.is_empty() { - Ok(None) - } else { - let mut vs = Vec::with_capacity(partitions.len()); - - for v in partitions { - let fs = FieldSummary { - contains_null: v.contains_null, - contains_nan: v.contains_nan, - lower_bound: v.lower_bound.map(|v| v.to_bytes()).transpose()?, - upper_bound: v.upper_bound.map(|v| v.to_bytes()).transpose()?, - }; - vs.push(fs); - } - Ok(Some(vs)) - } - } - fn convert_to_serde_key_metadata(key_metadata: Vec) -> Option { if key_metadata.is_empty() { None @@ -1013,7 +884,6 @@ pub(super) mod _serde { type Error = Error; fn try_from(value: ManifestFile) -> std::result::Result { - let partitions = convert_to_serde_field_summary(value.partitions)?; let key_metadata = convert_to_serde_key_metadata(value.key_metadata); Ok(Self { manifest_path: value.manifest_path, @@ -1077,7 +947,7 @@ pub(super) mod _serde { ) })? .try_into()?, - partitions, + partitions: value.partitions, key_metadata, }) } @@ -1087,7 +957,6 @@ pub(super) mod _serde { type Error = Error; fn try_from(value: ManifestFile) -> std::result::Result { - let partitions = convert_to_serde_field_summary(value.partitions)?; let key_metadata = convert_to_serde_key_metadata(value.key_metadata); Ok(Self { manifest_path: value.manifest_path, @@ -1115,7 +984,7 @@ pub(super) mod _serde { .deleted_rows_count .map(TryInto::try_into) .transpose()?, - partitions, + partitions: value.partitions, key_metadata, }) } @@ -1124,9 +993,7 @@ pub(super) mod _serde { #[cfg(test)] mod test { - use std::collections::HashMap; use std::fs; - use std::sync::Arc; use apache_avro::{Reader, Schema}; use tempfile::TempDir; @@ -1136,7 +1003,7 @@ mod test { use crate::spec::manifest_list::_serde::ManifestListV1; use crate::spec::{ Datum, FieldSummary, ManifestContentType, ManifestFile, ManifestList, ManifestListWriter, - NestedField, PrimitiveType, StructType, Type, UNASSIGNED_SEQUENCE_NUMBER, + UNASSIGNED_SEQUENCE_NUMBER, }; #[tokio::test] @@ -1157,7 +1024,7 @@ mod test { added_rows_count: Some(3), existing_rows_count: Some(0), deleted_rows_count: Some(0), - partitions: vec![], + partitions: Some(vec![]), key_metadata: vec![], } ] @@ -1183,8 +1050,7 @@ mod test { let bs = fs::read(full_path).expect("read_file must succeed"); let parsed_manifest_list = - ManifestList::parse_with_version(&bs, crate::spec::FormatVersion::V1, |_id| Ok(None)) - .unwrap(); + ManifestList::parse_with_version(&bs, crate::spec::FormatVersion::V1).unwrap(); assert_eq!(manifest_list, parsed_manifest_list); } @@ -1207,7 +1073,9 @@ mod test { added_rows_count: Some(3), existing_rows_count: Some(0), deleted_rows_count: Some(0), - partitions: vec![FieldSummary { contains_null: false, contains_nan: Some(false), lower_bound: Some(Datum::long(1)), upper_bound: Some(Datum::long(1))}], + partitions: Some( + vec![FieldSummary { contains_null: false, contains_nan: Some(false), lower_bound: Some(Datum::long(1).to_bytes().unwrap()), upper_bound: Some(Datum::long(1).to_bytes().unwrap())}] + ), key_metadata: vec![], }, ManifestFile { @@ -1224,7 +1092,9 @@ mod test { added_rows_count: Some(3), existing_rows_count: Some(0), deleted_rows_count: Some(0), - partitions: vec![FieldSummary { contains_null: false, contains_nan: Some(false), lower_bound: Some(Datum::float(1.1)), upper_bound: Some(Datum::float(2.1))}], + partitions: Some( + vec![FieldSummary { contains_null: false, contains_nan: Some(false), lower_bound: Some(Datum::float(1.1).to_bytes().unwrap()), upper_bound: Some(Datum::float(2.1).to_bytes().unwrap())}] + ), key_metadata: vec![], } ] @@ -1251,29 +1121,7 @@ mod test { let bs = fs::read(full_path).expect("read_file must succeed"); let parsed_manifest_list = - ManifestList::parse_with_version(&bs, crate::spec::FormatVersion::V2, |id| { - Ok(HashMap::from([ - ( - 1, - StructType::new(vec![Arc::new(NestedField::required( - 1, - "test", - Type::Primitive(PrimitiveType::Long), - ))]), - ), - ( - 2, - StructType::new(vec![Arc::new(NestedField::required( - 1, - "test", - Type::Primitive(PrimitiveType::Float), - ))]), - ), - ]) - .get(&id) - .cloned()) - }) - .unwrap(); + ManifestList::parse_with_version(&bs, crate::spec::FormatVersion::V2).unwrap(); assert_eq!(manifest_list, parsed_manifest_list); } @@ -1295,7 +1143,7 @@ mod test { added_rows_count: Some(3), existing_rows_count: Some(0), deleted_rows_count: Some(0), - partitions: vec![], + partitions: None, key_metadata: vec![], }] }.try_into().unwrap(); @@ -1323,7 +1171,9 @@ mod test { added_rows_count: Some(3), existing_rows_count: Some(0), deleted_rows_count: Some(0), - partitions: vec![FieldSummary { contains_null: false, contains_nan: Some(false), lower_bound: Some(Datum::long(1)), upper_bound: Some(Datum::long(1))}], + partitions: Some( + vec![FieldSummary { contains_null: false, contains_nan: Some(false), lower_bound: Some(Datum::long(1).to_bytes().unwrap()), upper_bound: Some(Datum::long(1).to_bytes().unwrap())}] + ), key_metadata: vec![], }] }.try_into().unwrap(); @@ -1351,7 +1201,9 @@ mod test { added_rows_count: Some(3), existing_rows_count: Some(0), deleted_rows_count: Some(0), - partitions: vec![FieldSummary { contains_null: false, contains_nan: Some(false), lower_bound: Some(Datum::long(1)), upper_bound: Some(Datum::long(1))}], + partitions: Some( + vec![FieldSummary { contains_null: false, contains_nan: Some(false), lower_bound: Some(Datum::long(1).to_bytes().unwrap()), upper_bound: Some(Datum::long(1).to_bytes().unwrap())}], + ), key_metadata: vec![], }] }; @@ -1369,20 +1221,8 @@ mod test { let bs = fs::read(path).unwrap(); - let partition_types = HashMap::from([( - 1, - StructType::new(vec![Arc::new(NestedField::required( - 1, - "test", - Type::Primitive(PrimitiveType::Long), - ))]), - )]); - let manifest_list = - ManifestList::parse_with_version(&bs, crate::spec::FormatVersion::V1, move |id| { - Ok(partition_types.get(&id).cloned()) - }) - .unwrap(); + ManifestList::parse_with_version(&bs, crate::spec::FormatVersion::V1).unwrap(); assert_eq!(manifest_list, expected_manifest_list); temp_dir.close().unwrap(); @@ -1407,7 +1247,9 @@ mod test { added_rows_count: Some(3), existing_rows_count: Some(0), deleted_rows_count: Some(0), - partitions: vec![FieldSummary { contains_null: false, contains_nan: Some(false), lower_bound: Some(Datum::long(1)), upper_bound: Some(Datum::long(1))}], + partitions: Some( + vec![FieldSummary { contains_null: false, contains_nan: Some(false), lower_bound: Some(Datum::long(1).to_bytes().unwrap()), upper_bound: Some(Datum::long(1).to_bytes().unwrap())}] + ), key_metadata: vec![], }] }; @@ -1424,19 +1266,8 @@ mod test { writer.close().await.unwrap(); let bs = fs::read(path).unwrap(); - let partition_types = HashMap::from([( - 1, - StructType::new(vec![Arc::new(NestedField::required( - 1, - "test", - Type::Primitive(PrimitiveType::Long), - ))]), - )]); let manifest_list = - ManifestList::parse_with_version(&bs, crate::spec::FormatVersion::V2, move |id| { - Ok(partition_types.get(&id).cloned()) - }) - .unwrap(); + ManifestList::parse_with_version(&bs, crate::spec::FormatVersion::V2).unwrap(); expected_manifest_list.entries[0].sequence_number = seq_num; expected_manifest_list.entries[0].min_sequence_number = seq_num; assert_eq!(manifest_list, expected_manifest_list); @@ -1461,7 +1292,9 @@ mod test { added_rows_count: Some(3), existing_rows_count: Some(0), deleted_rows_count: Some(0), - partitions: vec![FieldSummary { contains_null: false, contains_nan: Some(false), lower_bound: Some(Datum::long(1)), upper_bound: Some(Datum::long(1))}], + partitions: Some( + vec![FieldSummary { contains_null: false, contains_nan: Some(false), lower_bound: Some(Datum::long(1).to_bytes().unwrap()), upper_bound: Some(Datum::long(1).to_bytes().unwrap())}] + ), key_metadata: vec![], }] }; @@ -1479,20 +1312,8 @@ mod test { let bs = fs::read(path).unwrap(); - let partition_types = HashMap::from([( - 1, - StructType::new(vec![Arc::new(NestedField::required( - 1, - "test", - Type::Primitive(PrimitiveType::Long), - ))]), - )]); - let manifest_list = - ManifestList::parse_with_version(&bs, crate::spec::FormatVersion::V2, move |id| { - Ok(partition_types.get(&id).cloned()) - }) - .unwrap(); + ManifestList::parse_with_version(&bs, crate::spec::FormatVersion::V2).unwrap(); assert_eq!(manifest_list, expected_manifest_list); temp_dir.close().unwrap(); @@ -1518,15 +1339,9 @@ mod test { ); // deserializing both files to ManifestList struct let _manifest_list_1 = - ManifestList::parse_with_version(&bs_1, crate::spec::FormatVersion::V2, move |_id| { - Ok(Some(StructType::new(vec![]))) - }) - .unwrap(); + ManifestList::parse_with_version(&bs_1, crate::spec::FormatVersion::V2).unwrap(); let _manifest_list_2 = - ManifestList::parse_with_version(&bs_2, crate::spec::FormatVersion::V2, move |_id| { - Ok(Some(StructType::new(vec![]))) - }) - .unwrap(); + ManifestList::parse_with_version(&bs_2, crate::spec::FormatVersion::V2).unwrap(); } async fn read_avro_schema_fields_as_str(bs: Vec) -> String { diff --git a/crates/iceberg/src/spec/snapshot.rs b/crates/iceberg/src/spec/snapshot.rs index a2716ad97..6b8d3382a 100644 --- a/crates/iceberg/src/spec/snapshot.rs +++ b/crates/iceberg/src/spec/snapshot.rs @@ -29,7 +29,7 @@ use typed_builder::TypedBuilder; use super::table_metadata::SnapshotLog; use crate::error::{Result, timestamp_ms_to_utc}; use crate::io::FileIO; -use crate::spec::{ManifestList, SchemaId, SchemaRef, StructType, TableMetadata}; +use crate::spec::{ManifestList, SchemaId, SchemaRef, TableMetadata}; use crate::{Error, ErrorKind}; /// The ref name of the main branch of the table. @@ -190,20 +190,11 @@ impl Snapshot { table_metadata: &TableMetadata, ) -> Result { let manifest_list_content = file_io.new_input(&self.manifest_list)?.read().await?; - - let schema = self.schema(table_metadata)?; - - let partition_type_provider = |partition_spec_id: i32| -> Result> { - table_metadata - .partition_spec_by_id(partition_spec_id) - .map(|partition_spec| partition_spec.partition_type(&schema)) - .transpose() - }; - ManifestList::parse_with_version( &manifest_list_content, + // TODO: You don't really need the version since you could just project any Avro in + // the version that you'd like to get (probably always the latest) table_metadata.format_version(), - partition_type_provider, ) } diff --git a/crates/iceberg/src/spec/snapshot_summary.rs b/crates/iceberg/src/spec/snapshot_summary.rs index 556c5435e..25471ebcb 100644 --- a/crates/iceberg/src/spec/snapshot_summary.rs +++ b/crates/iceberg/src/spec/snapshot_summary.rs @@ -848,7 +848,7 @@ mod tests { added_rows_count: Some(100), existing_rows_count: Some(0), deleted_rows_count: Some(50), - partitions: Vec::new(), + partitions: Some(Vec::new()), key_metadata: Vec::new(), }; @@ -972,7 +972,7 @@ mod tests { added_rows_count: Some(5), existing_rows_count: Some(0), deleted_rows_count: Some(0), - partitions: Vec::new(), + partitions: Some(Vec::new()), key_metadata: Vec::new(), });