From 99d237a9f92d61fd28a964a0243c21197737154a Mon Sep 17 00:00:00 2001 From: SeoYoung Lee Date: Mon, 23 Jun 2025 11:21:10 +0000 Subject: [PATCH 1/2] Add microbenchmark for spilling with compression --- datafusion/physical-plan/benches/spill_io.rs | 305 ++++++++++++++++++- 1 file changed, 302 insertions(+), 3 deletions(-) diff --git a/datafusion/physical-plan/benches/spill_io.rs b/datafusion/physical-plan/benches/spill_io.rs index 3b877671ad58..5b75e65cf9a8 100644 --- a/datafusion/physical-plan/benches/spill_io.rs +++ b/datafusion/physical-plan/benches/spill_io.rs @@ -16,14 +16,21 @@ // under the License. use arrow::array::{ - Date32Builder, Decimal128Builder, Int32Builder, RecordBatch, StringBuilder, + Date32Builder, Decimal128Builder, Int32Builder, Int64Builder, RecordBatch, + StringBuilder, }; use arrow::datatypes::{DataType, Field, Schema}; -use criterion::{criterion_group, criterion_main, BatchSize, BenchmarkId, Criterion}; +use criterion::measurement::WallTime; +use criterion::{ + criterion_group, criterion_main, BatchSize, BenchmarkGroup, BenchmarkId, Criterion, +}; +use datafusion_common::config::SpillCompression; +use datafusion_execution::memory_pool::human_readable_size; use datafusion_execution::runtime_env::RuntimeEnv; use datafusion_physical_plan::common::collect; use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, SpillMetrics}; use datafusion_physical_plan::SpillManager; +use rand::{Rng, SeedableRng}; use std::sync::Arc; use tokio::runtime::Runtime; @@ -119,5 +126,297 @@ fn bench_spill_io(c: &mut Criterion) { group.finish(); } -criterion_group!(benches, bench_spill_io); +// Generate 50 RecordBatches mimicking TPC-H Q2's partial aggregate result: +// GROUP BY ps_partkey -> MIN(ps_supplycost) +fn create_q2_like_batches() -> (Arc, Vec) { + // use fixed seed + let seed = 2; + let mut rng = rand::rngs::StdRng::seed_from_u64(seed); + let mut batches = Vec::with_capacity(50); + + let mut current_key = 400000_i64; + + let schema = 
Arc::new(Schema::new(vec![ + Field::new("ps_partkey", DataType::Int64, false), + Field::new("min_ps_supplycost", DataType::Decimal128(15, 2), true), + ])); + + for _ in 0..50 { + let mut partkey_builder = Int64Builder::new(); + let mut cost_builder = Decimal128Builder::new() + .with_precision_and_scale(15, 2) + .unwrap(); + + for _ in 0..8192 { + // Occasionally skip a few partkey values to simulate sparsity + let jump = if rng.random_bool(0.05) { + rng.random_range(2..10) + } else { + 1 + }; + current_key += jump; + + let supply_cost = rng.random_range(10_00..100_000) as i128; + + partkey_builder.append_value(current_key); + cost_builder.append_value(supply_cost); + } + + let batch = RecordBatch::try_new( + Arc::clone(&schema), + vec![ + Arc::new(partkey_builder.finish()), + Arc::new(cost_builder.finish()), + ], + ) + .unwrap(); + + batches.push(batch); + } + + (schema, batches) +} + +/// Generate 50 RecordBatches mimicking TPC-H Q16's partial aggregate result: +/// GROUP BY (p_brand, p_type, p_size) -> COUNT(DISTINCT ps_suppkey) +pub fn create_q16_like_batches() -> (Arc, Vec) { + let seed = 16; + let mut rng = rand::rngs::StdRng::seed_from_u64(seed); + let mut batches = Vec::with_capacity(50); + + let schema = Arc::new(Schema::new(vec![ + Field::new("p_brand", DataType::Utf8, false), + Field::new("p_type", DataType::Utf8, false), + Field::new("p_size", DataType::Int32, false), + Field::new("alias1", DataType::Int64, false), // COUNT(DISTINCT ps_suppkey) + ])); + + // Representative string pools + let brands = ["Brand#32", "Brand#33", "Brand#41", "Brand#42", "Brand#55"]; + let types = [ + "PROMO ANODIZED NICKEL", + "STANDARD BRUSHED NICKEL", + "PROMO POLISHED COPPER", + "ECONOMY ANODIZED BRASS", + "LARGE BURNISHED COPPER", + "STANDARD POLISHED TIN", + "SMALL PLATED STEEL", + "MEDIUM POLISHED COPPER", + ]; + let sizes = [3, 9, 14, 19, 23, 36, 45, 49]; + + for _ in 0..50 { + let mut brand_builder = StringBuilder::new(); + let mut type_builder = StringBuilder::new(); 
+ let mut size_builder = Int32Builder::new(); + let mut count_builder = Int64Builder::new(); + + for _ in 0..8192 { + let brand = brands[rng.random_range(0..brands.len())]; + let ptype = types[rng.random_range(0..types.len())]; + let size = sizes[rng.random_range(0..sizes.len())]; + let count = rng.random_range(1000..100_000); + + brand_builder.append_value(brand); + type_builder.append_value(ptype); + size_builder.append_value(size); + count_builder.append_value(count); + } + + let batch = RecordBatch::try_new( + Arc::clone(&schema), + vec![ + Arc::new(brand_builder.finish()), + Arc::new(type_builder.finish()), + Arc::new(size_builder.finish()), + Arc::new(count_builder.finish()), + ], + ) + .unwrap(); + + batches.push(batch); + } + + (schema, batches) +} + +// Generate 50 RecordBatches mimicking TPC-H Q20's partial aggregate result: +// GROUP BY (l_partkey, l_suppkey) -> SUM(l_quantity) +fn create_q20_like_batches() -> (Arc, Vec) { + let seed = 20; + let mut rng = rand::rngs::StdRng::seed_from_u64(seed); + let mut batches = Vec::with_capacity(50); + + let mut current_partkey = 400000_i64; + + let schema = Arc::new(Schema::new(vec![ + Field::new("l_partkey", DataType::Int64, false), + Field::new("l_suppkey", DataType::Int64, false), + Field::new("sum_l_quantity", DataType::Decimal128(25, 2), true), + ])); + + for _ in 0..50 { + let mut partkey_builder = Int64Builder::new(); + let mut suppkey_builder = Int64Builder::new(); + let mut quantity_builder = Decimal128Builder::new() + .with_precision_and_scale(25, 2) + .unwrap(); + + for _ in 0..8192 { + // Occasionally skip a few partkey values to simulate sparsity + let partkey_jump = if rng.random_bool(0.03) { + rng.random_range(2..6) + } else { + 1 + }; + current_partkey += partkey_jump; + + let suppkey = rng.random_range(10_000..99_999); + let quantity = rng.random_range(500..20_000) as i128; + + partkey_builder.append_value(current_partkey); + suppkey_builder.append_value(suppkey); + 
quantity_builder.append_value(quantity); + } + + let batch = RecordBatch::try_new( + Arc::clone(&schema), + vec![ + Arc::new(partkey_builder.finish()), + Arc::new(suppkey_builder.finish()), + Arc::new(quantity_builder.finish()), + ], + ) + .unwrap(); + + batches.push(batch); + } + + (schema, batches) +} + +// Benchmarks spill write + read performance across multiple compression codecs +// using realistic input data inspired by TPC-H aggregate spill scenarios. +// +// This function prepares synthetic RecordBatches that mimic the schema and distribution +// of intermediate aggregate results from representative TPC-H queries (Q2, Q16, Q20). +// For each dataset: +// - It evaluates spill performance under different compression codecs (e.g., Uncompressed, Zstd, LZ4). +// - It measures end-to-end spill write + read performance using Criterion. +// - It prints the observed memory-to-disk compression ratio for each codec. +// +// This helps evaluate the tradeoffs between compression ratio and runtime overhead for various codecs. 
+fn bench_spill_compression(c: &mut Criterion) { + let env = Arc::new(RuntimeEnv::default()); + let mut group = c.benchmark_group("spill_compression"); + let rt = Runtime::new().unwrap(); + let compressions = vec![ + SpillCompression::Uncompressed, + SpillCompression::Zstd, + SpillCompression::Lz4Frame, + ]; + + // Q2 + let (schema, batches) = create_q2_like_batches(); + benchmark_spill_batches_for_all_codec( + &mut group, + "q2", + batches, + &compressions, + &rt, + env.clone(), + schema, + ); + // Q16 + let (schema, batches) = create_q16_like_batches(); + benchmark_spill_batches_for_all_codec( + &mut group, + "q16", + batches, + &compressions, + &rt, + env.clone(), + schema, + ); + // Q20 + let (schema, batches) = create_q20_like_batches(); + benchmark_spill_batches_for_all_codec( + &mut group, + "q20", + batches, + &compressions, + &rt, + env, + schema, + ); + + group.finish(); +} + +fn benchmark_spill_batches_for_all_codec( + group: &mut BenchmarkGroup<'_, WallTime>, + batch_label: &str, + batches: Vec, + compressions: &[SpillCompression], + rt: &Runtime, + env: Arc, + schema: Arc, +) { + let mem_bytes: usize = batches.iter().map(|b| b.get_array_memory_size()).sum(); + + for &compression in compressions { + let metrics = SpillMetrics::new(&ExecutionPlanMetricsSet::new(), 0); + let spill_manager = + SpillManager::new(Arc::clone(&env), metrics.clone(), Arc::clone(&schema)) + .with_compression_type(compression); + + let bench_id = BenchmarkId::new(batch_label, compression.to_string()); + group.bench_with_input(bench_id, &spill_manager, |b, spill_manager| { + b.iter_batched( + || batches.clone(), + |batches| { + rt.block_on(async { + let spill_file = spill_manager + .spill_record_batch_and_finish( + &batches, + &format!("{batch_label}_{compression}"), + ) + .unwrap() + .unwrap(); + let stream = + spill_manager.read_spill_as_stream(spill_file).unwrap(); + let _ = collect(stream).await.unwrap(); + }) + }, + BatchSize::LargeInput, + ) + }); + + // Run Spilling Read & 
Write once more to read file size + let spill_file = spill_manager + .spill_record_batch_and_finish( + &batches, + &format!("{batch_label}_{compression}"), + ) + .unwrap() + .unwrap(); + + let disk_bytes = std::fs::metadata(spill_file.path()) + .expect("metadata read fail") + .len() as usize; + + let ratio = mem_bytes as f64 / disk_bytes.max(1) as f64; + + println!( + "[{} | {:?}] mem: {}| disk: {}| compression ratio: {:.3}x", + batch_label, + compression, + human_readable_size(mem_bytes), + human_readable_size(disk_bytes), + ratio + ); + } +} + +criterion_group!(benches, bench_spill_io, bench_spill_compression); criterion_main!(benches); From 68f6b7a66e50a727f4e1a595d0b7e4ee04f812f4 Mon Sep 17 00:00:00 2001 From: SeoYoung Lee Date: Tue, 24 Jun 2025 04:20:25 +0000 Subject: [PATCH 2/2] add wide batch --- datafusion/physical-plan/benches/spill_io.rs | 157 ++++++++++++++++--- 1 file changed, 137 insertions(+), 20 deletions(-) diff --git a/datafusion/physical-plan/benches/spill_io.rs b/datafusion/physical-plan/benches/spill_io.rs index 5b75e65cf9a8..e72fdfb71dc2 100644 --- a/datafusion/physical-plan/benches/spill_io.rs +++ b/datafusion/physical-plan/benches/spill_io.rs @@ -126,13 +126,13 @@ fn bench_spill_io(c: &mut Criterion) { group.finish(); } -// Generate 50 RecordBatches mimicking TPC-H Q2's partial aggregate result: +// Generate `num_batches` RecordBatches mimicking TPC-H Q2's partial aggregate result: // GROUP BY ps_partkey -> MIN(ps_supplycost) -fn create_q2_like_batches() -> (Arc, Vec) { +fn create_q2_like_batches(num_batches: usize) -> (Arc, Vec) { // use fixed seed let seed = 2; let mut rng = rand::rngs::StdRng::seed_from_u64(seed); - let mut batches = Vec::with_capacity(50); + let mut batches = Vec::with_capacity(num_batches); let mut current_key = 400000_i64; @@ -141,7 +141,7 @@ fn create_q2_like_batches() -> (Arc, Vec) { Field::new("min_ps_supplycost", DataType::Decimal128(15, 2), true), ])); - for _ in 0..50 { + for _ in 0..num_batches { let mut 
partkey_builder = Int64Builder::new(); let mut cost_builder = Decimal128Builder::new() .with_precision_and_scale(15, 2) @@ -177,12 +177,12 @@ fn create_q2_like_batches() -> (Arc, Vec) { (schema, batches) } -/// Generate 50 RecordBatches mimicking TPC-H Q16's partial aggregate result: +/// Generate `num_batches` RecordBatches mimicking TPC-H Q16's partial aggregate result: /// GROUP BY (p_brand, p_type, p_size) -> COUNT(DISTINCT ps_suppkey) -pub fn create_q16_like_batches() -> (Arc, Vec) { +pub fn create_q16_like_batches(num_batches: usize) -> (Arc, Vec) { let seed = 16; let mut rng = rand::rngs::StdRng::seed_from_u64(seed); - let mut batches = Vec::with_capacity(50); + let mut batches = Vec::with_capacity(num_batches); let schema = Arc::new(Schema::new(vec![ Field::new("p_brand", DataType::Utf8, false), @@ -205,7 +205,7 @@ pub fn create_q16_like_batches() -> (Arc, Vec) { ]; let sizes = [3, 9, 14, 19, 23, 36, 45, 49]; - for _ in 0..50 { + for _ in 0..num_batches { let mut brand_builder = StringBuilder::new(); let mut type_builder = StringBuilder::new(); let mut size_builder = Int32Builder::new(); @@ -240,12 +240,12 @@ pub fn create_q16_like_batches() -> (Arc, Vec) { (schema, batches) } -// Generate 50 RecordBatches mimicking TPC-H Q20's partial aggregate result: +// Generate `num_batches` RecordBatches mimicking TPC-H Q20's partial aggregate result: // GROUP BY (l_partkey, l_suppkey) -> SUM(l_quantity) -fn create_q20_like_batches() -> (Arc, Vec) { +fn create_q20_like_batches(num_batches: usize) -> (Arc, Vec) { let seed = 20; let mut rng = rand::rngs::StdRng::seed_from_u64(seed); - let mut batches = Vec::with_capacity(50); + let mut batches = Vec::with_capacity(num_batches); let mut current_partkey = 400000_i64; @@ -255,7 +255,7 @@ fn create_q20_like_batches() -> (Arc, Vec) { Field::new("sum_l_quantity", DataType::Decimal128(25, 2), true), ])); - for _ in 0..50 { + for _ in 0..num_batches { let mut partkey_builder = Int64Builder::new(); let mut suppkey_builder = 
Int64Builder::new(); let mut quantity_builder = Decimal128Builder::new() .with_precision_and_scale(25, 2) @@ -295,11 +295,115 @@ fn create_q20_like_batches() -> (Arc, Vec) { (schema, batches) } +/// Generate `num_batches` wide RecordBatches resembling sort-tpch Q10 for benchmarking. +/// This includes multiple numeric, date, and Utf8 columns (15 total). +pub fn create_wide_batches(num_batches: usize) -> (Arc, Vec) { + let seed = 10; + let mut rng = rand::rngs::StdRng::seed_from_u64(seed); + let mut batches = Vec::with_capacity(num_batches); + + let schema = Arc::new(Schema::new(vec![ + Field::new("l_linenumber", DataType::Int32, false), + Field::new("l_suppkey", DataType::Int64, false), + Field::new("l_orderkey", DataType::Int64, false), + Field::new("l_partkey", DataType::Int64, false), + Field::new("l_quantity", DataType::Decimal128(15, 2), false), + Field::new("l_extendedprice", DataType::Decimal128(15, 2), false), + Field::new("l_discount", DataType::Decimal128(15, 2), false), + Field::new("l_tax", DataType::Decimal128(15, 2), false), + Field::new("l_returnflag", DataType::Utf8, false), + Field::new("l_linestatus", DataType::Utf8, false), + Field::new("l_shipdate", DataType::Date32, false), + Field::new("l_commitdate", DataType::Date32, false), + Field::new("l_receiptdate", DataType::Date32, false), + Field::new("l_shipinstruct", DataType::Utf8, false), + Field::new("l_shipmode", DataType::Utf8, false), + ])); + + for _ in 0..num_batches { + let mut linenum = Int32Builder::new(); + let mut suppkey = Int64Builder::new(); + let mut orderkey = Int64Builder::new(); + let mut partkey = Int64Builder::new(); + let mut quantity = Decimal128Builder::new() + .with_precision_and_scale(15, 2) + .unwrap(); + let mut extprice = Decimal128Builder::new() + .with_precision_and_scale(15, 2) + .unwrap(); + let mut discount = Decimal128Builder::new() + .with_precision_and_scale(15, 2) + .unwrap(); + let mut tax = Decimal128Builder::new() + .with_precision_and_scale(15, 2) + .unwrap(); + let mut retflag = 
StringBuilder::new(); + let mut linestatus = StringBuilder::new(); + let mut shipdate = Date32Builder::new(); + let mut commitdate = Date32Builder::new(); + let mut receiptdate = Date32Builder::new(); + let mut shipinstruct = StringBuilder::new(); + let mut shipmode = StringBuilder::new(); + + let return_flags = ["A", "N", "R"]; + let statuses = ["F", "O"]; + let instructs = ["DELIVER IN PERSON", "COLLECT COD", "NONE"]; + let modes = ["TRUCK", "MAIL", "SHIP", "RAIL", "AIR"]; + + for i in 0..8192 { + linenum.append_value(i % 7); + suppkey.append_value(rng.random_range(0..100_000)); + orderkey.append_value(1_000_000 + i as i64); + partkey.append_value(rng.random_range(0..200_000)); + + quantity.append_value(rng.random_range(100..10000) as i128); + extprice.append_value(rng.random_range(1_000..1_000_000) as i128); + discount.append_value(rng.random_range(0..10000) as i128); + tax.append_value(rng.random_range(0..5000) as i128); + + retflag.append_value(return_flags[rng.random_range(0..return_flags.len())]); + linestatus.append_value(statuses[rng.random_range(0..statuses.len())]); + + let base_date = 10_000; + shipdate.append_value(base_date + (i % 1000)); + commitdate.append_value(base_date + (i % 1000) + 1); + receiptdate.append_value(base_date + (i % 1000) + 2); + + shipinstruct.append_value(instructs[rng.random_range(0..instructs.len())]); + shipmode.append_value(modes[rng.random_range(0..modes.len())]); + } + + let batch = RecordBatch::try_new( + Arc::clone(&schema), + vec![ + Arc::new(linenum.finish()), + Arc::new(suppkey.finish()), + Arc::new(orderkey.finish()), + Arc::new(partkey.finish()), + Arc::new(quantity.finish()), + Arc::new(extprice.finish()), + Arc::new(discount.finish()), + Arc::new(tax.finish()), + Arc::new(retflag.finish()), + Arc::new(linestatus.finish()), + Arc::new(shipdate.finish()), + Arc::new(commitdate.finish()), + Arc::new(receiptdate.finish()), + Arc::new(shipinstruct.finish()), + Arc::new(shipmode.finish()), + ], + ) + .unwrap(); + 
batches.push(batch); + } + (schema, batches) +} + // Benchmarks spill write + read performance across multiple compression codecs // using realistic input data inspired by TPC-H aggregate spill scenarios. // // This function prepares synthetic RecordBatches that mimic the schema and distribution -// of intermediate aggregate results from representative TPC-H queries (Q2, Q16, Q20). +// of intermediate aggregate results from representative TPC-H queries (Q2, Q16, Q20) and sort-tpch Q10 // For each dataset: // - It evaluates spill performance under different compression codecs (e.g., Uncompressed, Zstd, LZ4). // - It measures end-to-end spill write + read performance using Criterion. // - It prints the observed memory-to-disk compression ratio for each codec. @@ -316,8 +420,11 @@ fn bench_spill_compression(c: &mut Criterion) { SpillCompression::Lz4Frame, ]; - // Q2 - let (schema, batches) = create_q2_like_batches(); + // Modify this value to change data volume. Note that each batch contains 8192 rows. + let num_batches = 50; + + // Q2 [Int64, Decimal128] + let (schema, batches) = create_q2_like_batches(num_batches); benchmark_spill_batches_for_all_codec( &mut group, "q2", batches, &compressions, &rt, env.clone(), schema, ); - // Q16 - let (schema, batches) = create_q16_like_batches(); + // Q16 [Utf8, Utf8, Int32, Int64] + let (schema, batches) = create_q16_like_batches(num_batches); benchmark_spill_batches_for_all_codec( &mut group, "q16", batches, &compressions, &rt, env.clone(), schema, ); - // Q20 - let (schema, batches) = create_q20_like_batches(); + // Q20 [Int64, Int64, Decimal128] + let (schema, batches) = create_q20_like_batches(num_batches); benchmark_spill_batches_for_all_codec( &mut group, "q20", batches, &compressions, &rt, + env.clone(), + schema, + ); + // wide [Int32, Int64 * 3, Decimal128 * 4, Date * 3, Utf8 * 4] + let (schema, batches) = create_wide_batches(num_batches); + benchmark_spill_batches_for_all_codec( + &mut group, + "wide", + batches, + &compressions, + &rt, env, 
schema, ); - group.finish(); }