Skip to content

change identifier_field_ids to option type #1488

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 18 additions & 8 deletions crates/iceberg/src/spec/schema/_serde.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,10 +104,15 @@ impl From<Schema> for SchemaV2 {
fn from(value: Schema) -> Self {
SchemaV2 {
schema_id: value.schema_id,
identifier_field_ids: if value.identifier_field_ids.is_empty() {
None
} else {
Some(value.identifier_field_ids.into_iter().collect())
identifier_field_ids: match value.identifier_field_ids {
None => None,
Some(identifier_field_ids) => {
if identifier_field_ids.is_empty() {
None
} else {
Some(identifier_field_ids.into_iter().collect())
}
}
},
fields: value.r#struct,
}
Expand All @@ -118,10 +123,15 @@ impl From<Schema> for SchemaV1 {
fn from(value: Schema) -> Self {
SchemaV1 {
schema_id: Some(value.schema_id),
identifier_field_ids: if value.identifier_field_ids.is_empty() {
None
} else {
Some(value.identifier_field_ids.into_iter().collect())
identifier_field_ids: match value.identifier_field_ids {
None => None,
Some(identifier_field_ids) => {
if identifier_field_ids.is_empty() {
None
} else {
Some(identifier_field_ids.into_iter().collect())
}
}
},
fields: value.r#struct,
}
Expand Down
66 changes: 45 additions & 21 deletions crates/iceberg/src/spec/schema/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ pub struct Schema {
r#struct: StructType,
schema_id: SchemaId,
highest_field_id: i32,
identifier_field_ids: HashSet<i32>,
identifier_field_ids: Option<HashSet<i32>>,

alias_to_id: BiHashMap<String, i32>,
id_to_field: HashMap<i32, NestedFieldRef>,
Expand Down Expand Up @@ -86,7 +86,7 @@ pub struct SchemaBuilder {
schema_id: i32,
fields: Vec<NestedFieldRef>,
alias_to_id: BiHashMap<String, i32>,
identifier_field_ids: HashSet<i32>,
identifier_field_ids: Option<HashSet<i32>>,
reassign_field_ids_from: Option<i32>,
}

Expand Down Expand Up @@ -114,8 +114,14 @@ impl SchemaBuilder {

/// Set identifier field ids.
pub fn with_identifier_field_ids(mut self, ids: impl IntoIterator<Item = i32>) -> Self {
self.identifier_field_ids.extend(ids);
self
match self.identifier_field_ids {
Some(mut identifier_field_ids) => {
identifier_field_ids.extend(ids);
self.identifier_field_ids = Some(identifier_field_ids);
self
}
None => self,
}
}

/// Set alias to filed id mapping.
Expand All @@ -131,11 +137,14 @@ impl SchemaBuilder {
let r#struct = StructType::new(self.fields);
let id_to_field = index_by_id(&r#struct)?;

Self::validate_identifier_ids(
&r#struct,
&id_to_field,
self.identifier_field_ids.iter().copied(),
)?;
// Only validate identifier fields if they are set.
if let Some(identifier_field_ids) = &self.identifier_field_ids {
Self::validate_identifier_ids(
&r#struct,
&id_to_field,
identifier_field_ids.iter().copied(),
)?;
}

let (name_to_id, id_to_name) = {
let mut index = IndexByName::default();
Expand Down Expand Up @@ -168,16 +177,18 @@ impl SchemaBuilder {
if let Some(start_from) = self.reassign_field_ids_from {
let mut id_reassigner = ReassignFieldIds::new(start_from);
let new_fields = id_reassigner.reassign_field_ids(schema.r#struct.fields().to_vec())?;
let new_identifier_field_ids =
id_reassigner.apply_to_identifier_fields(schema.identifier_field_ids)?;
let new_alias_to_id = id_reassigner.apply_to_aliases(schema.alias_to_id.clone())?;

schema = Schema::builder()
let mut schema_builder = Schema::builder()
.with_schema_id(schema.schema_id)
.with_fields(new_fields)
.with_identifier_field_ids(new_identifier_field_ids)
.with_alias(new_alias_to_id)
.build()?;
.with_fields(new_fields);

if let Some(schema_identifier_field_ids) = schema.identifier_field_ids {
let new_identifier_field_ids =
id_reassigner.apply_to_identifier_fields(schema_identifier_field_ids)?;
schema_builder = schema_builder.with_identifier_field_ids(new_identifier_field_ids);
}
schema = schema_builder.with_alias(new_alias_to_id).build()?;
}

Ok(schema)
Expand Down Expand Up @@ -310,7 +321,7 @@ impl Schema {
SchemaBuilder {
schema_id: DEFAULT_SCHEMA_ID,
fields: vec![],
identifier_field_ids: HashSet::default(),
identifier_field_ids: None,
alias_to_id: BiHashMap::default(),
reassign_field_ids_from: None,
}
Expand Down Expand Up @@ -377,8 +388,10 @@ impl Schema {

/// Returns [`identifier_field_ids`].
#[inline]
pub fn identifier_field_ids(&self) -> impl ExactSizeIterator<Item = i32> + '_ {
self.identifier_field_ids.iter().copied()
pub fn identifier_field_ids(&self) -> Option<impl ExactSizeIterator<Item = i32> + '_> {
self.identifier_field_ids
.as_ref()
.map(|identifier_field_ids| identifier_field_ids.iter().copied())
}

/// Get field id by full name.
Expand All @@ -398,8 +411,19 @@ impl Schema {

/// Check if this schema is identical to another schema semantically - excluding schema id.
pub(crate) fn is_same_schema(&self, other: &SchemaRef) -> bool {
self.as_struct().eq(other.as_struct())
&& self.identifier_field_ids().eq(other.identifier_field_ids())
let same_struct = self.as_struct().eq(other.as_struct());

let same_identifier_field_ids =
match (self.identifier_field_ids(), other.identifier_field_ids()) {
(None, None) => true,
(None, Some(_)) => false,
(Some(_), None) => false,
(Some(identifier_field_ids), Some(other_identifier_field_ids)) => {
identifier_field_ids.eq(other_identifier_field_ids)
}
};

same_struct && same_identifier_field_ids
}

/// Change the schema id of this schema.
Expand Down
Loading