Skip to content

Commit 2bc5dca

Browse files
authored
feat(iceberg): introduce remove schemas (#1115)
## Which issue does this PR close? <!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123. --> ## What changes are included in this PR? This PR introduces the capability to remove schemas to enable expire snapshot action operations. https://github.com/apache/iceberg/blob/456bbe98b0b0982278a61af4c44d32e1c27417e2/core/src/main/java/org/apache/iceberg/TableMetadata.java#L1177 https://github.com/apache/iceberg/blob/456bbe98b0b0982278a61af4c44d32e1c27417e2/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java#L249 refer to risingwavelabs#20 <!-- Provide a summary of the modifications in this PR. List the main changes such as new features, bug fixes, refactoring, or any other updates. --> ## Are these changes tested? - UT <!-- Specify what test covers (unit test, integration test, etc.). If tests are not included in your PR, please explain why (for example, are they covered by existing tests)? -->
1 parent a341122 commit 2bc5dca

File tree

2 files changed

+103
-0
lines changed

2 files changed

+103
-0
lines changed

crates/iceberg/src/catalog/mod.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -483,6 +483,12 @@ pub enum TableUpdate {
483483
/// Snapshot id to remove partition statistics for.
484484
snapshot_id: i64,
485485
},
486+
/// Remove schemas
487+
#[serde(rename_all = "kebab-case")]
488+
RemoveSchemas {
489+
/// Schema IDs to remove.
490+
schema_ids: Vec<i32>,
491+
},
486492
}
487493

488494
impl TableUpdate {
@@ -526,6 +532,7 @@ impl TableUpdate {
526532
TableUpdate::RemovePartitionStatistics { snapshot_id } => {
527533
Ok(builder.remove_partition_statistics(snapshot_id))
528534
}
535+
TableUpdate::RemoveSchemas { schema_ids } => builder.remove_schemas(&schema_ids),
529536
}
530537
}
531538
}
@@ -2049,4 +2056,19 @@ mod tests {
20492056
},
20502057
)
20512058
}
2059+
2060+
#[test]
2061+
fn test_remove_schema_update() {
2062+
test_serde_json(
2063+
r#"
2064+
{
2065+
"action": "remove-schemas",
2066+
"schema-ids": [1, 2]
2067+
}
2068+
"#,
2069+
TableUpdate::RemoveSchemas {
2070+
schema_ids: vec![1, 2],
2071+
},
2072+
);
2073+
}
20522074
}

crates/iceberg/src/spec/table_metadata_builder.rs

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1210,6 +1210,37 @@ impl TableMetadataBuilder {
12101210
fn highest_sort_order_id(&self) -> Option<i64> {
12111211
self.metadata.sort_orders.keys().max().copied()
12121212
}
1213+
1214+
/// Remove schemas by their ids from the table metadata.
1215+
/// Does nothing if a schema id is not present. Active schemas should not be removed.
1216+
pub fn remove_schemas(mut self, schema_id_to_remove: &[i32]) -> Result<Self> {
1217+
if schema_id_to_remove.contains(&self.metadata.current_schema_id) {
1218+
return Err(Error::new(
1219+
ErrorKind::DataInvalid,
1220+
"Cannot remove current schema",
1221+
));
1222+
}
1223+
1224+
if schema_id_to_remove.is_empty() {
1225+
return Ok(self);
1226+
}
1227+
1228+
let mut removed_schemas = Vec::with_capacity(schema_id_to_remove.len());
1229+
self.metadata.schemas.retain(|id, _schema| {
1230+
if schema_id_to_remove.contains(id) {
1231+
removed_schemas.push(*id);
1232+
false
1233+
} else {
1234+
true
1235+
}
1236+
});
1237+
1238+
self.changes.push(TableUpdate::RemoveSchemas {
1239+
schema_ids: removed_schemas,
1240+
});
1241+
1242+
Ok(self)
1243+
}
12131244
}
12141245

12151246
impl From<TableMetadataBuildResult> for TableMetadata {
@@ -2412,4 +2443,54 @@ mod tests {
24122443
table.metadata().current_snapshot_id().unwrap()
24132444
);
24142445
}
2446+
2447+
#[test]
2448+
fn test_active_schema_cannot_be_removed() {
2449+
let builder = builder_without_changes(FormatVersion::V2);
2450+
builder.remove_schemas(&[0]).unwrap_err();
2451+
}
2452+
2453+
#[test]
2454+
fn test_remove_schemas() {
2455+
let file = File::open(format!(
2456+
"{}/testdata/table_metadata/{}",
2457+
env!("CARGO_MANIFEST_DIR"),
2458+
"TableMetadataV2Valid.json"
2459+
))
2460+
.unwrap();
2461+
let reader = BufReader::new(file);
2462+
let resp = serde_json::from_reader::<_, TableMetadata>(reader).unwrap();
2463+
2464+
let table = Table::builder()
2465+
.metadata(resp)
2466+
.metadata_location("s3://bucket/test/location/metadata/v1.json".to_string())
2467+
.identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
2468+
.file_io(FileIOBuilder::new("memory").build().unwrap())
2469+
.build()
2470+
.unwrap();
2471+
2472+
assert_eq!(2, table.metadata().schemas.len());
2473+
2474+
{
2475+
// can not remove active schema
2476+
let meta_data_builder = table.metadata().clone().into_builder(None);
2477+
meta_data_builder.remove_schemas(&[1]).unwrap_err();
2478+
}
2479+
2480+
let mut meta_data_builder = table.metadata().clone().into_builder(None);
2481+
meta_data_builder = meta_data_builder.remove_schemas(&[0]).unwrap();
2482+
let build_result = meta_data_builder.build().unwrap();
2483+
assert_eq!(1, build_result.metadata.schemas.len());
2484+
assert_eq!(1, build_result.metadata.current_schema_id);
2485+
assert_eq!(1, build_result.metadata.current_schema().schema_id());
2486+
assert_eq!(1, build_result.changes.len());
2487+
2488+
let remove_schema_ids =
2489+
if let TableUpdate::RemoveSchemas { schema_ids } = &build_result.changes[0] {
2490+
schema_ids
2491+
} else {
2492+
unreachable!("Expected RemoveSchema change")
2493+
};
2494+
assert_eq!(remove_schema_ids, &[0]);
2495+
}
24152496
}

0 commit comments

Comments
 (0)