diff --git a/Cargo.toml b/Cargo.toml index cae0c85cb..a419103d4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,6 +18,7 @@ [workspace] exclude = ["bindings/python"] members = [ + "crates/benchmarks", "crates/catalog/*", "crates/examples", "crates/iceberg", @@ -76,6 +77,7 @@ http = "1.2" iceberg = { version = "0.5.1", path = "./crates/iceberg" } iceberg-catalog-memory = { version = "0.5.1", path = "./crates/catalog/memory" } iceberg-catalog-rest = { version = "0.5.1", path = "./crates/catalog/rest" } +iceberg-catalog-sql = { version = "0.5.1", path = "./crates/catalog/sql" } iceberg-datafusion = { version = "0.5.1", path = "./crates/integrations/datafusion" } indicatif = "0.17" itertools = "0.13" diff --git a/crates/benchmarks/.gitignore b/crates/benchmarks/.gitignore new file mode 100644 index 000000000..88d0f7e05 --- /dev/null +++ b/crates/benchmarks/.gitignore @@ -0,0 +1,18 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +src/construction_scripts/__pycache__/ diff --git a/crates/benchmarks/Cargo.toml b/crates/benchmarks/Cargo.toml new file mode 100644 index 000000000..2702e7246 --- /dev/null +++ b/crates/benchmarks/Cargo.toml @@ -0,0 +1,51 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +[package] +name = "benchmarks" +edition.workspace = true +homepage.workspace = true +version.workspace = true +license.workspace = true +repository.workspace = true +rust-version.workspace = true + +[dependencies] +iceberg = { workspace = true } +iceberg-catalog-sql = { workspace = true } +rand = { workspace = true } +walkdir = "2.5.0" +tokio = { features = ["rt-multi-thread"], workspace = true } +bytes = { workspace = true } +futures = { workspace = true } +arrow-array.workspace = true +sqlx = { version = "0.8.1", features = ["any", "runtime-tokio", "sqlite"] } + +[dev-dependencies] +criterion = { version = "0.6.0", features = ["async_tokio"] } + +[[bench]] +name = "sql-catalog-projection-once" +harness = false + +[lints.rust] +# so the #[cfg(benchmarking)] tag doesn't create warnings +unexpected_cfgs = { level = "warn", check-cfg = ['cfg(benchmarking)'] } + +# this is done so we can get sqlite feature support without a warning +[package.metadata.cargo-machete] +ignored = ["sqlx"] diff --git a/crates/benchmarks/README.md b/crates/benchmarks/README.md new file mode 100644 index 000000000..624ad115b --- /dev/null +++ b/crates/benchmarks/README.md @@ -0,0 +1,26 @@ + + +Where `iceberg-rust` benchmarks are kept. + +To run the benchmarks, we must pass the `benchmarking` rustflag so that the benchmarking storage is compiled and we can use a storage with a predictable and reliable latency. + +This command includes it: `RUSTFLAGS="--cfg benchmarking" cargo bench` + +## Existing benchmarks + +- `sql-catalog-projection-once`: Benchmarks scanning a table (we use a month of NYC taxicab data) and projecting two columns. 
diff --git a/crates/benchmarks/benches/sql-catalog-projection-once.rs b/crates/benchmarks/benches/sql-catalog-projection-once.rs new file mode 100644 index 000000000..8ee3c0c71 --- /dev/null +++ b/crates/benchmarks/benches/sql-catalog-projection-once.rs @@ -0,0 +1,112 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+
+use std::hint::black_box;
+use std::path::PathBuf;
+use std::time::{Duration, Instant};
+
+use arrow_array::RecordBatch;
+use benchmarks::{construct_table, copy_dir_to_fileio};
+use criterion::{Criterion, criterion_group, criterion_main};
+use futures::TryStreamExt;
+use iceberg::io::FileIOBuilder;
+use iceberg::table::Table;
+use iceberg::{Catalog, TableIdent};
+use iceberg_catalog_sql::{SqlBindStyle, SqlCatalog, SqlCatalogConfig};
+use tokio::runtime::Runtime;
+
+#[allow(unused_variables)]
+pub fn bench_sql_catalog_projection_once(c: &mut Criterion) {
+    #[cfg(not(benchmarking))]
+    {
+        eprintln!(r#"benchmarking flag not enabled, must pass RUSTFLAGS="--cfg benchmarking""#);
+        return;
+    }
+
+    #[allow(unreachable_code)]
+    let mut group = c.benchmark_group("sql_catalog_projection_once");
+    group
+        .measurement_time(Duration::from_secs(20))
+        .sample_size(50);
+
+    let table_dir = construct_table("sql-catalog-taxicab");
+    let mut db_path = table_dir.clone();
+    db_path.push("benchmarking-catalog.db");
+    let uri = db_path.to_str().unwrap().to_string();
+
+    group.bench_function(
+        "sql_catalog_projection_once",
+        // an issue with criterion (https://github.com/bheisler/criterion.rs/issues/751) means we can't do a normal benchmark here,
+        // it doesn't let us provide an async setup function to build `FileIO`, etc. instead, we have to use `iter_custom` and do
+        // the measurements ourselves
+        |b| {
+            b.to_async(Runtime::new().unwrap()).iter_custom(async |n| {
+                let mut total_elapsed = Duration::default();
+
+                for _ in 0..n {
+                    let table = setup_table(table_dir.clone(), uri.clone()).await;
+
+                    let start = Instant::now();
+                    let output = scan_table(black_box(table)).await;
+                    let dur = start.elapsed();
+
+                    drop(black_box(output));
+                    total_elapsed += dur;
+                }
+
+                total_elapsed
+            })
+        },
+    );
+    group.finish()
+}
+
+async fn setup_table(table_dir: PathBuf, uri: String) -> Table {
+    let file_io = FileIOBuilder::new("iceberg_benchmarking_storage")
+        .build()
+        .unwrap();
+    copy_dir_to_fileio(table_dir.clone(), &file_io).await;
+
+    let config = SqlCatalogConfig::builder()
+        .file_io(file_io)
+        .uri(format!("sqlite:{uri}"))
+        .name("default".to_owned())
+        .sql_bind_style(SqlBindStyle::QMark)
+        .warehouse_location(table_dir.to_str().unwrap().to_owned())
+        .build();
+    let catalog = SqlCatalog::new(config).await.unwrap();
+    catalog
+        .load_table(&TableIdent::from_strs(["default", "taxi_dataset"]).unwrap())
+        .await
+        .unwrap()
+}
+
+async fn scan_table(table: Table) -> Vec<RecordBatch> {
+    let stream = table
+        .scan()
+        .select(["passenger_count", "fare_amount"])
+        .build()
+        .unwrap()
+        .to_arrow()
+        .await
+        .unwrap();
+
+    stream.try_collect().await.unwrap()
+}
+
+criterion_group!(benches, bench_sql_catalog_projection_once);
+criterion_main!(benches);
diff --git a/crates/benchmarks/src/construction_scripts/requirements.txt b/crates/benchmarks/src/construction_scripts/requirements.txt
new file mode 100644
index 000000000..07227ec69
--- /dev/null
+++ b/crates/benchmarks/src/construction_scripts/requirements.txt
@@ -0,0 +1,19 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +pyiceberg[sql-sqlite] +pyarrow diff --git a/crates/benchmarks/src/construction_scripts/sql-catalog-taxicab.py b/crates/benchmarks/src/construction_scripts/sql-catalog-taxicab.py new file mode 100644 index 000000000..c03900d64 --- /dev/null +++ b/crates/benchmarks/src/construction_scripts/sql-catalog-taxicab.py @@ -0,0 +1,60 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +# reason for existence explained in ../lib.rs + +from pathlib import Path +from sys import argv +from urllib.request import urlretrieve +import sqlite3 + +from pyiceberg.catalog import load_catalog +from pyarrow import parquet + +working_dir = Path(argv[1]) + +warehouse_path = working_dir +catalog = load_catalog( + "default", + **{ + 'type': 'sql', + "uri": f"sqlite:///{warehouse_path}/benchmarking-catalog.db", + "warehouse": f"file://{warehouse_path}", + }, +) + +data_file_path = working_dir / "yellow_tripdata_2023-01.parquet" +urlretrieve("https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-01.parquet", data_file_path) + +df = parquet.read_table(data_file_path) + +catalog.create_namespace("default") + +table = catalog.create_table( + "default.taxi_dataset", + schema=df.schema, +) + +table.append(df) + + +# the rust sql catalog needs an extra column for each table that pyiceberg doesn't include, this adds it +conn = sqlite3.connect(f"{warehouse_path}/benchmarking-catalog.db") +c = conn.cursor() +c.execute("ALTER TABLE iceberg_tables ADD column iceberg_type VARCHAR(5) DEFAULT 'TABLE'") +conn.commit() +c.close() diff --git a/crates/benchmarks/src/lib.rs b/crates/benchmarks/src/lib.rs new file mode 100644 index 000000000..a6f332ad2 --- /dev/null +++ b/crates/benchmarks/src/lib.rs @@ -0,0 +1,74 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::env::{set_current_dir, temp_dir}; +use std::fs::{create_dir_all, read}; +use std::path::PathBuf; +use std::process; +use std::str::FromStr; + +use bytes::Bytes; +use iceberg::io::FileIO; +use rand::{RngCore, thread_rng}; +use walkdir::WalkDir; + +/// As of now, write support for tables is not complete for iceberg rust, so we can't construct +/// tables for benchmarking ourselves. +/// +/// As a temporary (and admittedly ugly) measure, we run a python script to generate the tables +/// in a temp directory instead and then migrate all files created to a memory-storage `FileIO`. +/// When write support is complete, we can easily swap out the Python scripts and do this natively. 
+pub fn construct_table(table_name: &str) -> PathBuf { + let script_filename = [table_name, "py"].join("."); + let mut script_path = PathBuf::from_str(env!("CARGO_MANIFEST_DIR")).unwrap(); + script_path.push("src"); + script_path.push("construction_scripts"); + + set_current_dir(&script_path).unwrap(); + + script_path.push(script_filename); + + // should look like /tmp/iceberg_benchmark_tables/-> + let mut working_dir = temp_dir(); + working_dir.push("iceberg_benchmark_tables"); + working_dir.push([table_name, &thread_rng().next_u64().to_string()].join("-")); + + create_dir_all(&working_dir).unwrap(); + + process::Command::new("python") + .arg(script_path.to_str().unwrap()) + .arg(working_dir.to_str().unwrap()) + .spawn() + .unwrap() + .wait() + .unwrap(); + + working_dir +} + +pub async fn copy_dir_to_fileio(dir: PathBuf, fileio: &FileIO) { + for entry in WalkDir::new(dir) + .into_iter() + .map(|res| res.unwrap()) + .filter(|entry| entry.file_type().is_file()) + { + let path = entry.into_path(); + let output = fileio.new_output(path.to_str().unwrap()).unwrap(); + let bytes = Bytes::from(read(path).unwrap()); + output.write(bytes).await.unwrap(); + } +} diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml index d1ddc8246..d7e1a6707 100644 --- a/crates/iceberg/Cargo.toml +++ b/crates/iceberg/Cargo.toml @@ -100,3 +100,7 @@ tera = { workspace = true } [package.metadata.cargo-machete] # These dependencies are added to ensure minimal dependency version ignored = ["tap"] + +[lints.rust] +# so the #[cfg(benchmarking)] tag doesn't create warnings +unexpected_cfgs = { level = "warn", check-cfg = ['cfg(benchmarking)'] } diff --git a/crates/iceberg/src/io/mod.rs b/crates/iceberg/src/io/mod.rs index 5eb596434..cb17f3f46 100644 --- a/crates/iceberg/src/io/mod.rs +++ b/crates/iceberg/src/io/mod.rs @@ -74,6 +74,8 @@ pub(crate) mod object_cache; #[cfg(feature = "storage-azdls")] mod storage_azdls; +#[cfg(benchmarking)] +mod storage_benchmarking; #[cfg(feature = 
"storage-fs")] mod storage_fs; #[cfg(feature = "storage-gcs")] @@ -87,6 +89,8 @@ mod storage_s3; #[cfg(feature = "storage-azdls")] pub use storage_azdls::*; +#[cfg(benchmarking)] +use storage_benchmarking::*; #[cfg(feature = "storage-fs")] use storage_fs::*; #[cfg(feature = "storage-gcs")] diff --git a/crates/iceberg/src/io/storage.rs b/crates/iceberg/src/io/storage.rs index a847977e5..45e445f83 100644 --- a/crates/iceberg/src/io/storage.rs +++ b/crates/iceberg/src/io/storage.rs @@ -62,12 +62,20 @@ pub(crate) enum Storage { configured_scheme: AzureStorageScheme, config: Arc, }, + #[cfg(benchmarking)] + Benchmarking(Operator), } impl Storage { /// Convert iceberg config to opendal config. pub(crate) fn build(file_io_builder: FileIOBuilder) -> crate::Result { let (scheme_str, props) = file_io_builder.into_parts(); + + #[cfg(benchmarking)] + if scheme_str == "iceberg_benchmarking_storage" { + return Ok(Self::Benchmarking(super::benchmarking_config_build()?)); + } + let scheme = Self::parse_scheme(&scheme_str)?; match scheme { @@ -192,6 +200,14 @@ impl Storage { configured_scheme, config, } => super::azdls_create_operator(path, config, configured_scheme), + #[cfg(benchmarking)] + Storage::Benchmarking(op) => { + if let Some(stripped) = path.split_once(":") { + Ok::<_, crate::Error>((op.clone(), stripped.1)) + } else { + Ok::<_, crate::Error>((op.clone(), path)) + } + } #[cfg(all( not(feature = "storage-s3"), not(feature = "storage-fs"), diff --git a/crates/iceberg/src/io/storage_benchmarking.rs b/crates/iceberg/src/io/storage_benchmarking.rs new file mode 100644 index 000000000..cfa2e6705 --- /dev/null +++ b/crates/iceberg/src/io/storage_benchmarking.rs @@ -0,0 +1,115 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! It's challenging to make storage with reproducible delays for benchmarks. Variance in performance can +//! tamper with results. Additionally, benchmarks can benefit from granular control of exactly how long +//! the latency will be. +//! +//! To solve this problem, we have an extra **usually not included** storage type for benchmarks. This is +//! almost exactly the same as the memory catalog, except with a preset latency with each read and write. +//! +//! THIS CODE SHOULD NOT AND IS NOT INCLUDED IN REGULAR DISTRIBUTIONS OF `iceberg-rs`. To include it, +//! you must pass RUSTFLAGS="--cfg benchmarking" to `cargo`. + +use std::thread; +use std::time::Duration; + +use opendal::raw::{ + Access, Layer, LayeredAccess, OpList, OpRead, OpWrite, RpDelete, RpList, RpRead, RpWrite, +}; +use opendal::services::MemoryConfig; +use opendal::{Operator, Result}; +use rand::*; +use tokio::time::sleep; + +pub(crate) fn benchmarking_config_build() -> Result { + Ok(Operator::from_config(MemoryConfig::default())? 
+        .layer(DelayLayer)
+        .finish())
+}
+
+/// A layer that artificially introduces a predictable delay for better benchmarking
+struct DelayLayer;
+
+impl<A: Access> Layer<A> for DelayLayer {
+    type LayeredAccess = DelayedAccessor<A>;
+
+    fn layer(&self, inner: A) -> Self::LayeredAccess {
+        DelayedAccessor { inner }
+    }
+}
+
+#[derive(Debug)]
+struct DelayedAccessor<A> {
+    inner: A,
+}
+
+impl<A: Access> LayeredAccess for DelayedAccessor<A> {
+    type Inner = A;
+    type Reader = A::Reader;
+    type BlockingReader = A::BlockingReader;
+    type Writer = A::Writer;
+    type BlockingWriter = A::BlockingWriter;
+    type Lister = A::Lister;
+    type BlockingLister = A::BlockingLister;
+    type Deleter = A::Deleter;
+    type BlockingDeleter = A::BlockingDeleter;
+
+    fn inner(&self) -> &Self::Inner {
+        &self.inner
+    }
+
+    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
+        sleep(Duration::from_millis(20)).await;
+
+        self.inner.read(path, args).await
+    }
+
+    fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> {
+        thread::sleep(Duration::from_millis(20));
+
+        self.inner.blocking_read(path, args)
+    }
+
+    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
+        sleep(Duration::from_millis(20)).await;
+
+        self.inner.write(path, args).await
+    }
+
+    fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> {
+        thread::sleep(Duration::from_millis(20));
+
+        self.inner.blocking_write(path, args)
+    }
+
+    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
+        self.inner.list(path, args).await
+    }
+
+    fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingLister)> {
+        self.inner.blocking_list(path, args)
+    }
+
+    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
+        self.inner.delete().await
+    }
+
+    fn blocking_delete(&self) -> Result<(RpDelete, Self::BlockingDeleter)> {
+        self.inner.blocking_delete()
+    }
+}