diff --git a/crates/iceberg/src/spec/schema/mod.rs b/crates/iceberg/src/spec/schema/mod.rs
index 23b0a4349b..0a15a0a082 100644
--- a/crates/iceberg/src/spec/schema/mod.rs
+++ b/crates/iceberg/src/spec/schema/mod.rs
@@ -34,7 +34,8 @@ use serde::{Deserialize, Serialize};
 
 use self::_serde::SchemaEnum;
 use self::id_reassigner::ReassignFieldIds;
-use self::index::{index_by_id, index_parents, IndexByName};
+pub use self::index::index_parents;
+use self::index::{index_by_id, IndexByName};
 pub use self::prune_columns::prune_columns;
 use super::NestedField;
 use crate::error::Result;
@@ -376,6 +377,14 @@ impl Schema {
         self.identifier_field_ids.iter().copied()
     }
 
+    /// Returns the names of the identifier fields; ids missing from `id_to_name` are skipped.
+    pub fn identifier_field_names(&self) -> HashSet<String> {
+        self.identifier_field_ids
+            .iter()
+            .filter_map(|id| self.id_to_name.get(id).cloned())
+            .collect()
+    }
+
     /// Get field id by full name.
     pub fn field_id_by_name(&self, name: &str) -> Option<i32> {
         self.name_to_id.get(name).copied()
diff --git a/crates/iceberg/src/transaction/mod.rs b/crates/iceberg/src/transaction/mod.rs
index 108ad10595..aba29d4dd1 100644
--- a/crates/iceberg/src/transaction/mod.rs
+++ b/crates/iceberg/src/transaction/mod.rs
@@ -18,6 +18,7 @@
 //! This module contains transaction api.
 
mod append; +mod schema_update; mod snapshot; mod sort_order; @@ -32,6 +33,7 @@ use crate::error::Result; use crate::spec::FormatVersion; use crate::table::Table; use crate::transaction::append::FastAppendAction; +use crate::transaction::schema_update::SchemaUpdateAction; use crate::transaction::sort_order::ReplaceSortOrderAction; use crate::TableUpdate::UpgradeFormatVersion; use crate::{Catalog, Error, ErrorKind, TableCommit, TableRequirement, TableUpdate}; @@ -150,6 +152,16 @@ impl<'a> Transaction<'a> { snapshot_id } + /// Get base table + pub fn current(&self) -> &Table { + &self.current_table + } + + /// Update table's schema + pub fn update_schema(self) -> SchemaUpdateAction<'a> { + SchemaUpdateAction::new(self) + } + /// Creates a fast append action. pub fn fast_append( self, diff --git a/crates/iceberg/src/transaction/schema_update.rs b/crates/iceberg/src/transaction/schema_update.rs new file mode 100644 index 0000000000..b212f531dc --- /dev/null +++ b/crates/iceberg/src/transaction/schema_update.rs @@ -0,0 +1,125 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use std::collections::{HashMap, HashSet}; + +use crate::spec::{index_parents, PrimitiveType, Schema, TableMetadataRef, Type}; +use crate::transaction::{Arc, Transaction}; +use crate::TableCommit; + +pub struct SchemaUpdateAction<'a> { + tx: Transaction<'a>, + base: TableMetadataRef, + schema: Arc, + last_column_id: i32, + id_to_parent: HashMap, + identifier_field_names: HashSet, +} + +impl<'a> SchemaUpdateAction<'a> { + pub fn new(tx: Transaction<'a>) -> Self { + let base = tx.current().metadata_ref(); + let schema = base.schemas.get(&base.current_schema_id).unwrap().clone(); + let last_column_id = base.last_column_id; + let id_to_parent = index_parents(schema.as_struct()).unwrap(); + let identifier_field_names = schema.identifier_field_names(); + Self { + tx, + base, + schema, + last_column_id, + id_to_parent, + identifier_field_names, + } + } + + pub fn add_column(self, parent: String, name: String, r#type: Type, doc: String) -> Self { + self + } + + pub fn add_required_column( + self, + parent: String, + name: String, + r#type: Type, + doc: String, + ) -> Self { + self + } + + pub fn delete_column(self, name: String) -> Self { + self + } + + pub fn rename_column(self, name: String, new_name: String) -> Self { + self + } + + pub fn require_column(self, name: String) -> Self { + self + } + + pub fn make_column_optional(self, name: String) -> Self { + self + } + + pub fn update_column(self, name: String, new_type: PrimitiveType) -> Self { + self + } + + pub fn update_column_doc(self, name: String, doc: String) -> Self { + self + } + + pub fn move_first(self, name: String) -> Self { + self + } + + pub fn move_before(self, name: String, before_name: String) -> Self { + self + } + + pub fn move_after(self, name: String, after_name: String) -> Self { + self + } + + pub fn union_by_name_with(self, new_schema: Schema) -> Self { + self + } + + pub fn set_identifier_fields(self, identifier_field_names: Vec) -> Self { + self + } + + pub fn case_sensitive(self, 
case_sensitivity: bool) -> Self { + self + } + + pub fn apply(self) -> Schema { + Schema::builder().build().unwrap() + } + + pub fn commit(self) -> () { + // TODO apply changes to metadata first + + // TODO fix this + // let table_commit = TableCommit::builder() + // .ident(self.tx.current().identifier().clone()) + // .updates(self.updates) + // .requirements(self.requirements) + } +}