From cb2394d45f199823016ba25771364788ab23a308 Mon Sep 17 00:00:00 2001 From: Kyteware Date: Thu, 19 Jun 2025 15:52:46 -0700 Subject: [PATCH 01/25] feat: flag-gated benchmarking storage for fake latency --- crates/iceberg/Cargo.toml | 4 + crates/iceberg/src/io/mod.rs | 4 + crates/iceberg/src/io/storage.rs | 16 +++ crates/iceberg/src/io/storage_benchmarking.rs | 119 ++++++++++++++++++ 4 files changed, 143 insertions(+) create mode 100644 crates/iceberg/src/io/storage_benchmarking.rs diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml index d1ddc8246..d7e1a6707 100644 --- a/crates/iceberg/Cargo.toml +++ b/crates/iceberg/Cargo.toml @@ -100,3 +100,7 @@ tera = { workspace = true } [package.metadata.cargo-machete] # These dependencies are added to ensure minimal dependency version ignored = ["tap"] + +[lints.rust] +# so the #[cfg(benchmarking)] tag doesn't create warnings +unexpected_cfgs = { level = "warn", check-cfg = ['cfg(benchmarking)'] } diff --git a/crates/iceberg/src/io/mod.rs b/crates/iceberg/src/io/mod.rs index 5eb596434..aa1fa944f 100644 --- a/crates/iceberg/src/io/mod.rs +++ b/crates/iceberg/src/io/mod.rs @@ -84,6 +84,8 @@ mod storage_memory; mod storage_oss; #[cfg(feature = "storage-s3")] mod storage_s3; +#[cfg(benchmarking)] +mod storage_benchmarking; #[cfg(feature = "storage-azdls")] pub use storage_azdls::*; @@ -97,6 +99,8 @@ use storage_memory::*; pub use storage_oss::*; #[cfg(feature = "storage-s3")] pub use storage_s3::*; +#[cfg(benchmarking)] +use storage_benchmarking::*; pub(crate) fn is_truthy(value: &str) -> bool { ["true", "t", "1", "on"].contains(&value.to_lowercase().as_str()) diff --git a/crates/iceberg/src/io/storage.rs b/crates/iceberg/src/io/storage.rs index a847977e5..63fa7e41a 100644 --- a/crates/iceberg/src/io/storage.rs +++ b/crates/iceberg/src/io/storage.rs @@ -62,12 +62,20 @@ pub(crate) enum Storage { configured_scheme: AzureStorageScheme, config: Arc, }, + #[cfg(benchmarking)] + Benchmarking(Operator) } impl Storage { /// Convert iceberg config to opendal config. pub(crate) fn build(file_io_builder: FileIOBuilder) -> crate::Result { let (scheme_str, props) = file_io_builder.into_parts(); + + #[cfg(benchmarking)] + if scheme_str == "iceberg_benchmarking_storage" { + return Ok(Self::Benchmarking(super::benchmarking_config_build()?)); + } + let scheme = Self::parse_scheme(&scheme_str)?; match scheme { @@ -192,6 +200,14 @@ impl Storage { configured_scheme, config, } => super::azdls_create_operator(path, config, configured_scheme), + #[cfg(benchmarking)] + Storage::Benchmarking(op) => { + if let Some(stripped) = path.split_once(":") { + Ok::<_, crate::Error>((op.clone(), stripped.1)) + } else { + Ok::<_, crate::Error>((op.clone(), path)) + } + } #[cfg(all( not(feature = "storage-s3"), not(feature = "storage-fs"), diff --git a/crates/iceberg/src/io/storage_benchmarking.rs b/crates/iceberg/src/io/storage_benchmarking.rs new file mode 100644 index 000000000..394ea75f7 --- /dev/null +++ b/crates/iceberg/src/io/storage_benchmarking.rs @@ -0,0 +1,119 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! This storage is used to mimick consistent latency for benchmarks of iceberg. +//! It should not be included in standard distributions of iceberg. + +use std::thread; +use std::time::Duration; + +use opendal::raw::{Access, Layer, LayeredAccess, OpList, OpRead, OpWrite, RpDelete, RpList, RpRead, RpWrite}; +use opendal::Operator; +use opendal::services::MemoryConfig; +use opendal::Result; +use rand::{thread_rng, Rng}; +use tokio::time::sleep; + +pub(crate) fn benchmarking_config_build() -> Result { + Ok(Operator::from_config(MemoryConfig::default())?.layer(DelayLayer).finish()) +} + +/// Usually takes around 50 ms, to visualize function: $ f\left(x\right)=x^{-0.5}\ \cdot\ 0.10 $ +fn gen_amt_latency() -> Duration { + let x: f64 = thread_rng().gen_range(0.01..10.); + Duration::from_secs_f64(x.powf(-0.5) * 0.1) +} + +/// A layer that artifially introduces predictable relay for better benchmarking +struct DelayLayer; + +impl Layer for DelayLayer { + type LayeredAccess = DelayedAccessor; + + fn layer(&self, inner: A) -> Self::LayeredAccess { + DelayedAccessor { inner } + } +} + +#[derive(Debug)] +struct DelayedAccessor { + inner: A +} + +impl LayeredAccess for DelayedAccessor { + type Inner = A; + type Reader = A::Reader; + type BlockingReader = A::BlockingReader; + type Writer = A::Writer; + type BlockingWriter = A::BlockingWriter; + type Lister = A::Lister; + type BlockingLister = A::BlockingLister; + type Deleter = A::Deleter; + type BlockingDeleter = A::BlockingDeleter; + + fn inner(&self) -> &Self::Inner { + &self.inner + } + + async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> { + sleep(gen_amt_latency()).await; + + self.inner.read(path, args).await + } + + fn blocking_read( + &self, + path: &str, + args: OpRead, + ) -> Result<(RpRead, Self::BlockingReader)> { + thread::sleep(gen_amt_latency()); + + self.inner.blocking_read(path, args) + } + + async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { + self.inner.write(path, args).await + } + + fn blocking_write( + &self, + path: &str, + args: OpWrite, + ) -> Result<(RpWrite, Self::BlockingWriter)> { + self.inner.blocking_write(path, args) + } + + async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> { + self.inner.list(path, args).await + } + + fn blocking_list( + &self, + path: &str, + args: OpList, + ) -> Result<(RpList, Self::BlockingLister)> { + self.inner.blocking_list(path, args) + } + + async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> { + self.inner.delete().await + } + + fn blocking_delete(&self) -> Result<(RpDelete, Self::BlockingDeleter)> { + self.inner.blocking_delete() + } +} From af20743318f4b61b4eaa1e9b75f32a8040500522 Mon Sep 17 00:00:00 2001 From: Kyteware Date: Thu, 19 Jun 2025 15:53:34 -0700 Subject: [PATCH 02/25] feat: first benchmark --- Cargo.toml | 2 + crates/benchmarks/.gitignore | 18 ++++ crates/benchmarks/Cargo.toml | 42 +++++++++ crates/benchmarks/README.md | 22 +++++ .../benches/sql-catalog-querying-taxicab.rs | 87 +++++++++++++++++++ .../src/construction_scripts/README.md | 24 +++++ 
.../sql-catalog-taxicab.py | 56 ++++++++++++ crates/benchmarks/src/lib.rs | 66 ++++++++++++++ 8 files changed, 317 insertions(+) create mode 100644 crates/benchmarks/.gitignore create mode 100644 crates/benchmarks/Cargo.toml create mode 100644 crates/benchmarks/README.md create mode 100644 crates/benchmarks/benches/sql-catalog-querying-taxicab.rs create mode 100644 crates/benchmarks/src/construction_scripts/README.md create mode 100644 crates/benchmarks/src/construction_scripts/sql-catalog-taxicab.py create mode 100644 crates/benchmarks/src/lib.rs diff --git a/Cargo.toml b/Cargo.toml index cae0c85cb..a419103d4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,6 +18,7 @@ [workspace] exclude = ["bindings/python"] members = [ + "crates/benchmarks", "crates/catalog/*", "crates/examples", "crates/iceberg", @@ -76,6 +77,7 @@ http = "1.2" iceberg = { version = "0.5.1", path = "./crates/iceberg" } iceberg-catalog-memory = { version = "0.5.1", path = "./crates/catalog/memory" } iceberg-catalog-rest = { version = "0.5.1", path = "./crates/catalog/rest" } +iceberg-catalog-sql = { version = "0.5.1", path = "./crates/catalog/sql" } iceberg-datafusion = { version = "0.5.1", path = "./crates/integrations/datafusion" } indicatif = "0.17" itertools = "0.13" diff --git a/crates/benchmarks/.gitignore b/crates/benchmarks/.gitignore new file mode 100644 index 000000000..289e5649b --- /dev/null +++ b/crates/benchmarks/.gitignore @@ -0,0 +1,18 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +src/construction_scripts/__pycache__/ \ No newline at end of file diff --git a/crates/benchmarks/Cargo.toml b/crates/benchmarks/Cargo.toml new file mode 100644 index 000000000..b65ae76e3 --- /dev/null +++ b/crates/benchmarks/Cargo.toml @@ -0,0 +1,42 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +[package] +name = "benches" +edition.workspace = true +homepage.workspace = true +version.workspace = true +license.workspace = true +repository.workspace = true +rust-version.workspace = true + +[dependencies] +iceberg = { workspace = true } +iceberg-catalog-sql = { workspace = true } +rand = { workspace = true } +walkdir = "2.5.0" +tokio = { features = ["rt-multi-thread"], workspace = true } +bytes = { workspace = true } +futures = { workspace = true } +arrow-array.workspace = true + +[dev-dependencies] +criterion = { version = "0.6.0", features = ["async_tokio"] } + +[[bench]] +name = "sql-catalog-querying-taxicab" +harness = false diff --git a/crates/benchmarks/README.md b/crates/benchmarks/README.md new file mode 100644 index 000000000..6f8c5d278 --- /dev/null +++ b/crates/benchmarks/README.md @@ -0,0 +1,22 @@ + + +Where `iceberg-rust` benchmarks are kept. + +To run the benchmarks, we must pass the `benchmarking` rustflag so that the benchmarking storage is compiled and we can tamper with fileio latency. + +This command includes it: `RUSTFLAGS="--cfg benchmarking" cargo bench` \ No newline at end of file diff --git a/crates/benchmarks/benches/sql-catalog-querying-taxicab.rs b/crates/benchmarks/benches/sql-catalog-querying-taxicab.rs new file mode 100644 index 000000000..ab9a96c90 --- /dev/null +++ b/crates/benchmarks/benches/sql-catalog-querying-taxicab.rs @@ -0,0 +1,87 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use std::{hint::black_box, path::PathBuf, sync::Arc, time::{Duration, Instant}}; +use arrow_array::RecordBatch; +use benches::{copy_dir_to_fileio, run_construction_script}; +use criterion::{criterion_group, criterion_main, Criterion}; +use futures::TryStreamExt; +use iceberg::{io::{FileIO, FileIOBuilder}, table::Table, Catalog, NamespaceIdent, TableIdent}; +use iceberg_catalog_sql::{SqlBindStyle, SqlCatalog, SqlCatalogConfig}; +use tokio::runtime::Runtime; + +pub fn bench_1_taxicab_query(c: &mut Criterion) { + let mut group = c.benchmark_group("single_read_taxicab_sql_catalog"); + group.measurement_time(Duration::from_secs(25)); + + let table_dir = run_construction_script("sql-catalog-taxicab"); + let mut db_path = table_dir.clone(); + db_path.push("benchmarking-catalog.db"); + let uri = format!("sqlite:{}", db_path.to_str().unwrap()); + + group.bench_function( + "single_read_taxicab_sql_catalog", + // iter custom is not ideal, but criterion doesn't let us have a custom async setup which is really annoying + |b| b.to_async(Runtime::new().unwrap()).iter_custom( + async |_| { + let table = setup_table(table_dir.clone(), uri.clone()).await; + + let start = Instant::now(); + let output = scan_table(black_box(table)).await; + let end = Instant::now(); + + drop(black_box(output)); + + end - start + }, + ) + ); + group.finish() +} + +async fn setup_table(table_dir: PathBuf, uri: String) -> Table { + let file_io = FileIOBuilder::new("iceberg_benchmarking_storage").build().unwrap(); + copy_dir_to_fileio(table_dir.clone(), &file_io).await; + + let config = SqlCatalogConfig::builder() + .file_io(file_io) + .uri(format!("sqlite:{uri}")) + .name("default".to_owned()) + .sql_bind_style(SqlBindStyle::QMark) + .warehouse_location(table_dir.to_str().unwrap().to_owned()) + .build(); + let catalog = SqlCatalog::new(config).await.unwrap(); + let table = catalog.load_table(&TableIdent::from_strs(["default", "taxi_dataset"]).unwrap()) + .await + .expect(&format!("table_dir: {table_dir:?}")); + table +} + +async fn scan_table(table: Table) -> Vec { + let stream = table.scan() + .select(["passenger_count", "fare_amount"]) + .build() + .unwrap() + .to_arrow() + .await + .unwrap(); + + stream.try_collect().await.unwrap() +} + +criterion_group!(benches, bench_1_taxicab_query); +criterion_main!(benches); diff --git a/crates/benchmarks/src/construction_scripts/README.md b/crates/benchmarks/src/construction_scripts/README.md new file mode 100644 index 000000000..9d3b7ab2d --- /dev/null +++ b/crates/benchmarks/src/construction_scripts/README.md @@ -0,0 +1,24 @@ + + +This is a collection of python scripts for generating iceberg catalogs and tables. + +When transaction support is done and we can do this ourselves, these should be migrated to iceberg-rs. + +python dependancies: +pyiceberg[sql-sqlite] +pyarrow \ No newline at end of file diff --git a/crates/benchmarks/src/construction_scripts/sql-catalog-taxicab.py b/crates/benchmarks/src/construction_scripts/sql-catalog-taxicab.py new file mode 100644 index 000000000..2d9499bcd --- /dev/null +++ b/crates/benchmarks/src/construction_scripts/sql-catalog-taxicab.py @@ -0,0 +1,56 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from pathlib import Path +from sys import argv +from urllib.request import urlretrieve +import sqlite3 + +from pyiceberg.catalog import load_catalog +from pyarrow import parquet + +working_dir = Path(argv[1]) + +warehouse_path = working_dir +catalog = load_catalog( + "default", + **{ + 'type': 'sql', + "uri": f"sqlite:///{warehouse_path}/benchmarking-catalog.db", + "warehouse": f"file://{warehouse_path}", + }, +) + +data_file_path = working_dir / "yellow_tripdata_2023-01.parquet" +urlretrieve("https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-01.parquet", data_file_path) + +df = parquet.read_table(data_file_path) + +catalog.create_namespace("default") + +table = catalog.create_table( + "default.taxi_dataset", + schema=df.schema, +) + +table.append(df) + +conn = sqlite3.connect(f"{warehouse_path}/benchmarking-catalog.db") +c = conn.cursor() +c.execute("ALTER TABLE iceberg_tables ADD column iceberg_type VARCHAR(5) DEFAULT 'TABLE'") +conn.commit() +c.close() diff --git a/crates/benchmarks/src/lib.rs b/crates/benchmarks/src/lib.rs new file mode 100644 index 000000000..c530e7bc3 --- /dev/null +++ b/crates/benchmarks/src/lib.rs @@ -0,0 +1,66 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::{env::{set_current_dir, temp_dir}, fs::{create_dir_all, read}, path::PathBuf, process, str::FromStr}; + +use bytes::Bytes; +use iceberg::io::FileIO; +use rand::{thread_rng, RngCore}; +use tokio::runtime::Runtime; +use walkdir::WalkDir; + +/// runs the python construction script, provided just the file name without .py +/// +/// ("sql-catalog-querying-taxicab" etc.) 
+pub fn run_construction_script(script_name: &str) -> PathBuf { + let script_filename = [script_name, "py"].join("."); + let mut script_path = PathBuf::from_str(env!("CARGO_MANIFEST_DIR")).unwrap(); + script_path.push("src"); + script_path.push("construction_scripts"); + + set_current_dir(&script_path).unwrap(); + + script_path.push(script_filename); + + // should look like /tmp/iceberg_benchmark_tables/-> + let mut working_dir = temp_dir(); + working_dir.push("iceberg_benchmark_tables"); + working_dir.push([script_name, &thread_rng().next_u64().to_string()].join("-")); + + create_dir_all(&working_dir).unwrap(); + + process::Command::new("python") + .arg(script_path.to_str().unwrap()) + .arg(working_dir.to_str().unwrap()) + .spawn() + .unwrap() + .wait() + .unwrap(); + + working_dir +} + +pub async fn copy_dir_to_fileio(dir: PathBuf, fileio: &FileIO) { + for entry in WalkDir::new(dir).into_iter() + .map(|res| res.unwrap()) + .filter(|entry| entry.file_type().is_file()) { + let path = entry.into_path(); + let output = fileio.new_output(path.to_str().unwrap()).unwrap(); + let bytes = Bytes::from(read(path).unwrap()); + output.write(bytes).await.unwrap(); + } +} From 5e5858ece2e67bf697f27142ac957a1fc685d31f Mon Sep 17 00:00:00 2001 From: Kyteware Date: Thu, 19 Jun 2025 15:54:58 -0700 Subject: [PATCH 03/25] chore: remove unused imports --- crates/benchmarks/benches/sql-catalog-querying-taxicab.rs | 4 ++-- crates/benchmarks/src/lib.rs | 1 - 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/crates/benchmarks/benches/sql-catalog-querying-taxicab.rs b/crates/benchmarks/benches/sql-catalog-querying-taxicab.rs index ab9a96c90..041919af5 100644 --- a/crates/benchmarks/benches/sql-catalog-querying-taxicab.rs +++ b/crates/benchmarks/benches/sql-catalog-querying-taxicab.rs @@ -15,12 +15,12 @@ // specific language governing permissions and limitations // under the License. 
-use std::{hint::black_box, path::PathBuf, sync::Arc, time::{Duration, Instant}}; +use std::{hint::black_box, path::PathBuf, time::{Duration, Instant}}; use arrow_array::RecordBatch; use benches::{copy_dir_to_fileio, run_construction_script}; use criterion::{criterion_group, criterion_main, Criterion}; use futures::TryStreamExt; -use iceberg::{io::{FileIO, FileIOBuilder}, table::Table, Catalog, NamespaceIdent, TableIdent}; +use iceberg::{io::FileIOBuilder, table::Table, Catalog, TableIdent}; use iceberg_catalog_sql::{SqlBindStyle, SqlCatalog, SqlCatalogConfig}; use tokio::runtime::Runtime; diff --git a/crates/benchmarks/src/lib.rs b/crates/benchmarks/src/lib.rs index c530e7bc3..56845abc5 100644 --- a/crates/benchmarks/src/lib.rs +++ b/crates/benchmarks/src/lib.rs @@ -20,7 +20,6 @@ use std::{env::{set_current_dir, temp_dir}, fs::{create_dir_all, read}, path::Pa use bytes::Bytes; use iceberg::io::FileIO; use rand::{thread_rng, RngCore}; -use tokio::runtime::Runtime; use walkdir::WalkDir; /// runs the python construction script, provided just the file name without .py From 1d7916288a0b86a24ed9d63b957a6f77ed043869 Mon Sep 17 00:00:00 2001 From: Kyteware Date: Thu, 19 Jun 2025 15:55:34 -0700 Subject: [PATCH 04/25] chore: cargo fmt --- .../benches/sql-catalog-querying-taxicab.rs | 29 ++++++++++++------- crates/benchmarks/src/lib.rs | 26 ++++++++++------- 2 files changed, 35 insertions(+), 20 deletions(-) diff --git a/crates/benchmarks/benches/sql-catalog-querying-taxicab.rs b/crates/benchmarks/benches/sql-catalog-querying-taxicab.rs index 041919af5..aacd9d066 100644 --- a/crates/benchmarks/benches/sql-catalog-querying-taxicab.rs +++ b/crates/benchmarks/benches/sql-catalog-querying-taxicab.rs @@ -15,12 +15,17 @@ // specific language governing permissions and limitations // under the License. 
-use std::{hint::black_box, path::PathBuf, time::{Duration, Instant}}; +use std::hint::black_box; +use std::path::PathBuf; +use std::time::{Duration, Instant}; + use arrow_array::RecordBatch; use benches::{copy_dir_to_fileio, run_construction_script}; -use criterion::{criterion_group, criterion_main, Criterion}; +use criterion::{Criterion, criterion_group, criterion_main}; use futures::TryStreamExt; -use iceberg::{io::FileIOBuilder, table::Table, Catalog, TableIdent}; +use iceberg::io::FileIOBuilder; +use iceberg::table::Table; +use iceberg::{Catalog, TableIdent}; use iceberg_catalog_sql::{SqlBindStyle, SqlCatalog, SqlCatalogConfig}; use tokio::runtime::Runtime; @@ -36,8 +41,8 @@ pub fn bench_1_taxicab_query(c: &mut Criterion) { group.bench_function( "single_read_taxicab_sql_catalog", // iter custom is not ideal, but criterion doesn't let us have a custom async setup which is really annoying - |b| b.to_async(Runtime::new().unwrap()).iter_custom( - async |_| { + |b| { + b.to_async(Runtime::new().unwrap()).iter_custom(async |_| { let table = setup_table(table_dir.clone(), uri.clone()).await; let start = Instant::now(); @@ -47,14 +52,16 @@ pub fn bench_1_taxicab_query(c: &mut Criterion) { drop(black_box(output)); end - start - }, - ) + }) + }, ); group.finish() } async fn setup_table(table_dir: PathBuf, uri: String) -> Table { - let file_io = FileIOBuilder::new("iceberg_benchmarking_storage").build().unwrap(); + let file_io = FileIOBuilder::new("iceberg_benchmarking_storage") + .build() + .unwrap(); copy_dir_to_fileio(table_dir.clone(), &file_io).await; let config = SqlCatalogConfig::builder() @@ -65,14 +72,16 @@ async fn setup_table(table_dir: PathBuf, uri: String) -> Table { .warehouse_location(table_dir.to_str().unwrap().to_owned()) .build(); let catalog = SqlCatalog::new(config).await.unwrap(); - let table = catalog.load_table(&TableIdent::from_strs(["default", "taxi_dataset"]).unwrap()) + let table = catalog + .load_table(&TableIdent::from_strs(["default", "taxi_dataset"]).unwrap()) .await .expect(&format!("table_dir: {table_dir:?}")); table } async fn scan_table(table: Table) -> Vec { - let stream = table.scan() + let stream = table + .scan() .select(["passenger_count", "fare_amount"]) .build() .unwrap() diff --git a/crates/benchmarks/src/lib.rs b/crates/benchmarks/src/lib.rs index 56845abc5..e1a1c8a8f 100644 --- a/crates/benchmarks/src/lib.rs +++ b/crates/benchmarks/src/lib.rs @@ -15,15 +15,19 @@ // specific language governing permissions and limitations // under the License. -use std::{env::{set_current_dir, temp_dir}, fs::{create_dir_all, read}, path::PathBuf, process, str::FromStr}; +use std::env::{set_current_dir, temp_dir}; +use std::fs::{create_dir_all, read}; +use std::path::PathBuf; +use std::process; +use std::str::FromStr; use bytes::Bytes; use iceberg::io::FileIO; -use rand::{thread_rng, RngCore}; +use rand::{RngCore, thread_rng}; use walkdir::WalkDir; /// runs the python construction script, provided just the file name without .py -/// +/// /// ("sql-catalog-querying-taxicab" etc.) 
pub fn run_construction_script(script_name: &str) -> PathBuf { let script_filename = [script_name, "py"].join("."); @@ -54,12 +58,14 @@ pub fn run_construction_script(script_name: &str) -> PathBuf { } pub async fn copy_dir_to_fileio(dir: PathBuf, fileio: &FileIO) { - for entry in WalkDir::new(dir).into_iter() + for entry in WalkDir::new(dir) + .into_iter() .map(|res| res.unwrap()) - .filter(|entry| entry.file_type().is_file()) { - let path = entry.into_path(); - let output = fileio.new_output(path.to_str().unwrap()).unwrap(); - let bytes = Bytes::from(read(path).unwrap()); - output.write(bytes).await.unwrap(); - } + .filter(|entry| entry.file_type().is_file()) + { + let path = entry.into_path(); + let output = fileio.new_output(path.to_str().unwrap()).unwrap(); + let bytes = Bytes::from(read(path).unwrap()); + output.write(bytes).await.unwrap(); + } } From 4182bd820276f0362d764a66b79f276d4c04f714 Mon Sep 17 00:00:00 2001 From: Kyteware Date: Thu, 19 Jun 2025 15:56:34 -0700 Subject: [PATCH 05/25] chore: cargo fmt --- crates/iceberg/src/io/mod.rs | 8 ++-- crates/iceberg/src/io/storage.rs | 4 +- crates/iceberg/src/io/storage_benchmarking.rs | 41 ++++++++----------- 3 files changed, 22 insertions(+), 31 deletions(-) diff --git a/crates/iceberg/src/io/mod.rs b/crates/iceberg/src/io/mod.rs index aa1fa944f..cb17f3f46 100644 --- a/crates/iceberg/src/io/mod.rs +++ b/crates/iceberg/src/io/mod.rs @@ -74,6 +74,8 @@ pub(crate) mod object_cache; #[cfg(feature = "storage-azdls")] mod storage_azdls; +#[cfg(benchmarking)] +mod storage_benchmarking; #[cfg(feature = "storage-fs")] mod storage_fs; #[cfg(feature = "storage-gcs")] @@ -84,11 +86,11 @@ mod storage_memory; mod storage_oss; #[cfg(feature = "storage-s3")] mod storage_s3; -#[cfg(benchmarking)] -mod storage_benchmarking; #[cfg(feature = "storage-azdls")] pub use storage_azdls::*; +#[cfg(benchmarking)] +use storage_benchmarking::*; #[cfg(feature = "storage-fs")] use storage_fs::*; #[cfg(feature = "storage-gcs")] @@ -99,8 +101,6 @@ use storage_memory::*; pub use storage_oss::*; #[cfg(feature = "storage-s3")] pub use storage_s3::*; -#[cfg(benchmarking)] -use storage_benchmarking::*; pub(crate) fn is_truthy(value: &str) -> bool { ["true", "t", "1", "on"].contains(&value.to_lowercase().as_str()) diff --git a/crates/iceberg/src/io/storage.rs b/crates/iceberg/src/io/storage.rs index 63fa7e41a..45e445f83 100644 --- a/crates/iceberg/src/io/storage.rs +++ b/crates/iceberg/src/io/storage.rs @@ -63,7 +63,7 @@ pub(crate) enum Storage { config: Arc, }, #[cfg(benchmarking)] - Benchmarking(Operator) + Benchmarking(Operator), } impl Storage { @@ -202,7 +202,7 @@ impl Storage { } => super::azdls_create_operator(path, config, configured_scheme), #[cfg(benchmarking)] Storage::Benchmarking(op) => { - if let Some(stripped) = path.split_once(":") { + if let Some(stripped) = path.split_once(":") { Ok::<_, crate::Error>((op.clone(), stripped.1)) } else { Ok::<_, crate::Error>((op.clone(), path)) diff --git a/crates/iceberg/src/io/storage_benchmarking.rs b/crates/iceberg/src/io/storage_benchmarking.rs index 394ea75f7..2dedc254e 100644 --- a/crates/iceberg/src/io/storage_benchmarking.rs +++ b/crates/iceberg/src/io/storage_benchmarking.rs @@ -21,15 +21,18 @@ use std::thread; use std::time::Duration; -use opendal::raw::{Access, Layer, LayeredAccess, OpList, OpRead, OpWrite, RpDelete, RpList, RpRead, RpWrite}; -use opendal::Operator; +use opendal::raw::{ + Access, Layer, LayeredAccess, OpList, OpRead, OpWrite, RpDelete, RpList, RpRead, RpWrite, +}; use 
opendal::services::MemoryConfig; -use opendal::Result; -use rand::{thread_rng, Rng}; +use opendal::{Operator, Result}; +use rand::{Rng, thread_rng}; use tokio::time::sleep; pub(crate) fn benchmarking_config_build() -> Result { - Ok(Operator::from_config(MemoryConfig::default())?.layer(DelayLayer).finish()) + Ok(Operator::from_config(MemoryConfig::default())? + .layer(DelayLayer) + .finish()) } /// Usually takes around 50 ms, to visualize function: $ f\left(x\right)=x^{-0.5}\ \cdot\ 0.10 $ @@ -51,7 +54,7 @@ impl Layer for DelayLayer { #[derive(Debug)] struct DelayedAccessor { - inner: A + inner: A, } impl LayeredAccess for DelayedAccessor { @@ -75,11 +78,7 @@ impl LayeredAccess for DelayedAccessor { self.inner.read(path, args).await } - fn blocking_read( - &self, - path: &str, - args: OpRead, - ) -> Result<(RpRead, Self::BlockingReader)> { + fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> { thread::sleep(gen_amt_latency()); self.inner.blocking_read(path, args) @@ -89,11 +88,7 @@ impl LayeredAccess for DelayedAccessor { self.inner.write(path, args).await } - fn blocking_write( - &self, - path: &str, - args: OpWrite, - ) -> Result<(RpWrite, Self::BlockingWriter)> { + fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> { self.inner.blocking_write(path, args) } @@ -101,19 +96,15 @@ impl LayeredAccess for DelayedAccessor { self.inner.list(path, args).await } - fn blocking_list( - &self, - path: &str, - args: OpList, - ) -> Result<(RpList, Self::BlockingLister)> { + fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingLister)> { self.inner.blocking_list(path, args) } async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> { - self.inner.delete().await - } + self.inner.delete().await + } fn blocking_delete(&self) -> Result<(RpDelete, Self::BlockingDeleter)> { - self.inner.blocking_delete() - } + self.inner.blocking_delete() + } } From ceb9d15ed681b356d253f90f270ecf2a75a03048 Mon Sep 17 00:00:00 2001 From: Kyteware Date: Thu, 19 Jun 2025 16:04:01 -0700 Subject: [PATCH 06/25] ref: clarify naming of benchmark --- crates/benchmarks/Cargo.toml | 2 +- crates/benchmarks/README.md | 6 +++++- ...-querying-taxicab.rs => sql-catalog-scanning-taxicab.rs} | 0 3 files changed, 6 insertions(+), 2 deletions(-) rename crates/benchmarks/benches/{sql-catalog-querying-taxicab.rs => sql-catalog-scanning-taxicab.rs} (100%) diff --git a/crates/benchmarks/Cargo.toml b/crates/benchmarks/Cargo.toml index b65ae76e3..395809914 100644 --- a/crates/benchmarks/Cargo.toml +++ b/crates/benchmarks/Cargo.toml @@ -38,5 +38,5 @@ arrow-array.workspace = true criterion = { version = "0.6.0", features = ["async_tokio"] } [[bench]] -name = "sql-catalog-querying-taxicab" +name = "sql-catalog-scanning-taxicab" harness = false diff --git a/crates/benchmarks/README.md b/crates/benchmarks/README.md index 6f8c5d278..f331e6396 100644 --- a/crates/benchmarks/README.md +++ b/crates/benchmarks/README.md @@ -19,4 +19,8 @@ Where `iceberg-rust` benchmarks are kept. To run the benchmarks, we must pass the `benchmarking` rustflag so that the benchmarking storage is compiled and we can tamper with fileio latency. -This command includes it: `RUSTFLAGS="--cfg benchmarking" cargo bench` \ No newline at end of file +This command includes it: `RUSTFLAGS="--cfg benchmarking" cargo bench` + +## Existing benchmarks + +- `sql-catalog-scanning-taxicab`: Benchmarks scanning two columns from one month of NYC taxicab data. 
\ No newline at end of file diff --git a/crates/benchmarks/benches/sql-catalog-querying-taxicab.rs b/crates/benchmarks/benches/sql-catalog-scanning-taxicab.rs similarity index 100% rename from crates/benchmarks/benches/sql-catalog-querying-taxicab.rs rename to crates/benchmarks/benches/sql-catalog-scanning-taxicab.rs From 1b3dab35486eb9f0ced3b1d30071bd67fdae6d5e Mon Sep 17 00:00:00 2001 From: Kyteware Date: Thu, 19 Jun 2025 16:15:36 -0700 Subject: [PATCH 07/25] fix: change confidence interval --- crates/benchmarks/benches/sql-catalog-scanning-taxicab.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/crates/benchmarks/benches/sql-catalog-scanning-taxicab.rs b/crates/benchmarks/benches/sql-catalog-scanning-taxicab.rs index aacd9d066..702638cda 100644 --- a/crates/benchmarks/benches/sql-catalog-scanning-taxicab.rs +++ b/crates/benchmarks/benches/sql-catalog-scanning-taxicab.rs @@ -31,7 +31,8 @@ use tokio::runtime::Runtime; pub fn bench_1_taxicab_query(c: &mut Criterion) { let mut group = c.benchmark_group("single_read_taxicab_sql_catalog"); - group.measurement_time(Duration::from_secs(25)); + // request times are very sporadic, so we need a lower confidence level for reasonable results + group.measurement_time(Duration::from_secs(25)).confidence_level(0.8); let table_dir = run_construction_script("sql-catalog-taxicab"); let mut db_path = table_dir.clone(); From 56333bf38d1eaf830e26b7efe310b10a941f4397 Mon Sep 17 00:00:00 2001 From: Kyteware Date: Fri, 20 Jun 2025 15:55:03 -0700 Subject: [PATCH 08/25] fix: include needed sqlx features for sqlite --- crates/benchmarks/Cargo.toml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/crates/benchmarks/Cargo.toml b/crates/benchmarks/Cargo.toml index 395809914..a8d4d7530 100644 --- a/crates/benchmarks/Cargo.toml +++ b/crates/benchmarks/Cargo.toml @@ -16,7 +16,7 @@ # under the License. 
[package] -name = "benches" +name = "benchmarks" edition.workspace = true homepage.workspace = true version.workspace = true @@ -33,6 +33,7 @@ tokio = { features = ["rt-multi-thread"], workspace = true } bytes = { workspace = true } futures = { workspace = true } arrow-array.workspace = true +sqlx = { version = "0.8.6", features = ["any", "runtime-tokio", "sqlite"] } [dev-dependencies] criterion = { version = "0.6.0", features = ["async_tokio"] } From 7bdcf443481b6e55f11bab4cfb02ee9ccea2f6ff Mon Sep 17 00:00:00 2001 From: Kyteware Date: Fri, 20 Jun 2025 15:55:26 -0700 Subject: [PATCH 09/25] chore: rename artifact of old benchmarks crate name --- crates/benchmarks/benches/sql-catalog-scanning-taxicab.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/benchmarks/benches/sql-catalog-scanning-taxicab.rs b/crates/benchmarks/benches/sql-catalog-scanning-taxicab.rs index 702638cda..89cd81f6e 100644 --- a/crates/benchmarks/benches/sql-catalog-scanning-taxicab.rs +++ b/crates/benchmarks/benches/sql-catalog-scanning-taxicab.rs @@ -20,7 +20,7 @@ use std::path::PathBuf; use std::time::{Duration, Instant}; use arrow_array::RecordBatch; -use benches::{copy_dir_to_fileio, run_construction_script}; +use benchmarks::{copy_dir_to_fileio, run_construction_script}; use criterion::{Criterion, criterion_group, criterion_main}; use futures::TryStreamExt; use iceberg::io::FileIOBuilder; From 62e5b5736da76345558d90535f618da31e6683d2 Mon Sep 17 00:00:00 2001 From: Kyteware Date: Sun, 22 Jun 2025 11:49:57 -0700 Subject: [PATCH 10/25] ref: rename the first benchmark to a more descriptive name --- crates/benchmarks/Cargo.toml | 2 +- crates/benchmarks/README.md | 2 +- ...icab.rs => sql-catalog-projection-once.rs} | 29 +++++++++++-------- 3 files changed, 19 insertions(+), 14 deletions(-) rename crates/benchmarks/benches/{sql-catalog-scanning-taxicab.rs => sql-catalog-projection-once.rs} (78%) diff --git a/crates/benchmarks/Cargo.toml b/crates/benchmarks/Cargo.toml index a8d4d7530..390f81313 100644 --- a/crates/benchmarks/Cargo.toml +++ b/crates/benchmarks/Cargo.toml @@ -39,5 +39,5 @@ sqlx = { version = "0.8.6", features = ["any", "runtime-tokio", "sqlite"] } criterion = { version = "0.6.0", features = ["async_tokio"] } [[bench]] -name = "sql-catalog-scanning-taxicab" +name = "sql-catalog-projection-once" harness = false diff --git a/crates/benchmarks/README.md b/crates/benchmarks/README.md index f331e6396..588e0d37f 100644 --- a/crates/benchmarks/README.md +++ b/crates/benchmarks/README.md @@ -23,4 +23,4 @@ This command includes it: `RUSTFLAGS="--cfg benchmarking" cargo bench` ## Existing benchmarks -- `sql-catalog-scanning-taxicab`: Benchmarks scanning two columns from one month of NYC taxicab data. \ No newline at end of file +- `sql-catalog-projection-once`: Benchmarks scanning the NYC taxicab data and projecting two columns. 
diff --git a/crates/benchmarks/benches/sql-catalog-scanning-taxicab.rs b/crates/benchmarks/benches/sql-catalog-projection-once.rs similarity index 78% rename from crates/benchmarks/benches/sql-catalog-scanning-taxicab.rs rename to crates/benchmarks/benches/sql-catalog-projection-once.rs index 89cd81f6e..589d121a2 100644 --- a/crates/benchmarks/benches/sql-catalog-scanning-taxicab.rs +++ b/crates/benchmarks/benches/sql-catalog-projection-once.rs @@ -29,10 +29,10 @@ use iceberg::{Catalog, TableIdent}; use iceberg_catalog_sql::{SqlBindStyle, SqlCatalog, SqlCatalogConfig}; use tokio::runtime::Runtime; -pub fn bench_1_taxicab_query(c: &mut Criterion) { - let mut group = c.benchmark_group("single_read_taxicab_sql_catalog"); +pub fn bench_sql_catalog_projection_once(c: &mut Criterion) { + let mut group = c.benchmark_group("sql_catalog_projection_once"); // request times are very sporadic, so we need a lower confidence level for reasonable results - group.measurement_time(Duration::from_secs(25)).confidence_level(0.8); + group.measurement_time(Duration::from_secs(20)).sample_size(50); let table_dir = run_construction_script("sql-catalog-taxicab"); let mut db_path = table_dir.clone(); @@ -40,19 +40,24 @@ pub fn bench_1_taxicab_query(c: &mut Criterion) { let uri = format!("sqlite:{}", db_path.to_str().unwrap()); group.bench_function( - "single_read_taxicab_sql_catalog", + "sql_catalog_projection_once", // iter custom is not ideal, but criterion doesn't let us have a custom async setup which is really annoying |b| { - b.to_async(Runtime::new().unwrap()).iter_custom(async |_| { - let table = setup_table(table_dir.clone(), uri.clone()).await; + b.to_async(Runtime::new().unwrap()).iter_custom(async |n| { + let mut total_elapsed = Duration::default(); - let start = Instant::now(); - let output = scan_table(black_box(table)).await; - let end = Instant::now(); + for _ in 0..n { + let table = setup_table(table_dir.clone(), uri.clone()).await; - drop(black_box(output)); + let start = Instant::now(); + let output = scan_table(black_box(table)).await; + let dur = start.elapsed(); - end - start + drop(black_box(output)); + total_elapsed += dur; + } + + total_elapsed }) }, ); @@ -93,5 +98,5 @@ async fn scan_table(table: Table) -> Vec { stream.try_collect().await.unwrap() } -criterion_group!(benches, bench_1_taxicab_query); +criterion_group!(benches, bench_sql_catalog_projection_once); criterion_main!(benches); From d0f40a6217f2270f6cc00d5bf728ac6232a781c4 Mon Sep 17 00:00:00 2001 From: Kyteware Date: Sun, 22 Jun 2025 11:50:58 -0700 Subject: [PATCH 11/25] feat: make latency static and use it for writes as well --- crates/iceberg/src/io/storage_benchmarking.rs | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/crates/iceberg/src/io/storage_benchmarking.rs b/crates/iceberg/src/io/storage_benchmarking.rs index 2dedc254e..045fd7386 100644 --- a/crates/iceberg/src/io/storage_benchmarking.rs +++ b/crates/iceberg/src/io/storage_benchmarking.rs @@ -26,8 +26,8 @@ use opendal::raw::{ }; use opendal::services::MemoryConfig; use opendal::{Operator, Result}; -use rand::{Rng, thread_rng}; use tokio::time::sleep; +use rand::*; pub(crate) fn benchmarking_config_build() -> Result { Ok(Operator::from_config(MemoryConfig::default())? 
@@ -35,12 +35,6 @@ pub(crate) fn benchmarking_config_build() -> Result { .finish()) } -/// Usually takes around 50 ms, to visualize function: $ f\left(x\right)=x^{-0.5}\ \cdot\ 0.10 $ -fn gen_amt_latency() -> Duration { - let x: f64 = thread_rng().gen_range(0.01..10.); - Duration::from_secs_f64(x.powf(-0.5) * 0.1) -} - /// A layer that artifially introduces predictable relay for better benchmarking struct DelayLayer; @@ -73,22 +67,26 @@ impl LayeredAccess for DelayedAccessor { } async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> { - sleep(gen_amt_latency()).await; + sleep(Duration::from_millis(20)).await; self.inner.read(path, args).await } fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> { - thread::sleep(gen_amt_latency()); + thread::sleep(Duration::from_millis(20)); self.inner.blocking_read(path, args) } async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { + sleep(Duration::from_millis(20)).await; + self.inner.write(path, args).await } fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> { + thread::sleep(Duration::from_millis(20)); + self.inner.blocking_write(path, args) } From ad63b8905b4a635891939b1b9f2ac30a7061cd4c Mon Sep 17 00:00:00 2001 From: Kyteware Date: Sun, 22 Jun 2025 15:47:53 -0700 Subject: [PATCH 12/25] doc: document reason for python script --- .../construction_scripts/sql-catalog-taxicab.py | 2 ++ crates/benchmarks/src/lib.rs | 15 +++++++++------ 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/crates/benchmarks/src/construction_scripts/sql-catalog-taxicab.py b/crates/benchmarks/src/construction_scripts/sql-catalog-taxicab.py index 2d9499bcd..08850c23f 100644 --- a/crates/benchmarks/src/construction_scripts/sql-catalog-taxicab.py +++ b/crates/benchmarks/src/construction_scripts/sql-catalog-taxicab.py @@ -15,6 +15,8 @@ # specific language governing permissions and limitations # under the License. +# reason for existence explained in ../lib.rs + from pathlib import Path from sys import argv from urllib.request import urlretrieve diff --git a/crates/benchmarks/src/lib.rs b/crates/benchmarks/src/lib.rs index e1a1c8a8f..f0c431ea7 100644 --- a/crates/benchmarks/src/lib.rs +++ b/crates/benchmarks/src/lib.rs @@ -26,11 +26,14 @@ use iceberg::io::FileIO; use rand::{RngCore, thread_rng}; use walkdir::WalkDir; -/// runs the python construction script, provided just the file name without .py -/// -/// ("sql-catalog-querying-taxicab" etc.) -pub fn run_construction_script(script_name: &str) -> PathBuf { - let script_filename = [script_name, "py"].join("."); +/// As of now, write support for tables is not complete for iceberg rust, so we can't construct +/// tables for benchmarking ourselves. +/// +/// As a temporary (and admittedly ugly) measure, we run a python script to generate the tables +/// in a temp directory instead and then migrate all files created to a memory-storage `FileIO`. +/// When write support is complete, we can easily swap out the Python scripts and do this natively. 
+pub fn construct_table(table_name: &str) -> PathBuf { + let script_filename = [table_name, "py"].join("."); let mut script_path = PathBuf::from_str(env!("CARGO_MANIFEST_DIR")).unwrap(); script_path.push("src"); script_path.push("construction_scripts"); @@ -42,7 +45,7 @@ pub fn run_construction_script(script_name: &str) -> PathBuf { // should look like /tmp/iceberg_benchmark_tables/-> let mut working_dir = temp_dir(); working_dir.push("iceberg_benchmark_tables"); - working_dir.push([script_name, &thread_rng().next_u64().to_string()].join("-")); + working_dir.push([table_name, &thread_rng().next_u64().to_string()].join("-")); create_dir_all(&working_dir).unwrap(); From 97df5415b11e3abc221f9b28dd8bc2e34c07df98 Mon Sep 17 00:00:00 2001 From: Kyteware Date: Sun, 22 Jun 2025 15:48:17 -0700 Subject: [PATCH 13/25] ref: move dep documentation to a req.txt file --- .../src/construction_scripts/README.md | 24 ------------------- .../src/construction_scripts/requirements.txt | 3 +++ 2 files changed, 3 insertions(+), 24 deletions(-) delete mode 100644 crates/benchmarks/src/construction_scripts/README.md create mode 100644 crates/benchmarks/src/construction_scripts/requirements.txt diff --git a/crates/benchmarks/src/construction_scripts/README.md b/crates/benchmarks/src/construction_scripts/README.md deleted file mode 100644 index 9d3b7ab2d..000000000 --- a/crates/benchmarks/src/construction_scripts/README.md +++ /dev/null @@ -1,24 +0,0 @@ - - -This is a collection of python scripts for generating iceberg catalogs and tables. - -When transaction support is done and we can do this ourselves, these should be migrated to iceberg-rs. - -python dependancies: -pyiceberg[sql-sqlite] -pyarrow \ No newline at end of file diff --git a/crates/benchmarks/src/construction_scripts/requirements.txt b/crates/benchmarks/src/construction_scripts/requirements.txt new file mode 100644 index 000000000..ffd4f3b0b --- /dev/null +++ b/crates/benchmarks/src/construction_scripts/requirements.txt @@ -0,0 +1,3 @@ +python dependancies: +pyiceberg[sql-sqlite] +pyarrow \ No newline at end of file From 91a1d0a179fb4c9f6834995f58a5ece5d85fd37c Mon Sep 17 00:00:00 2001 From: Kyteware Date: Sun, 22 Jun 2025 15:56:50 -0700 Subject: [PATCH 14/25] doc: clarify sqlite usage --- .../benchmarks/src/construction_scripts/sql-catalog-taxicab.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/crates/benchmarks/src/construction_scripts/sql-catalog-taxicab.py b/crates/benchmarks/src/construction_scripts/sql-catalog-taxicab.py index 08850c23f..c03900d64 100644 --- a/crates/benchmarks/src/construction_scripts/sql-catalog-taxicab.py +++ b/crates/benchmarks/src/construction_scripts/sql-catalog-taxicab.py @@ -51,6 +51,8 @@ table.append(df) + +# the rust sql catalog needs an extra column for each table that pyiceberg doesn't include, this adds it conn = sqlite3.connect(f"{warehouse_path}/benchmarking-catalog.db") c = conn.cursor() c.execute("ALTER TABLE iceberg_tables ADD column iceberg_type VARCHAR(5) DEFAULT 'TABLE'") From 3335921bd0e12c94996d6874b37e033ad051925c Mon Sep 17 00:00:00 2001 From: Kyteware Date: Sun, 22 Jun 2025 15:57:00 -0700 Subject: [PATCH 15/25] chore: add newline to gitignore --- crates/benchmarks/.gitignore | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/benchmarks/.gitignore b/crates/benchmarks/.gitignore index 289e5649b..88d0f7e05 100644 --- a/crates/benchmarks/.gitignore +++ b/crates/benchmarks/.gitignore @@ -15,4 +15,4 @@ # specific language governing permissions and limitations # under the License. 
-src/construction_scripts/__pycache__/ \ No newline at end of file +src/construction_scripts/__pycache__/ From 34d2ad2c6ed47972c5c42b32c8add18647c9aabd Mon Sep 17 00:00:00 2001 From: Kyteware Date: Sun, 22 Jun 2025 15:57:15 -0700 Subject: [PATCH 16/25] doc: clarify benchmarking readme --- crates/benchmarks/README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/benchmarks/README.md b/crates/benchmarks/README.md index 588e0d37f..624ad115b 100644 --- a/crates/benchmarks/README.md +++ b/crates/benchmarks/README.md @@ -17,10 +17,10 @@ under the License. --> Where `iceberg-rust` benchmarks are kept. -To run the benchmarks, we must pass the `benchmarking` rustflag so that the benchmarking storage is compiled and we can tamper with fileio latency. +To run the benchmarks, we must pass the `benchmarking` rustflag so that the benchmarking storage is compiled and we can use a storage with a predictable and reliable latency. This command includes it: `RUSTFLAGS="--cfg benchmarking" cargo bench` ## Existing benchmarks -- `sql-catalog-projection-once`: Benchmarks scanning the NYC taxicab data and projecting two columns. +- `sql-catalog-projection-once`: Benchmarks scanning a table (we use a month of NYC taxicab data) and projecting two columns. From fa1273fc27bed7ab24cd142273a5da67a7379c88 Mon Sep 17 00:00:00 2001 From: Kyteware Date: Sun, 22 Jun 2025 15:57:29 -0700 Subject: [PATCH 17/25] doc: clarify iter_custom usage --- crates/benchmarks/benches/sql-catalog-projection-once.rs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/crates/benchmarks/benches/sql-catalog-projection-once.rs b/crates/benchmarks/benches/sql-catalog-projection-once.rs index 589d121a2..793582474 100644 --- a/crates/benchmarks/benches/sql-catalog-projection-once.rs +++ b/crates/benchmarks/benches/sql-catalog-projection-once.rs @@ -20,7 +20,7 @@ use std::path::PathBuf; use std::time::{Duration, Instant}; use arrow_array::RecordBatch; -use benchmarks::{copy_dir_to_fileio, run_construction_script}; +use benchmarks::{copy_dir_to_fileio, construct_table}; use criterion::{Criterion, criterion_group, criterion_main}; use futures::TryStreamExt; use iceberg::io::FileIOBuilder; @@ -31,17 +31,18 @@ use tokio::runtime::Runtime; pub fn bench_sql_catalog_projection_once(c: &mut Criterion) { let mut group = c.benchmark_group("sql_catalog_projection_once"); - // request times are very sporadic, so we need a lower confidence level for reasonable results group.measurement_time(Duration::from_secs(20)).sample_size(50); - let table_dir = run_construction_script("sql-catalog-taxicab"); + let table_dir = construct_table("sql-catalog-taxicab"); let mut db_path = table_dir.clone(); db_path.push("benchmarking-catalog.db"); let uri = format!("sqlite:{}", db_path.to_str().unwrap()); group.bench_function( "sql_catalog_projection_once", - // iter custom is not ideal, but criterion doesn't let us have a custom async setup which is really annoying + // an issue with criterion (https://github.com/bheisler/criterion.rs/issues/751) means we can't do a normal benchmark here, + // it doesn't let use provide an async setup function to build `FileIO`, etc. 
instead, we have to use `iter_custom` and do + // the measurements ourselves |b| { b.to_async(Runtime::new().unwrap()).iter_custom(async |n| { let mut total_elapsed = Duration::default(); From 4c730018d35e0491eccb44efb12f85c108a17246 Mon Sep 17 00:00:00 2001 From: Kyteware Date: Sun, 22 Jun 2025 16:03:58 -0700 Subject: [PATCH 18/25] doc: include reasoning for benchmarking storage --- crates/iceberg/src/io/storage_benchmarking.rs | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/crates/iceberg/src/io/storage_benchmarking.rs b/crates/iceberg/src/io/storage_benchmarking.rs index 045fd7386..0b3dfe829 100644 --- a/crates/iceberg/src/io/storage_benchmarking.rs +++ b/crates/iceberg/src/io/storage_benchmarking.rs @@ -15,8 +15,15 @@ // specific language governing permissions and limitations // under the License. -//! This storage is used to mimick consistent latency for benchmarks of iceberg. -//! It should not be included in standard distributions of iceberg. +//! It's challenging to make storage with reproducible delays for benchmarks. Variance in performance can +//! tamper with results. Additionally, benchmarks can benefit from granular control of exactly how long +//! the latency will be. +//! +//! To solve this problem, we have an extra **usually not included** storage type for benchmarks. This is +//! almost exactly the same as the memory catalog, except with a preset latency with each read and write. +//! +//! THIS CODE SHOULD NOT AND IS NOT INCLUDED IN REGULAR DISTRIBUTIONS OF `iceberg-rs`. To include it, +//! you must pass RUSTFLAGS="--cfg benchmarking" to `cargo`. use std::thread; use std::time::Duration; From f8a1e7969eccabfb829ac1eab5e5a9a9534ab048 Mon Sep 17 00:00:00 2001 From: Kyteware Date: Sun, 22 Jun 2025 16:28:22 -0700 Subject: [PATCH 19/25] fix: lower verison of sqlx in benchmarking for lockfile --- crates/benchmarks/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/benchmarks/Cargo.toml b/crates/benchmarks/Cargo.toml index 390f81313..773a1ff53 100644 --- a/crates/benchmarks/Cargo.toml +++ b/crates/benchmarks/Cargo.toml @@ -33,7 +33,7 @@ tokio = { features = ["rt-multi-thread"], workspace = true } bytes = { workspace = true } futures = { workspace = true } arrow-array.workspace = true -sqlx = { version = "0.8.6", features = ["any", "runtime-tokio", "sqlite"] } +sqlx = { version = "0.8.1", features = ["any", "runtime-tokio", "sqlite"] } [dev-dependencies] criterion = { version = "0.6.0", features = ["async_tokio"] } From f76809baa701d3a3d9c309af66f2ee582d556994 Mon Sep 17 00:00:00 2001 From: Kyteware Date: Sun, 22 Jun 2025 16:29:28 -0700 Subject: [PATCH 20/25] chore: add license to requirements.txt --- .../src/construction_scripts/requirements.txt | 20 +++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/crates/benchmarks/src/construction_scripts/requirements.txt b/crates/benchmarks/src/construction_scripts/requirements.txt index ffd4f3b0b..07227ec69 100644 --- a/crates/benchmarks/src/construction_scripts/requirements.txt +++ b/crates/benchmarks/src/construction_scripts/requirements.txt @@ -1,3 +1,19 @@ -python dependancies: +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + pyiceberg[sql-sqlite] -pyarrow \ No newline at end of file +pyarrow From 5f8cb971a0c5da48b784e3125dbae759a89e3cc2 Mon Sep 17 00:00:00 2001 From: Kyteware Date: Sun, 22 Jun 2025 16:38:29 -0700 Subject: [PATCH 21/25] fix: dont run benchmark if benchmarking flag isn't enabled --- .../benches/sql-catalog-projection-once.rs | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/crates/benchmarks/benches/sql-catalog-projection-once.rs b/crates/benchmarks/benches/sql-catalog-projection-once.rs index 793582474..0914b3329 100644 --- a/crates/benchmarks/benches/sql-catalog-projection-once.rs +++ b/crates/benchmarks/benches/sql-catalog-projection-once.rs @@ -20,7 +20,7 @@ use std::path::PathBuf; use std::time::{Duration, Instant}; use arrow_array::RecordBatch; -use benchmarks::{copy_dir_to_fileio, construct_table}; +use benchmarks::{construct_table, copy_dir_to_fileio}; use criterion::{Criterion, criterion_group, criterion_main}; use futures::TryStreamExt; use iceberg::io::FileIOBuilder; @@ -30,8 +30,17 @@ use iceberg_catalog_sql::{SqlBindStyle, SqlCatalog, SqlCatalogConfig}; use tokio::runtime::Runtime; pub fn bench_sql_catalog_projection_once(c: &mut Criterion) { + #[cfg(not(benchmarking))] + { + eprintln!(r#"benchmarking flag not enabled, must pass RUSTFLAGS="--cfg benchmarking""#); + return; + } + + #[allow(unreachable_code)] let mut group = c.benchmark_group("sql_catalog_projection_once"); - group.measurement_time(Duration::from_secs(20)).sample_size(50); + group + .measurement_time(Duration::from_secs(20)) + .sample_size(50); let table_dir = construct_table("sql-catalog-taxicab"); let mut db_path = table_dir.clone(); From 9a52a9ece3e54cd66e2cce0c7456a76454f74150 Mon Sep 17 00:00:00 2001 From: Kyteware Date: Sun, 22 Jun 2025 16:38:46 -0700 Subject: [PATCH 22/25] chore: turn off warning for benchmarking cfg --- crates/benchmarks/Cargo.toml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/crates/benchmarks/Cargo.toml b/crates/benchmarks/Cargo.toml index 773a1ff53..fb1e1aaa1 100644 --- a/crates/benchmarks/Cargo.toml +++ b/crates/benchmarks/Cargo.toml @@ -41,3 +41,7 @@ criterion = { version = "0.6.0", features = ["async_tokio"] } [[bench]] name = "sql-catalog-projection-once" harness = false + +[lints.rust] +# so the #[cfg(benchmarking)] tag doesn't create warnings +unexpected_cfgs = { level = "warn", check-cfg = ['cfg(benchmarking)'] } From 9f12ba92a7055a14c8a4418b24d6d13878493a7b Mon Sep 17 00:00:00 2001 From: Kyteware Date: Sun, 22 Jun 2025 16:38:54 -0700 Subject: [PATCH 23/25] chore: cargo fmt --- crates/benchmarks/src/lib.rs | 2 +- crates/iceberg/src/io/storage_benchmarking.rs | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/crates/benchmarks/src/lib.rs b/crates/benchmarks/src/lib.rs index f0c431ea7..a6f332ad2 100644 --- a/crates/benchmarks/src/lib.rs +++ b/crates/benchmarks/src/lib.rs @@ -28,7 +28,7 @@ use walkdir::WalkDir; /// As of now, write support for tables is not 
complete for iceberg rust, so we can't construct
 /// tables for benchmarking ourselves.
-/// 
+///
 /// As a temporary (and admittedly ugly) measure, we run a python script to generate the tables
 /// in a temp directory instead and then migrate all files created to a memory-storage `FileIO`.
 /// When write support is complete, we can easily swap out the Python scripts and do this natively.
diff --git a/crates/iceberg/src/io/storage_benchmarking.rs b/crates/iceberg/src/io/storage_benchmarking.rs
index 0b3dfe829..cfa2e6705 100644
--- a/crates/iceberg/src/io/storage_benchmarking.rs
+++ b/crates/iceberg/src/io/storage_benchmarking.rs
@@ -18,10 +18,10 @@
 //! It's challenging to make storage with reproducible delays for benchmarks. Variance in performance can
 //! tamper with results. Additionally, benchmarks can benefit from granular control of exactly how long
 //! the latency will be.
-//! 
+//!
 //! To solve this problem, we have an extra **usually not included** storage type for benchmarks. This is
 //! almost exactly the same as the memory catalog, except with a preset latency with each read and write.
-//! 
+//!
 //! THIS CODE SHOULD NOT AND IS NOT INCLUDED IN REGULAR DISTRIBUTIONS OF `iceberg-rs`. To include it,
 //! you must pass RUSTFLAGS="--cfg benchmarking" to `cargo`.
 
@@ -33,8 +33,8 @@ use opendal::raw::{
 };
 use opendal::services::MemoryConfig;
 use opendal::{Operator, Result};
-use tokio::time::sleep;
 use rand::*;
+use tokio::time::sleep;
 
 pub(crate) fn benchmarking_config_build() -> Result<Operator> {
     Ok(Operator::from_config(MemoryConfig::default())?

From 3cab0467e456475f75e74558c8cf5e80af10fda8 Mon Sep 17 00:00:00 2001
From: Kyteware
Date: Sun, 22 Jun 2025 18:03:26 -0700
Subject: [PATCH 24/25] chore: changes to pass cargo clippy

---
 crates/benchmarks/benches/sql-catalog-projection-once.rs | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/crates/benchmarks/benches/sql-catalog-projection-once.rs b/crates/benchmarks/benches/sql-catalog-projection-once.rs
index 0914b3329..8ee3c0c71 100644
--- a/crates/benchmarks/benches/sql-catalog-projection-once.rs
+++ b/crates/benchmarks/benches/sql-catalog-projection-once.rs
@@ -29,6 +29,7 @@ use iceberg::{Catalog, TableIdent};
 use iceberg_catalog_sql::{SqlBindStyle, SqlCatalog, SqlCatalogConfig};
 use tokio::runtime::Runtime;
 
+#[allow(unused_variables)]
 pub fn bench_sql_catalog_projection_once(c: &mut Criterion) {
     #[cfg(not(benchmarking))]
     {
@@ -88,11 +89,10 @@ async fn setup_table(table_dir: PathBuf, uri: String) -> Table {
         .warehouse_location(table_dir.to_str().unwrap().to_owned())
         .build();
     let catalog = SqlCatalog::new(config).await.unwrap();
-    let table = catalog
+    catalog
         .load_table(&TableIdent::from_strs(["default", "taxi_dataset"]).unwrap())
         .await
-        .expect(&format!("table_dir: {table_dir:?}"));
-    table
+        .unwrap()
 }
 
 async fn scan_table(table: Table) -> Vec<RecordBatch> {

From a529afddefd05d14af5b8c65a31c19cae4e12710 Mon Sep 17 00:00:00 2001
From: Kyteware
Date: Mon, 23 Jun 2025 20:02:11 -0700
Subject: [PATCH 25/25] chore: ignore sqlx in cargo machete because we only use it to bump the features up

---
 crates/benchmarks/Cargo.toml | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/crates/benchmarks/Cargo.toml b/crates/benchmarks/Cargo.toml
index fb1e1aaa1..2702e7246 100644
--- a/crates/benchmarks/Cargo.toml
+++ b/crates/benchmarks/Cargo.toml
@@ -45,3 +45,7 @@ harness = false
 [lints.rust]
 # so the #[cfg(benchmarking)] tag doesn't create warnings
 unexpected_cfgs = { level = "warn", check-cfg = ['cfg(benchmarking)'] }
+
+# this is done so we can get sqlite feature support without a warning
+[package.metadata.cargo-machete]
+ignored = ["sqlx"]