diff --git a/Cargo.lock b/Cargo.lock index e2e593e62528..b57c175bdbe7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -247,8 +247,7 @@ checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50" [[package]] name = "arrow" version = "55.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b1bb018b6960c87fd9d025009820406f74e83281185a8bdcb44880d2aa5c9a87" +source = "git+https://github.com/zhuqi-lucas/arrow-rs.git?branch=add_pub_write_all_api#5c52e40f177702e5dfcc67bd700dbd2234d51222" dependencies = [ "arrow-arith", "arrow-array", @@ -271,8 +270,7 @@ dependencies = [ [[package]] name = "arrow-arith" version = "55.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "44de76b51473aa888ecd6ad93ceb262fb8d40d1f1154a4df2f069b3590aa7575" +source = "git+https://github.com/zhuqi-lucas/arrow-rs.git?branch=add_pub_write_all_api#5c52e40f177702e5dfcc67bd700dbd2234d51222" dependencies = [ "arrow-array", "arrow-buffer", @@ -285,8 +283,7 @@ dependencies = [ [[package]] name = "arrow-array" version = "55.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "29ed77e22744475a9a53d00026cf8e166fe73cf42d89c4c4ae63607ee1cfcc3f" +source = "git+https://github.com/zhuqi-lucas/arrow-rs.git?branch=add_pub_write_all_api#5c52e40f177702e5dfcc67bd700dbd2234d51222" dependencies = [ "ahash 0.8.12", "arrow-buffer", @@ -302,8 +299,7 @@ dependencies = [ [[package]] name = "arrow-buffer" version = "55.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b0391c96eb58bf7389171d1e103112d3fc3e5625ca6b372d606f2688f1ea4cce" +source = "git+https://github.com/zhuqi-lucas/arrow-rs.git?branch=add_pub_write_all_api#5c52e40f177702e5dfcc67bd700dbd2234d51222" dependencies = [ "bytes", "half", @@ -313,8 +309,7 @@ dependencies = [ [[package]] name = "arrow-cast" version = "55.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f39e1d774ece9292697fcbe06b5584401b26bd34be1bec25c33edae65c2420ff" +source = "git+https://github.com/zhuqi-lucas/arrow-rs.git?branch=add_pub_write_all_api#5c52e40f177702e5dfcc67bd700dbd2234d51222" dependencies = [ "arrow-array", "arrow-buffer", @@ -334,8 +329,7 @@ dependencies = [ [[package]] name = "arrow-csv" version = "55.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9055c972a07bf12c2a827debfd34f88d3b93da1941d36e1d9fee85eebe38a12a" +source = "git+https://github.com/zhuqi-lucas/arrow-rs.git?branch=add_pub_write_all_api#5c52e40f177702e5dfcc67bd700dbd2234d51222" dependencies = [ "arrow-array", "arrow-cast", @@ -343,15 +337,13 @@ dependencies = [ "chrono", "csv", "csv-core", - "lazy_static", "regex", ] [[package]] name = "arrow-data" version = "55.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cf75ac27a08c7f48b88e5c923f267e980f27070147ab74615ad85b5c5f90473d" +source = "git+https://github.com/zhuqi-lucas/arrow-rs.git?branch=add_pub_write_all_api#5c52e40f177702e5dfcc67bd700dbd2234d51222" dependencies = [ "arrow-buffer", "arrow-schema", @@ -362,8 +354,7 @@ dependencies = [ [[package]] name = "arrow-flight" version = "55.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "91efc67a4f5a438833dd76ef674745c80f6f6b9a428a3b440cbfbf74e32867e6" +source = "git+https://github.com/zhuqi-lucas/arrow-rs.git?branch=add_pub_write_all_api#5c52e40f177702e5dfcc67bd700dbd2234d51222" dependencies = [ "arrow-arith", "arrow-array", @@ -389,8 +380,7 @@ dependencies = [ [[package]] 
name = "arrow-ipc" version = "55.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a222f0d93772bd058d1268f4c28ea421a603d66f7979479048c429292fac7b2e" +source = "git+https://github.com/zhuqi-lucas/arrow-rs.git?branch=add_pub_write_all_api#5c52e40f177702e5dfcc67bd700dbd2234d51222" dependencies = [ "arrow-array", "arrow-buffer", @@ -403,8 +393,7 @@ dependencies = [ [[package]] name = "arrow-json" version = "55.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9085342bbca0f75e8cb70513c0807cc7351f1fbf5cb98192a67d5e3044acb033" +source = "git+https://github.com/zhuqi-lucas/arrow-rs.git?branch=add_pub_write_all_api#5c52e40f177702e5dfcc67bd700dbd2234d51222" dependencies = [ "arrow-array", "arrow-buffer", @@ -425,8 +414,7 @@ dependencies = [ [[package]] name = "arrow-ord" version = "55.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ab2f1065a5cad7b9efa9e22ce5747ce826aa3855766755d4904535123ef431e7" +source = "git+https://github.com/zhuqi-lucas/arrow-rs.git?branch=add_pub_write_all_api#5c52e40f177702e5dfcc67bd700dbd2234d51222" dependencies = [ "arrow-array", "arrow-buffer", @@ -438,8 +426,7 @@ dependencies = [ [[package]] name = "arrow-row" version = "55.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3703a0e3e92d23c3f756df73d2dc9476873f873a76ae63ef9d3de17fda83b2d8" +source = "git+https://github.com/zhuqi-lucas/arrow-rs.git?branch=add_pub_write_all_api#5c52e40f177702e5dfcc67bd700dbd2234d51222" dependencies = [ "arrow-array", "arrow-buffer", @@ -451,8 +438,7 @@ dependencies = [ [[package]] name = "arrow-schema" version = "55.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "73a47aa0c771b5381de2b7f16998d351a6f4eb839f1e13d48353e17e873d969b" +source = "git+https://github.com/zhuqi-lucas/arrow-rs.git?branch=add_pub_write_all_api#5c52e40f177702e5dfcc67bd700dbd2234d51222" dependencies = [ "bitflags 2.9.1", "serde", @@ -462,8 +448,7 @@ dependencies = [ [[package]] name = "arrow-select" version = "55.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "24b7b85575702b23b85272b01bc1c25a01c9b9852305e5d0078c79ba25d995d4" +source = "git+https://github.com/zhuqi-lucas/arrow-rs.git?branch=add_pub_write_all_api#5c52e40f177702e5dfcc67bd700dbd2234d51222" dependencies = [ "ahash 0.8.12", "arrow-array", @@ -476,8 +461,7 @@ dependencies = [ [[package]] name = "arrow-string" version = "55.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9260fddf1cdf2799ace2b4c2fc0356a9789fa7551e0953e35435536fecefebbd" +source = "git+https://github.com/zhuqi-lucas/arrow-rs.git?branch=add_pub_write_all_api#5c52e40f177702e5dfcc67bd700dbd2234d51222" dependencies = [ "arrow-array", "arrow-buffer", @@ -4436,8 +4420,7 @@ dependencies = [ [[package]] name = "parquet" version = "55.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "be7b2d778f6b841d37083ebdf32e33a524acde1266b5884a8ca29bf00dfa1231" +source = "git+https://github.com/zhuqi-lucas/arrow-rs.git?branch=add_pub_write_all_api#5c52e40f177702e5dfcc67bd700dbd2234d51222" dependencies = [ "ahash 0.8.12", "arrow-array", diff --git a/Cargo.toml b/Cargo.toml index f2cd6f72c7e6..f70c390d98ce 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -88,19 +88,19 @@ ahash = { version = "0.8", default-features = false, features = [ "runtime-rng", ] } apache-avro = { version = "0.17", default-features = false } -arrow = { version = "55.1.0", features 
= [
+arrow = { git = "https://github.com/zhuqi-lucas/arrow-rs.git", branch = "add_pub_write_all_api", features = [
     "prettyprint",
     "chrono-tz",
 ] }
-arrow-buffer = { version = "55.0.0", default-features = false }
-arrow-flight = { version = "55.1.0", features = [
+arrow-buffer = { git = "https://github.com/zhuqi-lucas/arrow-rs.git", branch = "add_pub_write_all_api", default-features = false }
+arrow-flight = { git = "https://github.com/zhuqi-lucas/arrow-rs.git", branch = "add_pub_write_all_api", features = [
     "flight-sql-experimental",
 ] }
-arrow-ipc = { version = "55.0.0", default-features = false, features = [
+arrow-ipc = { git = "https://github.com/zhuqi-lucas/arrow-rs.git", branch = "add_pub_write_all_api", default-features = false, features = [
     "lz4",
 ] }
-arrow-ord = { version = "55.0.0", default-features = false }
-arrow-schema = { version = "55.0.0", default-features = false }
+arrow-ord = { git = "https://github.com/zhuqi-lucas/arrow-rs.git", branch = "add_pub_write_all_api", default-features = false }
+arrow-schema = { git = "https://github.com/zhuqi-lucas/arrow-rs.git", branch = "add_pub_write_all_api", default-features = false }
 async-trait = "0.1.88"
 bigdecimal = "0.4.8"
 bytes = "1.10"
@@ -152,7 +152,7 @@ itertools = "0.14"
 log = "^0.4"
 object_store = { version = "0.12.0", default-features = false }
 parking_lot = "0.12"
-parquet = { version = "55.1.0", default-features = false, features = [
+parquet = { git = "https://github.com/zhuqi-lucas/arrow-rs.git", branch = "add_pub_write_all_api", default-features = false, features = [
     "arrow",
     "async",
     "object_store",
diff --git a/datafusion-examples/examples/embedding_parquet_indexes.rs b/datafusion-examples/examples/embedding_parquet_indexes.rs
new file mode 100644
index 000000000000..d6515fec19f5
--- /dev/null
+++ b/datafusion-examples/examples/embedding_parquet_indexes.rs
@@ -0,0 +1,391 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Example: embedding and using a custom “distinct values” index in Parquet files
+//!
+//! This example shows how to build and leverage a file‑level distinct‑values index
+//! for pruning in DataFusion’s Parquet scans.
+//!
+//! Steps:
+//! 1. Compute the distinct values for a target column and serialize them into bytes.
+//! 2. Write each Parquet file with:
+//!    - regular data pages for your column
+//!    - the magic marker `IDX1` and a little‑endian length, identifying our custom index format
+//!    - the serialized distinct‑values bytes
+//!    - footer key/value metadata entries (`distinct_index_offset` and `distinct_index_length`)
+//! 3. Read back each file’s footer metadata to locate and deserialize the index.
+//! 4. Build a `DistinctIndexTable` (a custom `TableProvider`) that scans footers
+//!    into a map of filename → `HashSet<String>` of distinct values.
+//! 5. In `scan()`, prune out any Parquet file whose distinct set does not contain the
+//!    `category = 'X'` filter value, then read data only from the remaining files.
+//!
+//! This technique embeds a lightweight, application‑specific index directly in Parquet
+//! files to achieve efficient file‑level pruning without modifying the Parquet format.
+//! It is also cheap: apart from two small key/value entries in the footer, nothing is
+//! added to the Parquet metadata; the custom index is written after the data pages and
+//! is only read when needed.
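The write path in steps 1–2 boils down to appending a self-framed blob after the data pages: 4 magic bytes, a little-endian `u64` length, then the payload. A minimal, dependency-free sketch of that framing over any `std::io::Write` sink (the `append_framed` name is illustrative, not part of the example):

```rust
use std::io::{self, Write};

/// Append `payload` to `sink`, framed as [magic][little-endian u64 length][payload],
/// returning the total number of bytes appended.
fn append_framed(sink: &mut impl Write, magic: &[u8; 4], payload: &[u8]) -> io::Result<u64> {
    sink.write_all(magic)?;
    sink.write_all(&(payload.len() as u64).to_le_bytes())?;
    sink.write_all(payload)?;
    Ok(4 + 8 + payload.len() as u64)
}

fn main() -> io::Result<()> {
    let mut buf: Vec<u8> = Vec::new();
    let written = append_framed(&mut buf, b"IDX1", b"foo\nbar")?;
    assert_eq!(written as usize, buf.len());
    Ok(())
}
```

In the example itself the sink is the `ArrowWriter`, via the public `write_all` API added by the arrow-rs branch pinned in `Cargo.toml` above.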
+
+use arrow::array::{ArrayRef, StringArray};
+use arrow::record_batch::RecordBatch;
+use arrow_schema::{DataType, Field, Schema, SchemaRef};
+use async_trait::async_trait;
+use datafusion::catalog::{Session, TableProvider};
+use datafusion::common::{HashMap, HashSet, Result};
+use datafusion::datasource::listing::PartitionedFile;
+use datafusion::datasource::memory::DataSourceExec;
+use datafusion::datasource::physical_plan::{FileScanConfigBuilder, ParquetSource};
+use datafusion::datasource::TableType;
+use datafusion::execution::object_store::ObjectStoreUrl;
+use datafusion::logical_expr::{Operator, TableProviderFilterPushDown};
+use datafusion::parquet::arrow::ArrowWriter;
+use datafusion::parquet::errors::ParquetError;
+use datafusion::parquet::file::metadata::KeyValue;
+use datafusion::parquet::file::reader::{FileReader, SerializedFileReader};
+use datafusion::physical_plan::ExecutionPlan;
+use datafusion::prelude::*;
+use datafusion::scalar::ScalarValue;
+use std::fs::{create_dir_all, read_dir, File};
+use std::io::{Read, Seek, SeekFrom, Write};
+use std::path::{Path, PathBuf};
+use std::sync::Arc;
+use tempfile::TempDir;
+
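The matching read side (step 3) only has to undo that framing once the footer metadata has pointed it at the right offset. A pure-std sketch of the validation logic, independent of any Parquet I/O (`parse_index_frame` is an illustrative name):

```rust
/// Parse an `IDX1`-framed blob: 4 magic bytes, a little-endian u64 length,
/// then exactly `length` payload bytes. Returns the payload on success.
fn parse_index_frame(buf: &[u8]) -> Option<&[u8]> {
    let rest = buf.strip_prefix(b"IDX1")?;
    let len_bytes: [u8; 8] = rest.get(..8)?.try_into().ok()?;
    let len = u64::from_le_bytes(len_bytes) as usize;
    rest.get(8..8 + len)
}

fn main() {
    let mut frame = Vec::new();
    frame.extend_from_slice(b"IDX1");
    frame.extend_from_slice(&5u64.to_le_bytes());
    frame.extend_from_slice(b"hello");
    assert_eq!(parse_index_frame(&frame), Some(&b"hello"[..]));
}
```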
+/// Example of a Parquet file created by this example: it contains the
+/// specialized index in addition to the regular data pages and footer.
+///
+/// Note: the page index offset is written after the custom index, which
+/// itself sits just after the data pages.
+///
+/// ```text
+/// ┌──────────────────────┐
+/// │┌───────────────────┐ │
+/// ││     DataPage      │ │        Standard Parquet
+/// │└───────────────────┘ │        Data pages
+/// │┌───────────────────┐ │
+/// ││     DataPage      │ │
+/// │└───────────────────┘ │
+/// │        ...           │
+/// │                      │
+/// │┌───────────────────┐ │
+/// ││     DataPage      │ │
+/// │└───────────────────┘ │
+/// │┏━━━━━━━━━━━━━━━━━━━┓ │
+/// │┃                   ┃ │        key/value metadata
+/// │┃   Special Index   ┃◀┼─────── that points to the
+/// │┃                   ┃ │        custom index blob
+/// │┗━━━━━━━━━━━━━━━━━━━┛ │
+/// │┏───────────────────┓ │
+/// │┃ Page Index Offset ┃◀┼─────── little‑endian u64
+/// │┗───────────────────┛ │        sitting after the custom index
+/// │╔═══════════════════╗ │
+/// │║                   ║ │
+/// │║  Parquet Footer   ║ │        thrift‑encoded
+/// │║                   ║◀┼─────── ParquetMetadata
+/// │║                   ║ │
+/// │╚═══════════════════╝ │
+/// └──────────────────────┘
+///
+///        Parquet File
+/// ```
+///
+/// `DistinctIndex` is the custom index: the set of distinct values of the
+/// indexed column within one file
+#[derive(Debug, Clone)]
+struct DistinctIndex {
+    inner: HashSet<String>,
+}
+
+impl DistinctIndex {
+    // Init from an iterator of distinct values
+    pub fn new<I: IntoIterator<Item = String>>(iter: I) -> Self {
+        Self {
+            inner: iter.into_iter().collect(),
+        }
+    }
+
+    // Serialize the distinct index to a writer and record its location
+    // in the footer key/value metadata
+    fn serialize<W: Write + Send>(
+        &self,
+        arrow_writer: &mut ArrowWriter<W>,
+    ) -> Result<()> {
+        let distinct: HashSet<_> = self.inner.iter().collect();
+        let serialized = distinct
+            .into_iter()
+            .map(|s| s.as_str())
+            .collect::<Vec<_>>()
+            .join("\n");
+        let index_bytes = serialized.into_bytes();
+
+        // Record the offset where the index starts
+        let offset = arrow_writer.bytes_written();
+        let index_len = index_bytes.len() as u64;
+
+        println!("Writing custom index at offset: {offset}, length: {index_len}");
+        // Write the index magic and length to the file
+        arrow_writer.write_all(b"IDX1")?;
+        arrow_writer.write_all(&index_len.to_le_bytes())?;
+
+        // Write the index bytes
+        arrow_writer.write_all(&index_bytes)?;
+
+        // Append metadata about the index to the Parquet file footer
+        arrow_writer.append_key_value_metadata(KeyValue::new(
+            "distinct_index_offset".to_string(),
+            offset.to_string(),
+        ));
+        arrow_writer.append_key_value_metadata(KeyValue::new(
+            "distinct_index_length".to_string(),
+            index_bytes.len().to_string(),
+        ));
+        Ok(())
+    }
+
+    // Create a new distinct index from the specified bytes
+    fn new_from_bytes(serialized: &[u8]) -> Result<Self, ParquetError> {
+        let s = String::from_utf8(serialized.to_vec())
+            .map_err(|e| ParquetError::General(e.to_string()))?;
+
+        Ok(Self {
+            inner: s.lines().map(|s| s.to_string()).collect(),
+        })
+    }
+}
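The payload that `serialize` writes is simply the distinct values joined with `\n`, and `new_from_bytes` re-splits them on read. A pure-std sketch of that round trip (names are illustrative); note the encoding assumes category values never contain a newline:

```rust
use std::collections::HashSet;

fn encode(values: &HashSet<String>) -> Vec<u8> {
    values
        .iter()
        .map(|s| s.as_str())
        .collect::<Vec<_>>()
        .join("\n")
        .into_bytes()
}

fn decode(bytes: &[u8]) -> Option<HashSet<String>> {
    let s = String::from_utf8(bytes.to_vec()).ok()?;
    Some(s.lines().map(|s| s.to_string()).collect())
}

fn main() {
    let values: HashSet<String> =
        ["foo", "bar", "baz"].iter().map(|s| s.to_string()).collect();
    // Round trip: decode(encode(v)) == v, as long as no value contains '\n'
    assert_eq!(decode(&encode(&values)), Some(values));
}
```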
+
+/// DistinctIndexTable is a custom TableProvider that reads Parquet files
+#[derive(Debug)]
+struct DistinctIndexTable {
+    schema: SchemaRef,
+    index: HashMap<String, DistinctIndex>,
+    dir: PathBuf,
+}
+
+impl DistinctIndexTable {
+    /// Scan a directory, read each file's footer metadata into a map
+    fn try_new(dir: impl Into<PathBuf>, schema: SchemaRef) -> Result<Self> {
+        let dir = dir.into();
+        let mut index = HashMap::new();
+
+        for entry in read_dir(&dir)? {
+            let path = entry?.path();
+            if path.extension().and_then(|s| s.to_str()) != Some("parquet") {
+                continue;
+            }
+            let file_name = path.file_name().unwrap().to_string_lossy().to_string();
+
+            let distinct_set = read_distinct_index(&path)?;
+
+            println!("Read distinct index for {file_name}: {distinct_set:?}");
+            index.insert(file_name, distinct_set);
+        }
+
+        Ok(Self { schema, index, dir })
+    }
+}
+
+/// Wraps an `ArrowWriter` so the data pages can be written first and the
+/// custom index appended before the file is closed
+pub struct IndexedParquetWriter<W: Write + Send> {
+    writer: ArrowWriter<W>,
+}
+
+impl<W: Write + Send> IndexedParquetWriter<W> {
+    pub fn try_new(sink: W, schema: Arc<Schema>) -> Result<Self> {
+        let writer = ArrowWriter::try_new(sink, schema, None)?;
+        Ok(Self { writer })
+    }
+}
+
+/// Magic bytes to identify our custom index format
+const INDEX_MAGIC: &[u8] = b"IDX1";
+
+fn write_file_with_index(path: &Path, values: &[&str]) -> Result<()> {
+    let field = Field::new("category", DataType::Utf8, false);
+    let schema = Arc::new(Schema::new(vec![field.clone()]));
+    let arr: ArrayRef = Arc::new(StringArray::from(values.to_vec()));
+    let batch = RecordBatch::try_new(schema.clone(), vec![arr])?;
+
+    let file = File::create(path)?;
+
+    let mut writer = IndexedParquetWriter::try_new(file, schema.clone())?;
+
+    // Write the data pages
+    writer.writer.write(&batch)?;
+    // Close the row group so the index lands after the data pages
+    writer.writer.flush()?;
+
+    let distinct_index: DistinctIndex =
+        DistinctIndex::new(values.iter().map(|s| s.to_string()));
+
+    distinct_index.serialize(&mut writer.writer)?;
+
+    writer.writer.close()?;
+
+    println!("Finished writing file to {}", path.display());
+    Ok(())
+}
+
+fn read_distinct_index(path: &Path) -> Result<DistinctIndex, ParquetError> {
+    let mut file = File::open(path)?;
+
+    let file_size = file.metadata()?.len();
+    println!(
+        "Reading index from {} (size: {})",
+        path.display(),
+        file_size
+    );
+
+    let reader = SerializedFileReader::new(file.try_clone()?)?;
+    let meta = reader.metadata().file_metadata();
+
+    let offset = meta
+        .key_value_metadata()
+        .and_then(|kvs| kvs.iter().find(|kv| kv.key == "distinct_index_offset"))
+        .and_then(|kv| kv.value.as_ref())
+        .ok_or_else(|| ParquetError::General("Missing index offset".into()))?
+        .parse::<u64>()
+        .map_err(|e| ParquetError::General(e.to_string()))?;
+
+    let length = meta
+        .key_value_metadata()
+        .and_then(|kvs| kvs.iter().find(|kv| kv.key == "distinct_index_length"))
+        .and_then(|kv| kv.value.as_ref())
+        .ok_or_else(|| ParquetError::General("Missing index length".into()))?
+        .parse::<usize>()
+        .map_err(|e| ParquetError::General(e.to_string()))?;
+
+    println!("Reading index at offset: {offset}, length: {length}");
+
+    file.seek(SeekFrom::Start(offset))?;
+
+    let mut magic_buf = [0u8; 4];
+    file.read_exact(&mut magic_buf)?;
+    if magic_buf != INDEX_MAGIC {
+        return Err(ParquetError::General("Invalid index magic".into()));
+    }
+
+    let mut len_buf = [0u8; 8];
+    file.read_exact(&mut len_buf)?;
+    let stored_len = u64::from_le_bytes(len_buf) as usize;
+
+    if stored_len != length {
+        return Err(ParquetError::General("Index length mismatch".into()));
+    }
+
+    let mut index_buf = vec![0u8; length];
+    file.read_exact(&mut index_buf)?;
+
+    let index = DistinctIndex::new_from_bytes(&index_buf)
+        .map_err(|e| ParquetError::General(e.to_string()))?;
+    Ok(index)
+}
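The two footer lookups in `read_distinct_index` differ only in the key and the integer type they parse to. If more entries were added, they could be factored into a small helper along these lines (a sketch, not part of the example; `footer_u64` is a hypothetical name):

```rust
use datafusion::parquet::errors::ParquetError;
use datafusion::parquet::file::metadata::FileMetaData;

/// Look up a footer key/value entry and parse it as a u64.
fn footer_u64(meta: &FileMetaData, key: &str) -> Result<u64, ParquetError> {
    meta.key_value_metadata()
        .and_then(|kvs| kvs.iter().find(|kv| kv.key == key))
        .and_then(|kv| kv.value.as_ref())
        .ok_or_else(|| ParquetError::General(format!("Missing footer key: {key}")))?
        .parse::<u64>()
        .map_err(|e| ParquetError::General(e.to_string()))
}
```

The offset lookup would then become `footer_u64(meta, "distinct_index_offset")?`, with the length cast to `usize` at the call site.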
+
+/// Implement TableProvider for DistinctIndexTable, using the distinct index to prune files
+#[async_trait]
+impl TableProvider for DistinctIndexTable {
+    fn as_any(&self) -> &dyn std::any::Any {
+        self
+    }
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+    fn table_type(&self) -> TableType {
+        TableType::Base
+    }
+
+    /// Prune files before reading: only keep files whose distinct set contains the filter value
+    async fn scan(
+        &self,
+        _ctx: &dyn Session,
+        _proj: Option<&Vec<usize>>,
+        filters: &[Expr],
+        _limit: Option<usize>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        // Look for a single `category = 'X'` filter
+        let mut target: Option<String> = None;
+
+        if filters.len() == 1 {
+            if let Expr::BinaryExpr(expr) = &filters[0] {
+                if expr.op == Operator::Eq {
+                    if let (
+                        Expr::Column(c),
+                        Expr::Literal(ScalarValue::Utf8(Some(v)), _),
+                    ) = (&*expr.left, &*expr.right)
+                    {
+                        if c.name == "category" {
+                            println!("Filtering for category: {v}");
+                            target = Some(v.clone());
+                        }
+                    }
+                }
+            }
+        }
+        // Determine which files to scan
+        let keep: Vec<String> = self
+            .index
+            .iter()
+            .filter(|(_f, set)| target.as_ref().is_none_or(|v| set.inner.contains(v)))
+            .map(|(f, _)| f.clone())
+            .collect();
+
+        println!("Files kept after pruning: {keep:?}");
+
+        // Build a ParquetSource for the kept files
+        let url = ObjectStoreUrl::parse("file://")?;
+        let source = Arc::new(ParquetSource::default().with_enable_page_index(true));
+        let mut builder = FileScanConfigBuilder::new(url, self.schema.clone(), source);
+        for file in keep {
+            let path = self.dir.join(&file);
+            let len = std::fs::metadata(&path)?.len();
+            builder = builder.with_file(PartitionedFile::new(
+                path.to_str().unwrap().to_string(),
+                len,
+            ));
+        }
+        Ok(DataSourceExec::from_data_source(builder.build()))
+    }
+
+    fn supports_filters_pushdown(
+        &self,
+        fs: &[&Expr],
+    ) -> Result<Vec<TableProviderFilterPushDown>> {
+        // Mark the pushdown as inexact since pruning is file‑granular;
+        // DataFusion still re-applies the filter to the rows that are read
+        Ok(vec![TableProviderFilterPushDown::Inexact; fs.len()])
+    }
+}
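The pruning in `scan()` reduces to a set-membership test per file. A pure-std sketch of that logic, using the same category sets that `main()` writes below (`prune` is an illustrative name, not part of the example):

```rust
use std::collections::{HashMap, HashSet};

/// Keep only files whose distinct set contains the target value
/// (or every file when there is no usable filter).
fn prune(index: &HashMap<String, HashSet<String>>, target: Option<&str>) -> Vec<String> {
    let mut keep: Vec<String> = index
        .iter()
        .filter(|(_, set)| target.is_none_or(|v| set.contains(v)))
        .map(|(file, _)| file.clone())
        .collect();
    keep.sort();
    keep
}

fn main() {
    let index: HashMap<String, HashSet<String>> = [
        ("a.parquet", vec!["foo", "bar"]),
        ("b.parquet", vec!["baz", "qux"]),
        ("c.parquet", vec!["foo", "quux"]),
    ]
    .into_iter()
    .map(|(f, vals)| {
        (
            f.to_string(),
            vals.into_iter().map(|s| s.to_string()).collect(),
        )
    })
    .collect();

    // Only a.parquet and c.parquet contain 'foo', so only they would be scanned
    assert_eq!(prune(&index, Some("foo")), vec!["a.parquet", "c.parquet"]);
    // With no filter, every file is kept
    assert_eq!(prune(&index, None).len(), 3);
}
```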
+
+#[tokio::main]
+async fn main() -> Result<()> {
+    // 1. Create a temp dir and write 3 Parquet files with different category sets
+    let tmp = TempDir::new()?;
+    let dir = tmp.path();
+    create_dir_all(dir)?;
+    write_file_with_index(&dir.join("a.parquet"), &["foo", "bar", "foo"])?;
+    write_file_with_index(&dir.join("b.parquet"), &["baz", "qux"])?;
+    write_file_with_index(&dir.join("c.parquet"), &["foo", "quux", "quux"])?;
+
+    // 2. Register our custom TableProvider
+    let field = Field::new("category", DataType::Utf8, false);
+    let schema_ref = Arc::new(Schema::new(vec![field]));
+    let provider = Arc::new(DistinctIndexTable::try_new(dir, schema_ref.clone())?);
+
+    let ctx = SessionContext::new();
+
+    ctx.register_table("t", provider)?;
+
+    // 3. Run a query: only files containing 'foo' get scanned
+    let df = ctx.sql("SELECT * FROM t WHERE category = 'foo'").await?;
+    df.show().await?;
+
+    Ok(())
+}