Commit 24b02e3

Merge branch 'main' of github.com:apache/iceberg-rust into fd-avro-pyiceberg

2 parents: 9bc9baf + c79eb1b

File tree

13 files changed (+2113, -496 lines)

bindings/python/Cargo.lock

Lines changed: 1732 additions & 453 deletions
(Generated file; diff not rendered by default.)

bindings/python/Cargo.toml

Lines changed: 3 additions & 0 deletions
@@ -35,3 +35,6 @@ arrow = { version = "55", features = ["pyarrow", "chrono-tz"] }
 iceberg = { path = "../../crates/iceberg" }
 pyo3 = { version = "0.24", features = ["extension-module", "abi3-py39"] }
 serde_json = "1.0.138"
+iceberg-datafusion = { path = "../../crates/integrations/datafusion" }
+datafusion-ffi = { version = "47" }
+tokio = { version = "1.44", default-features = false }

bindings/python/pyproject.toml

Lines changed: 1 addition & 1 deletion
@@ -44,7 +44,7 @@ python-source = "python"
 ignore = ["F403", "F405"]
 
 [tool.hatch.envs.dev]
-dependencies = ["maturin>=1.0,<2.0", "pytest>=8.3.2", "pyarrow>=17.0.0"]
+dependencies = ["maturin>=1.0,<2.0", "pytest>=8.3.2", "pyarrow>=17.0.0", "datafusion>=45", "pyiceberg[sql-sqlite]>=0.9.1"]
 
 [tool.hatch.envs.dev.scripts]
 build = "maturin build --out dist --sdist"

bindings/python/src/datafusion_table_provider.rs

Lines changed: 107 additions & 0 deletions

@@ -0,0 +1,107 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+use std::ffi::CString;
+use std::sync::Arc;
+
+use datafusion_ffi::table_provider::FFI_TableProvider;
+use iceberg::io::FileIO;
+use iceberg::table::StaticTable;
+use iceberg::TableIdent;
+use iceberg_datafusion::table::IcebergTableProvider;
+use pyo3::exceptions::PyRuntimeError;
+use pyo3::prelude::*;
+use pyo3::types::PyCapsule;
+
+use crate::runtime::runtime;
+
+#[pyclass(name = "IcebergDataFusionTable")]
+pub struct PyIcebergDataFusionTable {
+    inner: Arc<IcebergTableProvider>,
+}
+
+#[pymethods]
+impl PyIcebergDataFusionTable {
+    #[new]
+    fn new(
+        identifier: Vec<String>,
+        metadata_location: String,
+        file_io_properties: Option<HashMap<String, String>>,
+    ) -> PyResult<Self> {
+        let runtime = runtime();
+
+        let provider = runtime.block_on(async {
+            let table_ident = TableIdent::from_strs(identifier)
+                .map_err(|e| PyRuntimeError::new_err(format!("Invalid table identifier: {e}")))?;
+
+            let mut builder = FileIO::from_path(&metadata_location)
+                .map_err(|e| PyRuntimeError::new_err(format!("Failed to init FileIO: {e}")))?;
+
+            if let Some(props) = file_io_properties {
+                builder = builder.with_props(props);
+            }
+
+            let file_io = builder
+                .build()
+                .map_err(|e| PyRuntimeError::new_err(format!("Failed to build FileIO: {e}")))?;
+
+            let static_table =
+                StaticTable::from_metadata_file(&metadata_location, table_ident, file_io)
+                    .await
+                    .map_err(|e| {
+                        PyRuntimeError::new_err(format!("Failed to load static table: {e}"))
+                    })?;
+
+            let table = static_table.into_table();
+
+            IcebergTableProvider::try_new_from_table(table)
+                .await
+                .map_err(|e| {
+                    PyRuntimeError::new_err(format!("Failed to create table provider: {e}"))
+                })
+        })?;
+
+        Ok(Self {
+            inner: Arc::new(provider),
+        })
+    }
+
+    fn __datafusion_table_provider__<'py>(
+        &self,
+        py: Python<'py>,
+    ) -> PyResult<Bound<'py, PyCapsule>> {
+        let capsule_name = CString::new("datafusion_table_provider").unwrap();
+
+        let ffi_provider = FFI_TableProvider::new(self.inner.clone(), false, Some(runtime()));
+
+        PyCapsule::new(py, ffi_provider, Some(capsule_name))
+    }
+}
+
+pub fn register_module(py: Python<'_>, m: &Bound<'_, PyModule>) -> PyResult<()> {
+    let this = PyModule::new(py, "datafusion")?;
+
+    this.add_class::<PyIcebergDataFusionTable>()?;
+
+    m.add_submodule(&this)?;
+    py.import("sys")?
+        .getattr("modules")?
+        .set_item("pyiceberg_core.datafusion", this)?;
+
+    Ok(())
+}
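
Note: the class above implements the PyCapsule-based table-provider
protocol that datafusion-python looks for, so instances can be handed
straight to a SessionContext. A minimal usage sketch (the identifier and
metadata path below are placeholders, not values from this commit):

    from datafusion import SessionContext
    from pyiceberg_core.datafusion import IcebergDataFusionTable

    # Placeholders: point these at an existing Iceberg table's metadata file.
    provider = IcebergDataFusionTable(
        identifier=["default", "dataset"],
        metadata_location="/path/to/warehouse/default.db/dataset/metadata/v1.metadata.json",
        file_io_properties=None,
    )

    ctx = SessionContext()
    # register_table_provider consumes the capsule returned by
    # __datafusion_table_provider__ on the wrapped object.
    ctx.register_table_provider("dataset", provider)
    print(ctx.table("dataset").to_arrow_table())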

bindings/python/src/lib.rs

Lines changed: 3 additions & 0 deletions
@@ -17,12 +17,15 @@
 
 use pyo3::prelude::*;
 
+mod datafusion_table_provider;
 mod error;
+mod runtime;
 mod transform;
 mod manifest;
 
 #[pymodule]
 fn pyiceberg_core_rust(py: Python<'_>, m: &Bound<'_, PyModule>) -> PyResult<()> {
+    datafusion_table_provider::register_module(py, m)?;
     transform::register_module(py, m)?;
     manifest::register_module(py, m)?;
     Ok(())

bindings/python/src/runtime.rs

Lines changed: 32 additions & 0 deletions
@@ -0,0 +1,32 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::OnceLock;
+
+use tokio::runtime::{Handle, Runtime};
+
+static RUNTIME: OnceLock<Runtime> = OnceLock::new();
+
+pub fn runtime() -> Handle {
+    match Handle::try_current() {
+        Ok(h) => h.clone(),
+        _ => {
+            let rt = RUNTIME.get_or_init(|| Runtime::new().unwrap());
+            rt.handle().clone()
+        }
+    }
+}
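
Note: runtime() prefers the ambient Tokio runtime when the caller is
already inside one and otherwise lazily creates a single process-wide
runtime, which lets the bindings block on async iceberg-rust calls from
plain synchronous Python. A rough Python analogue of the same
lazy-singleton fallback, for illustration only (not part of this commit):

    import asyncio
    from functools import lru_cache

    @lru_cache(maxsize=1)
    def _global_loop() -> asyncio.AbstractEventLoop:
        # Created once on first use, then reused -- like OnceLock::get_or_init.
        return asyncio.new_event_loop()

    def runtime() -> asyncio.AbstractEventLoop:
        try:
            # Prefer the currently running loop, like Handle::try_current().
            return asyncio.get_running_loop()
        except RuntimeError:
            return _global_loop()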

New Python test file

Lines changed: 160 additions & 0 deletions

@@ -0,0 +1,160 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+from datetime import date, datetime
+import uuid
+import pytest
+from pyiceberg_core.datafusion import IcebergDataFusionTable
+from datafusion import SessionContext
+from pyiceberg.catalog import Catalog, load_catalog
+import pyarrow as pa
+from pathlib import Path
+import datafusion
+
+assert (
+    datafusion.__version__ >= "45"
+)  # iceberg table provider only works for datafusion >= 45
+
+
+@pytest.fixture(scope="session")
+def warehouse(tmp_path_factory: pytest.TempPathFactory) -> Path:
+    return tmp_path_factory.mktemp("warehouse")
+
+
+@pytest.fixture(scope="session")
+def catalog(warehouse: Path) -> Catalog:
+    catalog = load_catalog(
+        "default",
+        **{
+            "uri": f"sqlite:///{warehouse}/pyiceberg_catalog.db",
+            "warehouse": f"file://{warehouse}",
+        },
+    )
+    return catalog
+
+
+@pytest.fixture(scope="session")
+def arrow_table_with_null() -> "pa.Table":
+    """Pyarrow table with all kinds of columns."""
+    import pyarrow as pa
+
+    return pa.Table.from_pydict(
+        {
+            "bool": [False, None, True],
+            "string": ["a", None, "z"],
+            # Go over the 16 bytes to kick in truncation
+            "string_long": ["a" * 22, None, "z" * 22],
+            "int": [1, None, 9],
+            "long": [1, None, 9],
+            "float": [0.0, None, 0.9],
+            "double": [0.0, None, 0.9],
+            # 'time': [1_000_000, None, 3_000_000], # Example times: 1s, none, and 3s past midnight # Spark does not support time fields
+            "timestamp": [
+                datetime(2023, 1, 1, 19, 25, 00),
+                None,
+                datetime(2023, 3, 1, 19, 25, 00),
+            ],
+            # "timestamptz": [
+            #     datetime(2023, 1, 1, 19, 25, 00, tzinfo=timezone.utc),
+            #     None,
+            #     datetime(2023, 3, 1, 19, 25, 00, tzinfo=timezone.utc),
+            # ],
+            "date": [date(2023, 1, 1), None, date(2023, 3, 1)],
+            # Not supported by Spark
+            # 'time': [time(1, 22, 0), None, time(19, 25, 0)],
+            # Not natively supported by Arrow
+            # 'uuid': [uuid.UUID('00000000-0000-0000-0000-000000000000').bytes, None, uuid.UUID('11111111-1111-1111-1111-111111111111').bytes],
+            "binary": [b"\01", None, b"\22"],
+            "fixed": [
+                uuid.UUID("00000000-0000-0000-0000-000000000000").bytes,
+                None,
+                uuid.UUID("11111111-1111-1111-1111-111111111111").bytes,
+            ],
+        },
+    )
+
+
+def test_register_iceberg_table_provider(
+    catalog: Catalog, arrow_table_with_null: pa.Table
+) -> None:
+    catalog.create_namespace_if_not_exists("default")
+    iceberg_table = catalog.create_table_if_not_exists(
+        "default.dataset",
+        schema=arrow_table_with_null.schema,
+    )
+    iceberg_table.append(arrow_table_with_null)
+
+    iceberg_table_provider = IcebergDataFusionTable(
+        identifier=iceberg_table.name(),
+        metadata_location=iceberg_table.metadata_location,
+        file_io_properties=iceberg_table.io.properties,
+    )
+
+    ctx = SessionContext()
+    ctx.register_table_provider("test", iceberg_table_provider)
+
+    datafusion_table = ctx.table("test")
+    assert datafusion_table is not None
+
+    # check that the schema is the same
+    from pyiceberg.io.pyarrow import _pyarrow_schema_ensure_small_types
+
+    assert _pyarrow_schema_ensure_small_types(
+        datafusion_table.schema()
+    ) == _pyarrow_schema_ensure_small_types(iceberg_table.schema().as_arrow())
+    # large/small type mismatches, fixed in pyiceberg 0.10.0
+    # assert datafusion_table.schema() == iceberg_table.schema().as_arrow()
+
+    # check that the data is the same
+    assert (
+        datafusion_table.to_arrow_table().to_pylist()
+        == iceberg_table.scan().to_arrow().to_pylist()
+    )
+    # large/small type mismatches, fixed in pyiceberg 0.10.0
+    # assert datafusion_table.to_arrow_table() == iceberg_table.scan().to_arrow()
+
+
+def test_register_pyiceberg_table(
+    catalog: Catalog, arrow_table_with_null: pa.Table
+) -> None:
+    from types import MethodType
+
+    catalog.create_namespace_if_not_exists("default")
+    iceberg_table = catalog.create_table_if_not_exists(
+        "default.dataset",
+        schema=arrow_table_with_null.schema,
+    )
+    iceberg_table.append(arrow_table_with_null)
+
+    # monkey patch the __datafusion_table_provider__ method to the iceberg table
+    def __datafusion_table_provider__(self):
+        return IcebergDataFusionTable(
+            identifier=self.name(),
+            metadata_location=self.metadata_location,
+            file_io_properties=self.io.properties,
+        ).__datafusion_table_provider__()
+
+    iceberg_table.__datafusion_table_provider__ = MethodType(
+        __datafusion_table_provider__, iceberg_table
+    )
+
+    ctx = SessionContext()
+    ctx.register_table_provider("test", iceberg_table)
+
+    datafusion_table = ctx.table("test")
+    assert datafusion_table is not None
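
Note: the second test passes because register_table_provider only
requires an object exposing __datafusion_table_provider__. The
monkey-patching can equally be written as a small adapter class; the
sketch below assumes ctx and iceberg_table are set up as in the tests
above:

    class IcebergTableAdapter:
        """Wraps a pyiceberg Table so DataFusion can register it directly."""

        def __init__(self, table):
            self._table = table

        def __datafusion_table_provider__(self):
            return IcebergDataFusionTable(
                identifier=self._table.name(),
                metadata_location=self._table.metadata_location,
                file_io_properties=self._table.io.properties,
            ).__datafusion_table_provider__()

    ctx.register_table_provider("dataset", IcebergTableAdapter(iceberg_table))
    assert ctx.table("dataset") is not None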

crates/catalog/glue/src/catalog.rs

Lines changed: 9 additions & 7 deletions
(Note: in the first four hunks below, the removed and added doc-comment
lines are textually identical; the changes are whitespace-only.)

@@ -172,7 +172,7 @@ impl Catalog for GlueCatalog {
     ///
     /// - Errors from `validate_namespace` if the namespace identifier does not
     /// meet validation criteria.
-    /// - Errors from `convert_to_database` if the properties cannot be
+    /// - Errors from `convert_to_database` if the properties cannot be
     /// successfully converted into a database configuration.
     /// - Errors from the underlying database creation process, converted using
     /// `from_sdk_error`.

@@ -259,7 +259,7 @@ impl Catalog for GlueCatalog {
     /// Asynchronously updates properties of an existing namespace.
     ///
     /// Converts the given namespace identifier and properties into a database
-    /// representation and then attempts to update the corresponding namespace
+    /// representation and then attempts to update the corresponding namespace
     /// in the Glue Catalog.
     ///
     /// # Returns

@@ -295,7 +295,7 @@ impl Catalog for GlueCatalog {
     /// # Returns
     /// A `Result<()>` indicating the outcome:
     /// - `Ok(())` signifies successful namespace deletion.
-    /// - `Err(...)` signifies failure to drop the namespace due to validation
+    /// - `Err(...)` signifies failure to drop the namespace due to validation
     /// errors, connectivity issues, or Glue Catalog constraints.
     async fn drop_namespace(&self, namespace: &NamespaceIdent) -> Result<()> {
         let db_name = validate_namespace(namespace)?;

@@ -322,7 +322,7 @@ impl Catalog for GlueCatalog {
     /// A `Result<Vec<TableIdent>>`, which is:
     /// - `Ok(vec![...])` containing a vector of `TableIdent` instances, each
     /// representing a table within the specified namespace.
-    /// - `Err(...)` if an error occurs during namespace validation or while
+    /// - `Err(...)` if an error occurs during namespace validation or while
     /// querying the database.
     async fn list_tables(&self, namespace: &NamespaceIdent) -> Result<Vec<TableIdent>> {
         let db_name = validate_namespace(namespace)?;

@@ -375,7 +375,7 @@ impl Catalog for GlueCatalog {
     async fn create_table(
         &self,
         namespace: &NamespaceIdent,
-        creation: TableCreation,
+        mut creation: TableCreation,
     ) -> Result<Table> {
         let db_name = validate_namespace(namespace)?;
         let table_name = creation.name.clone();

@@ -384,10 +384,12 @@ impl Catalog for GlueCatalog {
             Some(location) => location.clone(),
             None => {
                 let ns = self.get_namespace(namespace).await?;
-                get_default_table_location(&ns, &db_name, &table_name, &self.config.warehouse)
+                let location =
+                    get_default_table_location(&ns, &db_name, &table_name, &self.config.warehouse);
+                creation.location = Some(location.clone());
+                location
             }
         };
-
         let metadata = TableMetadataBuilder::from_table_creation(creation)?
             .build()?
             .metadata;
