Skip to content

Commit 36cc120

Browse files
authored
feat(datafusion): support metadata tables for Datafusion (#879)
Use the newly added metadata tables #823 to support this feature in the DataFusion engine to demonstrate its usage. This may also potentially help design and stabilize the interface of metadata tables in the `iceberg` crate. --------- Signed-off-by: xxchan <xxchan22f@gmail.com>
1 parent 8edd708 commit 36cc120

File tree

15 files changed

+517
-86
lines changed

15 files changed

+517
-86
lines changed

Cargo.lock

Lines changed: 26 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@ serde_repr = "0.1.16"
107107
serde_with = "3.4"
108108
sqllogictest = "0.28"
109109
stacker = "0.1.20"
110+
strum = "0.27"
110111
tempfile = "3.18"
111112
tera = "1"
112113
thrift = "0.17.0"

crates/iceberg/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ bimap = { workspace = true }
6262
bytes = { workspace = true }
6363
chrono = { workspace = true }
6464
derive_builder = { workspace = true }
65+
expect-test = { workspace = true }
6566
fnv = { workspace = true }
6667
futures = { workspace = true }
6768
itertools = { workspace = true }
@@ -82,6 +83,7 @@ serde_derive = { workspace = true }
8283
serde_json = { workspace = true }
8384
serde_repr = { workspace = true }
8485
serde_with = { workspace = true }
86+
strum = { workspace = true, features = ["derive"] }
8587
thrift = { workspace = true }
8688
tokio = { workspace = true, optional = false, features = ["sync"] }
8789
typed-builder = { workspace = true }

crates/iceberg/src/inspect/manifests.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -281,9 +281,10 @@ impl<'a> ManifestsTable<'a> {
281281
#[cfg(test)]
282282
mod tests {
283283
use expect_test::expect;
284+
use futures::TryStreamExt;
284285

285-
use crate::inspect::metadata_table::tests::check_record_batches;
286286
use crate::scan::tests::TableTestFixture;
287+
use crate::test_utils::check_record_batches;
287288

288289
#[tokio::test]
289290
async fn test_manifests_table() {
@@ -293,7 +294,7 @@ mod tests {
293294
let record_batch = fixture.table.inspect().manifests().scan().await.unwrap();
294295

295296
check_record_batches(
296-
record_batch,
297+
record_batch.try_collect::<Vec<_>>().await.unwrap(),
297298
expect![[r#"
298299
Field { name: "content", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "14"} },
299300
Field { name: "path", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1"} },
@@ -378,6 +379,6 @@ mod tests {
378379
]"#]],
379380
&["path", "length"],
380381
Some("path"),
381-
).await;
382+
);
382383
}
383384
}

crates/iceberg/src/inspect/metadata_table.rs

Lines changed: 37 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,43 @@ use crate::table::Table;
2727
#[derive(Debug)]
2828
pub struct MetadataTable<'a>(&'a Table);
2929

30+
/// Metadata table type.
31+
#[derive(Debug, Clone, strum::EnumIter)]
32+
pub enum MetadataTableType {
33+
/// [`SnapshotsTable`]
34+
Snapshots,
35+
/// [`ManifestsTable`]
36+
Manifests,
37+
}
38+
39+
impl MetadataTableType {
40+
/// Returns the string representation of the metadata table type.
41+
pub fn as_str(&self) -> &str {
42+
match self {
43+
MetadataTableType::Snapshots => "snapshots",
44+
MetadataTableType::Manifests => "manifests",
45+
}
46+
}
47+
48+
/// Returns all the metadata table types.
49+
pub fn all_types() -> impl Iterator<Item = Self> {
50+
use strum::IntoEnumIterator;
51+
Self::iter()
52+
}
53+
}
54+
55+
impl TryFrom<&str> for MetadataTableType {
56+
type Error = String;
57+
58+
fn try_from(value: &str) -> std::result::Result<Self, String> {
59+
match value {
60+
"snapshots" => Ok(Self::Snapshots),
61+
"manifests" => Ok(Self::Manifests),
62+
_ => Err(format!("invalid metadata table type: {value}")),
63+
}
64+
}
65+
}
66+
3067
impl<'a> MetadataTable<'a> {
3168
/// Creates a new metadata scan.
3269
pub fn new(table: &'a Table) -> Self {
@@ -43,69 +80,3 @@ impl<'a> MetadataTable<'a> {
4380
ManifestsTable::new(self.0)
4481
}
4582
}
46-
47-
#[cfg(test)]
48-
pub mod tests {
49-
//! Sharable tests for the metadata table.
50-
51-
use expect_test::Expect;
52-
use futures::TryStreamExt;
53-
use itertools::Itertools;
54-
55-
use crate::scan::ArrowRecordBatchStream;
56-
57-
/// Snapshot testing to check the resulting record batch.
58-
///
59-
/// - `expected_schema/data`: put `expect![[""]]` as a placeholder,
60-
/// and then run test with `UPDATE_EXPECT=1 cargo test` to automatically update the result,
61-
/// or use rust-analyzer (see [video](https://github.com/rust-analyzer/expect-test)).
62-
/// Check the doc of [`expect_test`] for more details.
63-
/// - `ignore_check_columns`: Some columns are not stable, so we can skip them.
64-
/// - `sort_column`: The order of the data might be non-deterministic, so we can sort it by a column.
65-
pub async fn check_record_batches(
66-
batch_stream: ArrowRecordBatchStream,
67-
expected_schema: Expect,
68-
expected_data: Expect,
69-
ignore_check_columns: &[&str],
70-
sort_column: Option<&str>,
71-
) {
72-
let record_batches = batch_stream.try_collect::<Vec<_>>().await.unwrap();
73-
assert!(!record_batches.is_empty(), "Empty record batches");
74-
75-
// Combine record batches using the first batch's schema
76-
let first_batch = record_batches.first().unwrap();
77-
let record_batch =
78-
arrow_select::concat::concat_batches(&first_batch.schema(), &record_batches).unwrap();
79-
80-
let mut columns = record_batch.columns().to_vec();
81-
if let Some(sort_column) = sort_column {
82-
let column = record_batch.column_by_name(sort_column).unwrap();
83-
let indices = arrow_ord::sort::sort_to_indices(column, None, None).unwrap();
84-
columns = columns
85-
.iter()
86-
.map(|column| arrow_select::take::take(column.as_ref(), &indices, None).unwrap())
87-
.collect_vec();
88-
}
89-
90-
expected_schema.assert_eq(&format!(
91-
"{}",
92-
record_batch.schema().fields().iter().format(",\n")
93-
));
94-
expected_data.assert_eq(&format!(
95-
"{}",
96-
record_batch
97-
.schema()
98-
.fields()
99-
.iter()
100-
.zip_eq(columns)
101-
.map(|(field, column)| {
102-
if ignore_check_columns.contains(&field.name().as_str()) {
103-
format!("{}: (skipped)", field.name())
104-
} else {
105-
format!("{}: {:?}", field.name(), column)
106-
}
107-
})
108-
.format(",\n")
109-
));
110-
}
111-
}

crates/iceberg/src/inspect/snapshots.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -137,9 +137,10 @@ impl<'a> SnapshotsTable<'a> {
137137
#[cfg(test)]
138138
mod tests {
139139
use expect_test::expect;
140+
use futures::TryStreamExt;
140141

141-
use crate::inspect::metadata_table::tests::check_record_batches;
142142
use crate::scan::tests::TableTestFixture;
143+
use crate::test_utils::check_record_batches;
143144

144145
#[tokio::test]
145146
async fn test_snapshots_table() {
@@ -148,7 +149,7 @@ mod tests {
148149
let batch_stream = table.inspect().snapshots().scan().await.unwrap();
149150

150151
check_record_batches(
151-
batch_stream,
152+
batch_stream.try_collect::<Vec<_>>().await.unwrap(),
152153
expect![[r#"
153154
Field { name: "committed_at", data_type: Timestamp(Microsecond, Some("+00:00")), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1"} },
154155
Field { name: "snapshot_id", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "2"} },
@@ -211,6 +212,6 @@ mod tests {
211212
]"#]],
212213
&["manifest_list"],
213214
Some("committed_at"),
214-
).await;
215+
);
215216
}
216217
}

crates/iceberg/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ mod runtime;
8484

8585
pub mod arrow;
8686
pub(crate) mod delete_file_index;
87+
pub mod test_utils;
8788
mod utils;
8889
pub mod writer;
8990

crates/iceberg/src/test_utils.rs

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Test utilities.
19+
//! This module is pub just for internal testing.
20+
//! It is subject to change and is not intended to be used by external users.
21+
22+
use arrow_array::RecordBatch;
23+
use expect_test::Expect;
24+
use itertools::Itertools;
25+
26+
/// Snapshot testing to check the resulting record batch.
27+
///
28+
/// - `expected_schema/data`: put `expect![[""]]` as a placeholder,
29+
/// and then run test with `UPDATE_EXPECT=1 cargo test` to automatically update the result,
30+
/// or use rust-analyzer (see [video](https://github.com/rust-analyzer/expect-test)).
31+
/// Check the doc of [`expect_test`] for more details.
32+
/// - `ignore_check_columns`: Some columns are not stable, so we can skip them.
33+
/// - `sort_column`: The order of the data might be non-deterministic, so we can sort it by a column.
34+
pub fn check_record_batches(
35+
record_batches: Vec<RecordBatch>,
36+
expected_schema: Expect,
37+
expected_data: Expect,
38+
ignore_check_columns: &[&str],
39+
sort_column: Option<&str>,
40+
) {
41+
assert!(!record_batches.is_empty(), "Empty record batches");
42+
43+
// Combine record batches using the first batch's schema
44+
let first_batch = record_batches.first().unwrap();
45+
let record_batch =
46+
arrow_select::concat::concat_batches(&first_batch.schema(), &record_batches).unwrap();
47+
48+
let mut columns = record_batch.columns().to_vec();
49+
if let Some(sort_column) = sort_column {
50+
let column = record_batch.column_by_name(sort_column).unwrap();
51+
let indices = arrow_ord::sort::sort_to_indices(column, None, None).unwrap();
52+
columns = columns
53+
.iter()
54+
.map(|column| arrow_select::take::take(column.as_ref(), &indices, None).unwrap())
55+
.collect_vec();
56+
}
57+
58+
expected_schema.assert_eq(&format!(
59+
"{}",
60+
record_batch.schema().fields().iter().format(",\n")
61+
));
62+
expected_data.assert_eq(&format!(
63+
"{}",
64+
record_batch
65+
.schema()
66+
.fields()
67+
.iter()
68+
.zip_eq(columns)
69+
.map(|(field, column)| {
70+
if ignore_check_columns.contains(&field.name().as_str()) {
71+
format!("{}: (skipped)", field.name())
72+
} else {
73+
format!("{}: {:?}", field.name(), column)
74+
}
75+
})
76+
.format(",\n")
77+
));
78+
}

crates/integrations/datafusion/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ iceberg = { workspace = true }
3737
tokio = { workspace = true }
3838

3939
[dev-dependencies]
40+
expect-test = { workspace = true }
4041
iceberg-catalog-memory = { workspace = true }
4142
parquet = { workspace = true }
4243
tempfile = { workspace = true }

0 commit comments

Comments
 (0)