Skip to content

Commit 198e5c1

Browse files
Fokkokevinjqliu
andauthored
Add support for evolving a partition column (#1334)
## Which issue does this PR close? To check what happens if a partition column is evolved. ## What changes are included in this PR? <!-- Provide a summary of the modifications in this PR. List the main changes such as new features, bug fixes, refactoring, or any other updates. --> ## Are these changes tested? <!-- Specify what test covers (unit test, integration test, etc.). If tests are not included in your PR, please explain why (for example, are they covered by existing tests)? --> --------- Co-authored-by: Kevin Liu <kevinjqliu@users.noreply.github.com>
1 parent 6797654 commit 198e5c1

File tree

5 files changed

+118
-3
lines changed

5 files changed

+118
-3
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/iceberg/src/spec/values.rs

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -403,12 +403,26 @@ impl Datum {
403403
}
404404
}
405405
PrimitiveType::Int => PrimitiveLiteral::Int(i32::from_le_bytes(bytes.try_into()?)),
406-
PrimitiveType::Long => PrimitiveLiteral::Long(i64::from_le_bytes(bytes.try_into()?)),
406+
PrimitiveType::Long => {
407+
if bytes.len() == 4 {
408+
// In the case of an evolved field
409+
PrimitiveLiteral::Long(i32::from_le_bytes(bytes.try_into()?) as i64)
410+
} else {
411+
PrimitiveLiteral::Long(i64::from_le_bytes(bytes.try_into()?))
412+
}
413+
}
407414
PrimitiveType::Float => {
408415
PrimitiveLiteral::Float(OrderedFloat(f32::from_le_bytes(bytes.try_into()?)))
409416
}
410417
PrimitiveType::Double => {
411-
PrimitiveLiteral::Double(OrderedFloat(f64::from_le_bytes(bytes.try_into()?)))
418+
if bytes.len() == 4 {
419+
// In the case of an evolved field
420+
PrimitiveLiteral::Double(OrderedFloat(
421+
f32::from_le_bytes(bytes.try_into()?) as f64
422+
))
423+
} else {
424+
PrimitiveLiteral::Double(OrderedFloat(f64::from_le_bytes(bytes.try_into()?)))
425+
}
412426
}
413427
PrimitiveType::Date => PrimitiveLiteral::Int(i32::from_le_bytes(bytes.try_into()?)),
414428
PrimitiveType::Time => PrimitiveLiteral::Long(i64::from_le_bytes(bytes.try_into()?)),
@@ -3172,6 +3186,13 @@ mod tests {
31723186
check_avro_bytes_serde(bytes, Datum::long(32), &PrimitiveType::Long);
31733187
}
31743188

3189+
#[test]
3190+
fn avro_bytes_long_from_int() {
3191+
let bytes = vec![32u8, 0u8, 0u8, 0u8];
3192+
3193+
check_avro_bytes_serde(bytes, Datum::long(32), &PrimitiveType::Long);
3194+
}
3195+
31753196
#[test]
31763197
fn avro_bytes_float() {
31773198
let bytes = vec![0u8, 0u8, 128u8, 63u8];
@@ -3186,6 +3207,13 @@ mod tests {
31863207
check_avro_bytes_serde(bytes, Datum::double(1.0), &PrimitiveType::Double);
31873208
}
31883209

3210+
#[test]
3211+
fn avro_bytes_double_from_float() {
3212+
let bytes = vec![0u8, 0u8, 128u8, 63u8];
3213+
3214+
check_avro_bytes_serde(bytes, Datum::double(1.0), &PrimitiveType::Double);
3215+
}
3216+
31893217
#[test]
31903218
fn avro_bytes_string() {
31913219
let bytes = vec![105u8, 99u8, 101u8, 98u8, 101u8, 114u8, 103u8];

crates/integration_tests/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,3 +37,4 @@ iceberg_test_utils = { path = "../test_utils", features = ["tests"] }
3737
parquet = { workspace = true }
3838
tokio = { workspace = true }
3939
uuid = { workspace = true }
40+
ordered-float = "2.10.1"

crates/integration_tests/testdata/spark/provision.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,14 @@
120120
spark.sql("ALTER TABLE rest.default.test_promote_column ALTER COLUMN foo TYPE bigint")
121121
spark.sql("INSERT INTO rest.default.test_promote_column VALUES (25)")
122122

123+
# Create a table, and do some evolution on a partition column
124+
spark.sql("CREATE OR REPLACE TABLE rest.default.test_promote_partition_column (foo int, bar float, baz decimal(4, 2)) USING iceberg PARTITIONED BY (foo)")
125+
spark.sql("INSERT INTO rest.default.test_promote_partition_column VALUES (19, 19.25, 19.25)")
126+
spark.sql("ALTER TABLE rest.default.test_promote_partition_column ALTER COLUMN foo TYPE bigint")
127+
spark.sql("ALTER TABLE rest.default.test_promote_partition_column ALTER COLUMN bar TYPE double")
128+
spark.sql("ALTER TABLE rest.default.test_promote_partition_column ALTER COLUMN baz TYPE decimal(6, 2)")
129+
spark.sql("INSERT INTO rest.default.test_promote_partition_column VALUES (25, 22.25, 22.25)")
130+
123131
# Create a table with various types
124132
spark.sql("""
125133
CREATE OR REPLACE TABLE rest.default.types_test USING ICEBERG AS

crates/integration_tests/tests/shared_tests/read_evolved_schema.rs

Lines changed: 78 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,13 @@
1717

1818
//! Integration tests for rest catalog.
1919
20-
use arrow_array::{Int64Array, StringArray};
20+
use arrow_array::{Decimal128Array, Float64Array, Int64Array, StringArray};
2121
use futures::TryStreamExt;
2222
use iceberg::expr::Reference;
2323
use iceberg::spec::Datum;
2424
use iceberg::{Catalog, TableIdent};
2525
use iceberg_catalog_rest::RestCatalog;
26+
use ordered_float::OrderedFloat;
2627

2728
use crate::get_shared_containers;
2829

@@ -98,4 +99,80 @@ async fn test_evolved_schema() {
9899
actual.sort();
99100

100101
assert_eq!(actual, vec![19, 25]);
102+
103+
// Evolve a partitioned column
104+
let table = rest_catalog
105+
.load_table(&TableIdent::from_strs(["default", "test_promote_partition_column"]).unwrap())
106+
.await
107+
.unwrap();
108+
let scan = table.scan().build();
109+
let batch_stream = scan.unwrap().to_arrow().await.unwrap();
110+
111+
let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
112+
let mut actual_foo = vec![
113+
batches[0]
114+
.column_by_name("foo")
115+
.unwrap()
116+
.as_any()
117+
.downcast_ref::<Int64Array>()
118+
.unwrap()
119+
.value(0),
120+
batches[1]
121+
.column_by_name("foo")
122+
.unwrap()
123+
.as_any()
124+
.downcast_ref::<Int64Array>()
125+
.unwrap()
126+
.value(0),
127+
];
128+
129+
actual_foo.sort();
130+
131+
assert_eq!(actual_foo, vec![19, 25]);
132+
133+
let mut actual_bar = vec![
134+
OrderedFloat(
135+
batches[0]
136+
.column_by_name("bar")
137+
.unwrap()
138+
.as_any()
139+
.downcast_ref::<Float64Array>()
140+
.unwrap()
141+
.value(0),
142+
),
143+
OrderedFloat(
144+
batches[1]
145+
.column_by_name("bar")
146+
.unwrap()
147+
.as_any()
148+
.downcast_ref::<Float64Array>()
149+
.unwrap()
150+
.value(0),
151+
),
152+
];
153+
154+
actual_bar.sort();
155+
156+
assert_eq!(actual_bar, vec![19.25, 22.25]);
157+
158+
let mut actual_baz = vec![
159+
batches[0]
160+
.column_by_name("baz")
161+
.unwrap()
162+
.as_any()
163+
.downcast_ref::<Decimal128Array>()
164+
.unwrap()
165+
.value(0),
166+
batches[1]
167+
.column_by_name("baz")
168+
.unwrap()
169+
.as_any()
170+
.downcast_ref::<Decimal128Array>()
171+
.unwrap()
172+
.value(0),
173+
];
174+
175+
actual_baz.sort();
176+
177+
assert_eq!(actual_baz, vec![1925, 2225]);
101178
}

0 commit comments

Comments
 (0)