diff --git a/Cargo.lock b/Cargo.lock index 9b07a03699ae..11000bcbdc7b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1986,6 +1986,7 @@ dependencies = [ "chrono", "half", "hashbrown 0.14.5", + "hex", "indexmap 2.9.0", "insta", "libc", @@ -4443,6 +4444,7 @@ dependencies = [ "num-bigint", "object_store", "paste", + "ring", "seq-macro", "simdutf8", "snap", diff --git a/Cargo.toml b/Cargo.toml index 64483eeb93da..02576363cece 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -155,6 +155,7 @@ parquet = { version = "55.1.0", default-features = false, features = [ "arrow", "async", "object_store", + "encryption", ] } pbjson = { version = "0.7.0" } pbjson-types = "0.7" diff --git a/benchmarks/src/bin/dfbench.rs b/benchmarks/src/bin/dfbench.rs index 06337cb75888..41b64063c099 100644 --- a/benchmarks/src/bin/dfbench.rs +++ b/benchmarks/src/bin/dfbench.rs @@ -60,11 +60,11 @@ pub async fn main() -> Result<()> { Options::Cancellation(opt) => opt.run().await, Options::Clickbench(opt) => opt.run().await, Options::H2o(opt) => opt.run().await, - Options::Imdb(opt) => opt.run().await, + Options::Imdb(opt) => Box::pin(opt.run()).await, Options::ParquetFilter(opt) => opt.run().await, Options::Sort(opt) => opt.run().await, Options::SortTpch(opt) => opt.run().await, - Options::Tpch(opt) => opt.run().await, + Options::Tpch(opt) => Box::pin(opt.run()).await, Options::TpchConvert(opt) => opt.run().await, } } diff --git a/benchmarks/src/bin/imdb.rs b/benchmarks/src/bin/imdb.rs index 13421f8a89a9..5ce99928df66 100644 --- a/benchmarks/src/bin/imdb.rs +++ b/benchmarks/src/bin/imdb.rs @@ -53,7 +53,7 @@ pub async fn main() -> Result<()> { env_logger::init(); match ImdbOpt::from_args() { ImdbOpt::Benchmark(BenchmarkSubCommandOpt::DataFusionBenchmark(opt)) => { - opt.run().await + Box::pin(opt.run()).await } ImdbOpt::Convert(opt) => opt.run().await, } diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs index 3270b082cfb4..ca2bb8e57c0e 100644 --- a/benchmarks/src/bin/tpch.rs +++ b/benchmarks/src/bin/tpch.rs @@ -58,7 +58,7 @@ async fn main() -> Result<()> { env_logger::init(); match TpchOpt::from_args() { TpchOpt::Benchmark(BenchmarkSubCommandOpt::DataFusionBenchmark(opt)) => { - opt.run().await + Box::pin(opt.run()).await } TpchOpt::Convert(opt) => opt.run().await, } diff --git a/datafusion-cli/tests/sql/encrypted_parquet.sql b/datafusion-cli/tests/sql/encrypted_parquet.sql new file mode 100644 index 000000000000..ad866a8fdd7e --- /dev/null +++ b/datafusion-cli/tests/sql/encrypted_parquet.sql @@ -0,0 +1,75 @@ +/* + Test parquet encryption and decryption in DataFusion SQL. + See datafusion/common/src/config.rs for equivalent rust code +*/ + +-- Keys are hex encoded, you can generate these via encode, e.g. 
+select encode('0123456789012345', 'hex'); +/* +Expected output: ++----------------------------------------------+ +| encode(Utf8("0123456789012345"),Utf8("hex")) | ++----------------------------------------------+ +| 30313233343536373839303132333435 | ++----------------------------------------------+ +*/ + +CREATE EXTERNAL TABLE encrypted_parquet_table +( +double_field double, +float_field float +) +STORED AS PARQUET LOCATION 'pq/' OPTIONS ( + 'format.crypto.file_encryption.encrypt_footer' 'true', + 'format.crypto.file_encryption.footer_key_as_hex' '30313233343536373839303132333435', -- b"0123456789012345" + 'format.crypto.file_encryption.column_key_as_hex::double_field' '31323334353637383930313233343530', -- b"1234567890123450" + 'format.crypto.file_encryption.column_key_as_hex::float_field' '31323334353637383930313233343531', -- b"1234567890123451" + -- Same for decryption + 'format.crypto.file_decryption.footer_key_as_hex' '30313233343536373839303132333435', -- b"0123456789012345" + 'format.crypto.file_decryption.column_key_as_hex::double_field' '31323334353637383930313233343530', -- b"1234567890123450" + 'format.crypto.file_decryption.column_key_as_hex::float_field' '31323334353637383930313233343531', -- b"1234567890123451" +); + +CREATE TABLE temp_table ( + double_field double, + float_field float +); + +INSERT INTO temp_table VALUES(-1.0, -1.0); +INSERT INTO temp_table VALUES(1.0, 2.0); +INSERT INTO temp_table VALUES(3.0, 4.0); +INSERT INTO temp_table VALUES(5.0, 6.0); + +INSERT INTO TABLE encrypted_parquet_table(double_field, float_field) SELECT * FROM temp_table; + +SELECT * FROM encrypted_parquet_table +WHERE double_field > 0.0 AND float_field > 0.0; + +/* +Expected output: ++--------------+-------------+ +| double_field | float_field | ++--------------+-------------+ +| 1.0 | 2.0 | +| 5.0 | 6.0 | +| 3.0 | 4.0 | ++--------------+-------------+ +*/ + +CREATE EXTERNAL TABLE parquet_table +( +double_field double, +float_field float +) +STORED AS PARQUET LOCATION 'pq/'; + +SELECT * FROM parquet_table; +/* +Expected output: +Parquet error: Parquet error: Parquet file has an encrypted footer but decryption properties were not provided +*/ + + + + + diff --git a/datafusion-examples/README.md b/datafusion-examples/README.md index 3ba4c77cd84c..285762bb57e7 100644 --- a/datafusion-examples/README.md +++ b/datafusion-examples/README.md @@ -65,6 +65,7 @@ cargo run --example dataframe - [`flight_sql_server.rs`](examples/flight/flight_sql_server.rs): Run DataFusion as a standalone process and execute SQL queries from JDBC clients - [`function_factory.rs`](examples/function_factory.rs): Register `CREATE FUNCTION` handler to implement SQL macros - [`optimizer_rule.rs`](examples/optimizer_rule.rs): Use a custom OptimizerRule to replace certain predicates +- [`parquet_encrypted.rs`](examples/parquet_encrypted.rs): Read and write encrypted Parquet files using DataFusion - [`parquet_index.rs`](examples/parquet_index.rs): Create an secondary index over several parquet files and use it to speed up queries - [`parquet_exec_visitor.rs`](examples/parquet_exec_visitor.rs): Extract statistics by visiting an ExecutionPlan after execution - [`parse_sql_expr.rs`](examples/parse_sql_expr.rs): Parse SQL text into DataFusion `Expr`. 
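For reference, the `*_as_hex` option values in the SQL test above are nothing more than hex-encoded key bytes. Below is a minimal sketch of that round trip using the `hex` crate (added as a dependency by this change); the 16-byte key is the same illustrative value used throughout this PR:

```rust
fn main() {
    // 16-byte (128-bit) key used by the SQL example above
    let footer_key: &[u8] = b"0123456789012345";

    // This is the value passed to 'format.crypto.*.footer_key_as_hex'
    let as_hex = hex::encode(footer_key);
    assert_eq!(as_hex, "30313233343536373839303132333435");

    // The config layer decodes it back to raw bytes before building the
    // parquet encryption/decryption properties
    assert_eq!(hex::decode(&as_hex).unwrap(), footer_key);
}
```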
diff --git a/datafusion-examples/examples/parquet_encrypted.rs b/datafusion-examples/examples/parquet_encrypted.rs new file mode 100644 index 000000000000..823c5bebc9e4 --- /dev/null +++ b/datafusion-examples/examples/parquet_encrypted.rs @@ -0,0 +1,118 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use datafusion::common::DataFusionError; +use datafusion::config::TableParquetOptions; +use datafusion::dataframe::{DataFrame, DataFrameWriteOptions}; +use datafusion::logical_expr::{col, lit}; +use datafusion::parquet::encryption::decrypt::FileDecryptionProperties; +use datafusion::parquet::encryption::encrypt::FileEncryptionProperties; +use datafusion::prelude::{ParquetReadOptions, SessionContext}; +use tempfile::TempDir; + +#[tokio::main] +async fn main() -> datafusion::common::Result<()> { + // The SessionContext is the main high level API for interacting with DataFusion + let ctx = SessionContext::new(); + + // Find the local path of "alltypes_plain.parquet" + let testdata = datafusion::test_util::parquet_test_data(); + let filename = &format!("{testdata}/alltypes_plain.parquet"); + + // Read the sample parquet file + let parquet_df = ctx + .read_parquet(filename, ParquetReadOptions::default()) + .await?; + + // Show information from the dataframe + println!( + "===============================================================================" + ); + println!("Original Parquet DataFrame:"); + query_dataframe(&parquet_df).await?; + + // Setup encryption and decryption properties + let (encrypt, decrypt) = setup_encryption(&parquet_df)?; + + // Create a temporary file location for the encrypted parquet file + let tmp_dir = TempDir::new()?; + let tempfile = tmp_dir.path().join("alltypes_plain-encrypted.parquet"); + let tempfile_str = tempfile.into_os_string().into_string().unwrap(); + + // Write encrypted parquet + let mut options = TableParquetOptions::default(); + options.crypto.file_encryption = Some((&encrypt).into()); + parquet_df + .write_parquet( + tempfile_str.as_str(), + DataFrameWriteOptions::new().with_single_file_output(true), + Some(options), + ) + .await?; + + // Read encrypted parquet + let ctx: SessionContext = SessionContext::new(); + let read_options = ParquetReadOptions::default().file_decryption_properties(decrypt); + + let encrypted_parquet_df = ctx.read_parquet(tempfile_str, read_options).await?; + + // Show information from the dataframe + println!("\n\n==============================================================================="); + println!("Encrypted Parquet DataFrame:"); + query_dataframe(&encrypted_parquet_df).await?; + + Ok(()) +} + +// Show information from the dataframe +async fn query_dataframe(df: &DataFrame) -> Result<(), DataFusionError> { + // show its schema using 'describe' + println!("Schema:"); + 
df.clone().describe().await?.show().await?;
+
+    // Select three columns and filter the results
+    // so that only rows where id > 5 are returned
+    println!("\nSelected rows and columns:");
+    df.clone()
+        .select_columns(&["id", "bool_col", "timestamp_col"])?
+        .filter(col("id").gt(lit(5)))?
+        .show()
+        .await?;
+
+    Ok(())
+}
+
+// Setup encryption and decryption properties
+fn setup_encryption(
+    parquet_df: &DataFrame,
+) -> Result<(FileEncryptionProperties, FileDecryptionProperties), DataFusionError> {
+    let schema = parquet_df.schema();
+    let footer_key = b"0123456789012345".to_vec(); // 128-bit key (16 bytes)
+    let column_key = b"1234567890123450".to_vec(); // 128-bit key (16 bytes)
+
+    let mut encrypt = FileEncryptionProperties::builder(footer_key.clone());
+    let mut decrypt = FileDecryptionProperties::builder(footer_key.clone());
+
+    for field in schema.fields().iter() {
+        encrypt = encrypt.with_column_key(field.name().as_str(), column_key.clone());
+        decrypt = decrypt.with_column_key(field.name().as_str(), column_key.clone());
+    }
+
+    let encrypt = encrypt.build()?;
+    let decrypt = decrypt.build()?;
+    Ok((encrypt, decrypt))
+}
diff --git a/datafusion/common/Cargo.toml b/datafusion/common/Cargo.toml
index d471e48be4e7..da1ebcc5fa5a 100644
--- a/datafusion/common/Cargo.toml
+++ b/datafusion/common/Cargo.toml
@@ -57,6 +57,7 @@ arrow-ipc = { workspace = true }
 base64 = "0.22.1"
 half = { workspace = true }
 hashbrown = { workspace = true }
+hex = "0.4.3"
 indexmap = { workspace = true }
 libc = "0.2.172"
 log = { workspace = true }
diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs
index 4efb67a37c99..c9e18a0557ea 100644
--- a/datafusion/common/src/config.rs
+++ b/datafusion/common/src/config.rs
@@ -27,6 +27,13 @@ use std::error::Error;
 use std::fmt::{self, Display};
 use std::str::FromStr;
 
+#[cfg(feature = "parquet")]
+use hex;
+#[cfg(feature = "parquet")]
+use parquet::encryption::decrypt::FileDecryptionProperties;
+#[cfg(feature = "parquet")]
+use parquet::encryption::encrypt::FileEncryptionProperties;
+
 /// A macro that wraps a configuration struct and automatically derives
 /// [`Default`] and [`ConfigField`] for it, allowing it to be used
 /// in the [`ConfigOptions`] configuration tree.
@@ -187,7 +194,6 @@ macro_rules! config_namespace {
         }
     }
 }
-
 config_namespace! {
     /// Options related to catalog and directory scanning
     ///
@@ -594,6 +600,17 @@ config_namespace! {
     }
 }
 
+config_namespace! {
+    /// Options for configuring Parquet Modular Encryption
+    pub struct ParquetEncryptionOptions {
+        /// Optional file decryption properties
+        pub file_decryption: Option<ConfigFileDecryptionProperties>, default = None
+
+        /// Optional file encryption properties
+        pub file_encryption: Option<ConfigFileEncryptionProperties>, default = None
+    }
+}
+
 config_namespace! {
     /// Options related to query optimization
     ///
@@ -1725,6 +1742,24 @@ pub struct TableParquetOptions {
     /// )
     /// ```
     pub key_value_metadata: HashMap<String, Option<String>>,
+    /// Options for configuring Parquet modular encryption.
+    /// See ConfigFileEncryptionProperties and ConfigFileDecryptionProperties in datafusion/common/src/config.rs
+    /// These can be set via 'format.crypto', for example:
+    /// ```sql
+    /// OPTIONS (
+    ///     'format.crypto.file_encryption.encrypt_footer' 'true',
+    ///     'format.crypto.file_encryption.footer_key_as_hex' '30313233343536373839303132333435', -- b"0123456789012345"
+    ///     'format.crypto.file_encryption.column_key_as_hex::double_field' '31323334353637383930313233343530', -- b"1234567890123450"
+    ///     'format.crypto.file_encryption.column_key_as_hex::float_field' '31323334353637383930313233343531', -- b"1234567890123451"
+    ///     -- Same for decryption
+    ///     'format.crypto.file_decryption.footer_key_as_hex' '30313233343536373839303132333435', -- b"0123456789012345"
+    ///     'format.crypto.file_decryption.column_key_as_hex::double_field' '31323334353637383930313233343530', -- b"1234567890123450"
+    ///     'format.crypto.file_decryption.column_key_as_hex::float_field' '31323334353637383930313233343531', -- b"1234567890123451"
+    /// )
+    /// ```
+    /// See datafusion-cli/tests/sql/encrypted_parquet.sql for a more complete example.
+    /// Note that keys must be provided in hex format since they are binary strings.
+    pub crypto: ParquetEncryptionOptions,
 }
 
 impl TableParquetOptions {
@@ -1752,7 +1787,9 @@ impl ConfigField for TableParquetOptions {
     fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, description: &'static str) {
         self.global.visit(v, key_prefix, description);
         self.column_specific_options
-            .visit(v, key_prefix, description)
+            .visit(v, key_prefix, description);
+        self.crypto
+            .visit(v, &format!("{key_prefix}.crypto"), description);
     }
 
     fn set(&mut self, key: &str, value: &str) -> Result<()> {
@@ -1773,6 +1810,8 @@
             };
             self.key_value_metadata.insert(k, Some(value.into()));
             Ok(())
+        } else if let Some(crypto_feature) = key.strip_prefix("crypto.") {
+            self.crypto.set(crypto_feature, value)
         } else if key.contains("::") {
             self.column_specific_options.set(key, value)
         } else {
@@ -1923,6 +1962,305 @@ config_namespace_with_hashmap! {
     }
 }
 
+#[derive(Clone, Debug, Default, PartialEq)]
+pub struct ConfigFileEncryptionProperties {
+    /// Whether the parquet footer should be encrypted
+    pub encrypt_footer: bool,
+    /// Key to use for the parquet footer, encoded in hex format
+    pub footer_key_as_hex: String,
+    /// Metadata information for the footer key
+    pub footer_key_metadata_as_hex: String,
+    /// Map of column name to (encryption key in hex format, key metadata)
+    pub column_encryption_properties: HashMap<String, ColumnEncryptionProperties>,
+    /// AAD prefix string that uniquely identifies the file and prevents file swapping
+    pub aad_prefix_as_hex: String,
+    /// If true, store the AAD prefix in the file
+    pub store_aad_prefix: bool,
+}
+
+config_namespace_with_hashmap!
{ + pub struct ColumnEncryptionProperties { + /// Per column encryption key + pub column_key_as_hex: String, default = "".to_string() + /// Per column encryption key metadata + pub column_metadata_as_hex: Option, default = None + } +} + +impl ConfigField for ConfigFileEncryptionProperties { + fn visit(&self, v: &mut V, key_prefix: &str, _description: &'static str) { + let key = format!("{key_prefix}.encrypt_footer"); + let desc = "Encrypt the footer"; + self.encrypt_footer.visit(v, key.as_str(), desc); + + let key = format!("{key_prefix}.footer_key_as_hex"); + let desc = "Key to use for the parquet footer"; + self.footer_key_as_hex.visit(v, key.as_str(), desc); + + let key = format!("{key_prefix}.footer_key_metadata_as_hex"); + let desc = "Metadata to use for the parquet footer"; + self.footer_key_metadata_as_hex.visit(v, key.as_str(), desc); + + let key = format!("{key_prefix}.aad_prefix_as_hex"); + let desc = "AAD prefix to use"; + self.aad_prefix_as_hex.visit(v, key.as_str(), desc); + + let key = format!("{key_prefix}.store_aad_prefix"); + let desc = "If true, store the AAD prefix"; + self.store_aad_prefix.visit(v, key.as_str(), desc); + + self.aad_prefix_as_hex.visit(v, key.as_str(), desc); + } + + fn set(&mut self, key: &str, value: &str) -> Result<()> { + // Any hex encoded values must be pre-encoded using + // hex::encode() before calling set. + + if key.contains("::") { + // Handle any column specific properties + return self.column_encryption_properties.set(key, value); + }; + + let (key, rem) = key.split_once('.').unwrap_or((key, "")); + match key { + "encrypt_footer" => self.encrypt_footer.set(rem, value.as_ref()), + "footer_key_as_hex" => self.footer_key_as_hex.set(rem, value.as_ref()), + "footer_key_metadata_as_hex" => { + self.footer_key_metadata_as_hex.set(rem, value.as_ref()) + } + "aad_prefix_as_hex" => self.aad_prefix_as_hex.set(rem, value.as_ref()), + "store_aad_prefix" => self.store_aad_prefix.set(rem, value.as_ref()), + _ => _config_err!( + "Config value \"{}\" not found on ConfigFileEncryptionProperties", + key + ), + } + } +} + +#[cfg(feature = "parquet")] +impl From for FileEncryptionProperties { + fn from(val: ConfigFileEncryptionProperties) -> Self { + let mut fep = FileEncryptionProperties::builder( + hex::decode(val.footer_key_as_hex).unwrap(), + ) + .with_plaintext_footer(!val.encrypt_footer) + .with_aad_prefix_storage(val.store_aad_prefix); + + if !val.footer_key_metadata_as_hex.is_empty() { + fep = fep.with_footer_key_metadata( + hex::decode(&val.footer_key_metadata_as_hex) + .expect("Invalid footer key metadata"), + ); + } + + for (column_name, encryption_props) in val.column_encryption_properties.iter() { + let encryption_key = hex::decode(&encryption_props.column_key_as_hex) + .expect("Invalid column encryption key"); + let key_metadata = encryption_props + .column_metadata_as_hex + .as_ref() + .map(|x| hex::decode(x).expect("Invalid column metadata")); + match key_metadata { + Some(key_metadata) => { + fep = fep.with_column_key_and_metadata( + column_name, + encryption_key, + key_metadata, + ); + } + None => { + fep = fep.with_column_key(column_name, encryption_key); + } + } + } + + if !val.aad_prefix_as_hex.is_empty() { + let aad_prefix: Vec = + hex::decode(&val.aad_prefix_as_hex).expect("Invalid AAD prefix"); + fep = fep.with_aad_prefix(aad_prefix); + } + fep.build().unwrap() + } +} + +#[cfg(feature = "parquet")] +impl From<&FileEncryptionProperties> for ConfigFileEncryptionProperties { + fn from(f: &FileEncryptionProperties) -> Self { + let 
(column_names_vec, column_keys_vec, column_metas_vec) = f.column_keys(); + + let mut column_encryption_properties: HashMap< + String, + ColumnEncryptionProperties, + > = HashMap::new(); + + for (i, column_name) in column_names_vec.iter().enumerate() { + let column_key_as_hex = hex::encode(&column_keys_vec[i]); + let column_metadata_as_hex: Option = + column_metas_vec.get(i).map(hex::encode); + column_encryption_properties.insert( + column_name.clone(), + ColumnEncryptionProperties { + column_key_as_hex, + column_metadata_as_hex, + }, + ); + } + let mut aad_prefix: Vec = Vec::new(); + if let Some(prefix) = f.aad_prefix() { + aad_prefix = prefix.clone(); + } + ConfigFileEncryptionProperties { + encrypt_footer: f.encrypt_footer(), + footer_key_as_hex: hex::encode(f.footer_key()), + footer_key_metadata_as_hex: f + .footer_key_metadata() + .map(hex::encode) + .unwrap_or_default(), + column_encryption_properties, + aad_prefix_as_hex: hex::encode(aad_prefix), + store_aad_prefix: f.store_aad_prefix(), + } + } +} + +#[derive(Clone, Debug, PartialEq)] +pub struct ConfigFileDecryptionProperties { + /// Binary string to use for the parquet footer encoded in hex format + pub footer_key_as_hex: String, + /// HashMap of column names --> key in hex format + pub column_decryption_properties: HashMap, + /// AAD prefix string uniquely identifies the file and prevents file swapping + pub aad_prefix_as_hex: String, + /// If true, then verify signature for files with plaintext footers. + /// default = true + pub footer_signature_verification: bool, +} + +config_namespace_with_hashmap! { + pub struct ColumnDecryptionProperties { + /// Per column encryption key + pub column_key_as_hex: String, default = "".to_string() + } +} + +impl Default for ConfigFileDecryptionProperties { + fn default() -> Self { + ConfigFileDecryptionProperties { + footer_key_as_hex: String::new(), + column_decryption_properties: Default::default(), + aad_prefix_as_hex: String::new(), + footer_signature_verification: true, + } + } +} + +impl ConfigField for ConfigFileDecryptionProperties { + fn visit(&self, v: &mut V, key_prefix: &str, _description: &'static str) { + let key = format!("{key_prefix}.footer_key_as_hex"); + let desc = "Key to use for the parquet footer"; + self.footer_key_as_hex.visit(v, key.as_str(), desc); + + let key = format!("{key_prefix}.aad_prefix_as_hex"); + let desc = "AAD prefix to use"; + self.aad_prefix_as_hex.visit(v, key.as_str(), desc); + + let key = format!("{key_prefix}.footer_signature_verification"); + let desc = "If true, verify the footer signature"; + self.footer_signature_verification + .visit(v, key.as_str(), desc); + + self.column_decryption_properties.visit(v, key_prefix, desc); + } + + fn set(&mut self, key: &str, value: &str) -> Result<()> { + // Any hex encoded values must be pre-encoded using + // hex::encode() before calling set. 
+ + if key.contains("::") { + // Handle any column specific properties + return self.column_decryption_properties.set(key, value); + }; + + let (key, rem) = key.split_once('.').unwrap_or((key, "")); + match key { + "footer_key_as_hex" => self.footer_key_as_hex.set(rem, value.as_ref()), + "aad_prefix_as_hex" => self.aad_prefix_as_hex.set(rem, value.as_ref()), + "footer_signature_verification" => { + self.footer_signature_verification.set(rem, value.as_ref()) + } + _ => _config_err!( + "Config value \"{}\" not found on ConfigFileEncryptionProperties", + key + ), + } + } +} + +#[cfg(feature = "parquet")] +impl From for FileDecryptionProperties { + fn from(val: ConfigFileDecryptionProperties) -> Self { + let mut column_names: Vec<&str> = Vec::new(); + let mut column_keys: Vec> = Vec::new(); + + for (col_name, decryption_properties) in val.column_decryption_properties.iter() { + column_names.push(col_name.as_str()); + column_keys.push( + hex::decode(&decryption_properties.column_key_as_hex) + .expect("Invalid column decryption key"), + ); + } + + let mut fep = FileDecryptionProperties::builder( + hex::decode(val.footer_key_as_hex).expect("Invalid footer key"), + ) + .with_column_keys(column_names, column_keys) + .unwrap(); + + if !val.footer_signature_verification { + fep = fep.disable_footer_signature_verification(); + } + + if !val.aad_prefix_as_hex.is_empty() { + let aad_prefix = + hex::decode(&val.aad_prefix_as_hex).expect("Invalid AAD prefix"); + fep = fep.with_aad_prefix(aad_prefix); + } + + fep.build().unwrap() + } +} + +#[cfg(feature = "parquet")] +impl From<&FileDecryptionProperties> for ConfigFileDecryptionProperties { + fn from(f: &FileDecryptionProperties) -> Self { + let (column_names_vec, column_keys_vec) = f.column_keys(); + let mut column_decryption_properties: HashMap< + String, + ColumnDecryptionProperties, + > = HashMap::new(); + for (i, column_name) in column_names_vec.iter().enumerate() { + let props = ColumnDecryptionProperties { + column_key_as_hex: hex::encode(column_keys_vec[i].clone()), + }; + column_decryption_properties.insert(column_name.clone(), props); + } + + let mut aad_prefix: Vec = Vec::new(); + if let Some(prefix) = f.aad_prefix() { + aad_prefix = prefix.clone(); + } + ConfigFileDecryptionProperties { + footer_key_as_hex: hex::encode( + f.footer_key(None).unwrap_or_default().as_ref(), + ), + column_decryption_properties, + aad_prefix_as_hex: hex::encode(aad_prefix), + footer_signature_verification: f.check_plaintext_footer_integrity(), + } + } +} + config_namespace! 
{ /// Options controlling CSV format pub struct CsvOptions { @@ -2105,13 +2443,12 @@ impl Display for OutputFormat { #[cfg(test)] mod tests { - use std::any::Any; - use std::collections::HashMap; - use crate::config::{ ConfigEntry, ConfigExtension, ConfigField, ConfigFileType, ExtensionOptions, Extensions, TableOptions, }; + use std::any::Any; + use std::collections::HashMap; #[derive(Default, Debug, Clone)] pub struct TestExtensionConfig { @@ -2240,6 +2577,129 @@ mod tests { ); } + #[cfg(feature = "parquet")] + #[test] + fn parquet_table_encryption() { + use crate::config::{ + ConfigFileDecryptionProperties, ConfigFileEncryptionProperties, + }; + use parquet::encryption::decrypt::FileDecryptionProperties; + use parquet::encryption::encrypt::FileEncryptionProperties; + + let footer_key = b"0123456789012345".to_vec(); // 128bit/16 + let column_names = vec!["double_field", "float_field"]; + let column_keys = + vec![b"1234567890123450".to_vec(), b"1234567890123451".to_vec()]; + + let file_encryption_properties = + FileEncryptionProperties::builder(footer_key.clone()) + .with_column_keys(column_names.clone(), column_keys.clone()) + .unwrap() + .build() + .unwrap(); + + let decryption_properties = FileDecryptionProperties::builder(footer_key.clone()) + .with_column_keys(column_names.clone(), column_keys.clone()) + .unwrap() + .build() + .unwrap(); + + // Test round-trip + let config_encrypt: ConfigFileEncryptionProperties = + (&file_encryption_properties).into(); + let encryption_properties_built: FileEncryptionProperties = + config_encrypt.clone().into(); + assert_eq!(file_encryption_properties, encryption_properties_built); + + let config_decrypt: ConfigFileDecryptionProperties = + (&decryption_properties).into(); + let decryption_properties_built: FileDecryptionProperties = + config_decrypt.clone().into(); + assert_eq!(decryption_properties, decryption_properties_built); + + /////////////////////////////////////////////////////////////////////////////////// + // Test encryption config + + // Display original encryption config + // println!("{:#?}", config_encrypt); + + let mut table_config = TableOptions::new(); + table_config.set_config_format(ConfigFileType::PARQUET); + table_config + .parquet + .set( + "crypto.file_encryption.encrypt_footer", + config_encrypt.encrypt_footer.to_string().as_str(), + ) + .unwrap(); + table_config + .parquet + .set( + "crypto.file_encryption.footer_key_as_hex", + config_encrypt.footer_key_as_hex.as_str(), + ) + .unwrap(); + + for (i, col_name) in column_names.iter().enumerate() { + let key = format!("crypto.file_encryption.column_key_as_hex::{col_name}"); + let value = hex::encode(column_keys[i].clone()); + table_config + .parquet + .set(key.as_str(), value.as_str()) + .unwrap(); + } + + // Print matching final encryption config + // println!("{:#?}", table_config.parquet.crypto.file_encryption); + + assert_eq!( + table_config.parquet.crypto.file_encryption, + Some(config_encrypt) + ); + + /////////////////////////////////////////////////////////////////////////////////// + // Test decryption config + + // Display original decryption config + // println!("{:#?}", config_decrypt); + + let mut table_config = TableOptions::new(); + table_config.set_config_format(ConfigFileType::PARQUET); + table_config + .parquet + .set( + "crypto.file_decryption.footer_key_as_hex", + config_decrypt.footer_key_as_hex.as_str(), + ) + .unwrap(); + + for (i, col_name) in column_names.iter().enumerate() { + let key = format!("crypto.file_decryption.column_key_as_hex::{col_name}"); + 
let value = hex::encode(column_keys[i].clone()); + table_config + .parquet + .set(key.as_str(), value.as_str()) + .unwrap(); + } + + // Print matching final decryption config + // println!("{:#?}", table_config.parquet.crypto.file_decryption); + + assert_eq!( + table_config.parquet.crypto.file_decryption, + Some(config_decrypt.clone()) + ); + + // Set config directly + let mut table_config = TableOptions::new(); + table_config.set_config_format(ConfigFileType::PARQUET); + table_config.parquet.crypto.file_decryption = Some(config_decrypt.clone()); + assert_eq!( + table_config.parquet.crypto.file_decryption, + Some(config_decrypt.clone()) + ); + } + #[cfg(feature = "parquet")] #[test] fn parquet_table_options_config_entry() { diff --git a/datafusion/common/src/file_options/parquet_writer.rs b/datafusion/common/src/file_options/parquet_writer.rs index 07e763f0ee6f..60f0f4abb0c0 100644 --- a/datafusion/common/src/file_options/parquet_writer.rs +++ b/datafusion/common/src/file_options/parquet_writer.rs @@ -95,10 +95,17 @@ impl TryFrom<&TableParquetOptions> for WriterPropertiesBuilder { global, column_specific_options, key_value_metadata, + crypto, } = table_parquet_options; let mut builder = global.into_writer_properties_builder()?; + if let Some(file_encryption_properties) = &crypto.file_encryption { + builder = builder.with_file_encryption_properties( + file_encryption_properties.clone().into(), + ); + } + // check that the arrow schema is present in the kv_metadata, if configured to do so if !global.skip_arrow_metadata && !key_value_metadata.contains_key(ARROW_SCHEMA_META_KEY) @@ -449,7 +456,10 @@ mod tests { }; use std::collections::HashMap; - use crate::config::{ParquetColumnOptions, ParquetOptions}; + use crate::config::{ + ConfigFileEncryptionProperties, ParquetColumnOptions, ParquetEncryptionOptions, + ParquetOptions, + }; use super::*; @@ -580,6 +590,9 @@ mod tests { HashMap::from([(COL_NAME.into(), configured_col_props)]) }; + let fep: Option = + props.file_encryption_properties().map(|fe| fe.into()); + #[allow(deprecated)] // max_statistics_size TableParquetOptions { global: ParquetOptions { @@ -627,6 +640,10 @@ mod tests { }, column_specific_options, key_value_metadata, + crypto: ParquetEncryptionOptions { + file_encryption: fep, + file_decryption: None, + }, } } @@ -681,6 +698,7 @@ mod tests { )] .into(), key_value_metadata: [(key, value)].into(), + crypto: Default::default(), }; let writer_props = WriterPropertiesBuilder::try_from(&table_parquet_opts) diff --git a/datafusion/core/src/dataframe/parquet.rs b/datafusion/core/src/dataframe/parquet.rs index 1bb5444ca009..fa0b34e413b7 100644 --- a/datafusion/core/src/dataframe/parquet.rs +++ b/datafusion/core/src/dataframe/parquet.rs @@ -246,4 +246,72 @@ mod tests { Ok(()) } + + #[tokio::test] + async fn roundtrip_parquet_with_encryption() -> Result<()> { + use parquet::encryption::decrypt::FileDecryptionProperties; + use parquet::encryption::encrypt::FileEncryptionProperties; + + let test_df = test_util::test_table().await?; + + let schema = test_df.schema(); + let footer_key = b"0123456789012345".to_vec(); // 128bit/16 + let column_key = b"1234567890123450".to_vec(); // 128bit/16 + + let mut encrypt = FileEncryptionProperties::builder(footer_key.clone()); + let mut decrypt = FileDecryptionProperties::builder(footer_key.clone()); + + for field in schema.fields().iter() { + encrypt = encrypt.with_column_key(field.name().as_str(), column_key.clone()); + decrypt = decrypt.with_column_key(field.name().as_str(), column_key.clone()); + } + + 
let encrypt = encrypt.build()?; + let decrypt = decrypt.build()?; + + let df = test_df.clone(); + let tmp_dir = TempDir::new()?; + let tempfile = tmp_dir.path().join("roundtrip.parquet"); + let tempfile_str = tempfile.into_os_string().into_string().unwrap(); + + // Write encrypted parquet using write_parquet + let mut options = TableParquetOptions::default(); + options.crypto.file_encryption = Some((&encrypt).into()); + + df.write_parquet( + tempfile_str.as_str(), + DataFrameWriteOptions::new().with_single_file_output(true), + Some(options), + ) + .await?; + let num_rows_written = test_df.count().await?; + + // Read encrypted parquet + let ctx: SessionContext = SessionContext::new(); + let read_options = + ParquetReadOptions::default().file_decryption_properties(decrypt); + + ctx.register_parquet("roundtrip_parquet", &tempfile_str, read_options.clone()) + .await?; + + let df_enc = ctx.sql("SELECT * FROM roundtrip_parquet").await?; + let num_rows_read = df_enc.count().await?; + + assert_eq!(num_rows_read, num_rows_written); + + // Read encrypted parquet and subset rows + columns + let encrypted_parquet_df = ctx.read_parquet(tempfile_str, read_options).await?; + + // Select three columns and filter the results + // Test that the filter works as expected + let selected = encrypted_parquet_df + .clone() + .select_columns(&["c1", "c2", "c3"])? + .filter(col("c2").gt(lit(4)))?; + + let num_rows_selected = selected.count().await?; + assert_eq!(num_rows_selected, 14); + + Ok(()) + } } diff --git a/datafusion/core/src/datasource/file_format/options.rs b/datafusion/core/src/datasource/file_format/options.rs index 9aaf1cf59811..9ef956ef7995 100644 --- a/datafusion/core/src/datasource/file_format/options.rs +++ b/datafusion/core/src/datasource/file_format/options.rs @@ -43,6 +43,7 @@ use datafusion_common::{ use async_trait::async_trait; use datafusion_datasource_json::file_format::JsonFormat; use datafusion_expr::SortExpr; +use parquet::encryption::decrypt::FileDecryptionProperties; /// Options that control the reading of CSV files. /// @@ -252,6 +253,8 @@ pub struct ParquetReadOptions<'a> { pub schema: Option<&'a Schema>, /// Indicates how the file is sorted pub file_sort_order: Vec>, + /// Properties for decryption of Parquet files that use modular encryption + pub file_decryption_properties: Option, } impl Default for ParquetReadOptions<'_> { @@ -263,6 +266,7 @@ impl Default for ParquetReadOptions<'_> { skip_metadata: None, schema: None, file_sort_order: vec![], + file_decryption_properties: None, } } } @@ -313,6 +317,15 @@ impl<'a> ParquetReadOptions<'a> { self.file_sort_order = file_sort_order; self } + + /// Configure file decryption properties for reading encrypted Parquet files + pub fn file_decryption_properties( + mut self, + file_decryption_properties: FileDecryptionProperties, + ) -> Self { + self.file_decryption_properties = Some(file_decryption_properties); + self + } } /// Options that control the reading of ARROW files. 
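The `file_decryption_properties` builder method added to `ParquetReadOptions` above is the read-side entry point. Here is a minimal sketch of how a caller might use it, assuming a file encrypted with only the illustrative 16-byte footer key (the path and key are placeholders, not part of this change):

```rust
use datafusion::common::Result;
use datafusion::parquet::encryption::decrypt::FileDecryptionProperties;
use datafusion::prelude::{ParquetReadOptions, SessionContext};

async fn read_encrypted_file(path: &str) -> Result<()> {
    // Must match the key the file was encrypted with; add per-column keys
    // with `.with_column_key(...)` if columns were encrypted separately.
    let decrypt =
        FileDecryptionProperties::builder(b"0123456789012345".to_vec()).build()?;

    let ctx = SessionContext::new();
    let df = ctx
        .read_parquet(
            path,
            ParquetReadOptions::default().file_decryption_properties(decrypt),
        )
        .await?;
    df.show().await?;
    Ok(())
}
```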
@@ -574,7 +587,11 @@ impl ReadOptions<'_> for ParquetReadOptions<'_> { config: &SessionConfig, table_options: TableOptions, ) -> ListingOptions { - let mut file_format = ParquetFormat::new().with_options(table_options.parquet); + let mut options = table_options.parquet; + if let Some(file_decryption_properties) = &self.file_decryption_properties { + options.crypto.file_decryption = Some(file_decryption_properties.into()); + } + let mut file_format = ParquetFormat::new().with_options(options); if let Some(parquet_pruning) = self.parquet_pruning { file_format = file_format.with_enable_pruning(parquet_pruning) diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index 6a5c19829c1c..4b53fa3efddc 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -196,7 +196,8 @@ mod tests { let schema = format.infer_schema(&ctx, &store, &meta).await.unwrap(); let stats = - fetch_statistics(store.as_ref(), schema.clone(), &meta[0], None).await?; + fetch_statistics(store.as_ref(), schema.clone(), &meta[0], None, None) + .await?; assert_eq!(stats.num_rows, Precision::Exact(3)); let c1_stats = &stats.column_statistics[0]; @@ -204,7 +205,8 @@ mod tests { assert_eq!(c1_stats.null_count, Precision::Exact(1)); assert_eq!(c2_stats.null_count, Precision::Exact(3)); - let stats = fetch_statistics(store.as_ref(), schema, &meta[1], None).await?; + let stats = + fetch_statistics(store.as_ref(), schema, &meta[1], None, None).await?; assert_eq!(stats.num_rows, Precision::Exact(3)); let c1_stats = &stats.column_statistics[0]; let c2_stats = &stats.column_statistics[1]; @@ -376,9 +378,14 @@ mod tests { // Use a size hint larger than the parquet footer but smaller than the actual metadata, requiring a second fetch // for the remaining metadata - fetch_parquet_metadata(store.as_ref() as &dyn ObjectStore, &meta[0], Some(9)) - .await - .expect("error reading metadata with hint"); + fetch_parquet_metadata( + store.as_ref() as &dyn ObjectStore, + &meta[0], + Some(9), + None, + ) + .await + .expect("error reading metadata with hint"); assert_eq!(store.request_count(), 2); @@ -396,9 +403,14 @@ mod tests { .await .unwrap(); - let stats = - fetch_statistics(store.upcast().as_ref(), schema.clone(), &meta[0], Some(9)) - .await?; + let stats = fetch_statistics( + store.upcast().as_ref(), + schema.clone(), + &meta[0], + Some(9), + None, + ) + .await?; assert_eq!(stats.num_rows, Precision::Exact(3)); let c1_stats = &stats.column_statistics[0]; @@ -413,7 +425,7 @@ mod tests { // Use the file size as the hint so we can get the full metadata from the first fetch let size_hint = meta[0].size as usize; - fetch_parquet_metadata(store.upcast().as_ref(), &meta[0], Some(size_hint)) + fetch_parquet_metadata(store.upcast().as_ref(), &meta[0], Some(size_hint), None) .await .expect("error reading metadata with hint"); @@ -432,6 +444,7 @@ mod tests { schema.clone(), &meta[0], Some(size_hint), + None, ) .await?; @@ -448,7 +461,7 @@ mod tests { // Use the a size hint larger than the file size to make sure we don't panic let size_hint = (meta[0].size + 100) as usize; - fetch_parquet_metadata(store.upcast().as_ref(), &meta[0], Some(size_hint)) + fetch_parquet_metadata(store.upcast().as_ref(), &meta[0], Some(size_hint), None) .await .expect("error reading metadata with hint"); @@ -487,7 +500,8 @@ mod tests { let schema = format.infer_schema(&state, &store, &files).await.unwrap(); // Fetch statistics for first file 
-        let pq_meta = fetch_parquet_metadata(store.as_ref(), &files[0], None).await?;
+        let pq_meta =
+            fetch_parquet_metadata(store.as_ref(), &files[0], None, None).await?;
         let stats = statistics_from_parquet_meta_calc(&pq_meta, schema.clone())?;
         assert_eq!(stats.num_rows, Precision::Exact(4));
 
@@ -545,7 +559,8 @@
         };
 
         // Fetch statistics for first file
-        let pq_meta = fetch_parquet_metadata(store.as_ref(), &files[0], None).await?;
+        let pq_meta =
+            fetch_parquet_metadata(store.as_ref(), &files[0], None, None).await?;
         let stats = statistics_from_parquet_meta_calc(&pq_meta, schema.clone())?;
         assert_eq!(stats.num_rows, Precision::Exact(3));
         // column c1
@@ -571,7 +586,8 @@
         assert_eq!(c2_stats.min_value, Precision::Exact(null_i64.clone()));
 
         // Fetch statistics for second file
-        let pq_meta = fetch_parquet_metadata(store.as_ref(), &files[1], None).await?;
+        let pq_meta =
+            fetch_parquet_metadata(store.as_ref(), &files[1], None, None).await?;
         let stats = statistics_from_parquet_meta_calc(&pq_meta, schema.clone())?;
         assert_eq!(stats.num_rows, Precision::Exact(3));
         // column c1: missing from the file so the table treats all 3 rows as null
diff --git a/datafusion/core/tests/parquet/custom_reader.rs b/datafusion/core/tests/parquet/custom_reader.rs
index 761a78a29fd3..5fc3513ff745 100644
--- a/datafusion/core/tests/parquet/custom_reader.rs
+++ b/datafusion/core/tests/parquet/custom_reader.rs
@@ -241,6 +241,7 @@ impl AsyncFileReader for ParquetFileReader {
             self.store.as_ref(),
             &self.meta,
             self.metadata_size_hint,
+            None,
         )
         .await
         .map_err(|e| {
diff --git a/datafusion/core/tests/parquet/encryption.rs b/datafusion/core/tests/parquet/encryption.rs
new file mode 100644
index 000000000000..3f180aab3017
--- /dev/null
+++ b/datafusion/core/tests/parquet/encryption.rs
@@ -0,0 +1,128 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Non-trivial integration testing for reading and writing encrypted parquet files
+//!
+//! Testing hints: If you run this test with --nocapture it will tell you where
+//! the generated parquet file went. You can then test it and try out various queries
+//! in datafusion-cli like:
+//!
+//! ```sql
+//! create external table data stored as parquet location 'data.parquet';
+//! select * from data limit 10;
+//!
``` + +use arrow::record_batch::RecordBatch; +use datafusion::prelude::{ParquetReadOptions, SessionContext}; +use std::fs::File; +use std::path::{Path, PathBuf}; +use std::sync::Arc; + +use parquet::arrow::ArrowWriter; +use parquet::encryption::decrypt::FileDecryptionProperties; +use parquet::encryption::encrypt::FileEncryptionProperties; +use parquet::file::properties::WriterProperties; +use tempfile::TempDir; + +async fn read_parquet_test_data<'a, T: Into>( + path: T, + ctx: &SessionContext, + options: ParquetReadOptions<'a>, +) -> Vec { + ctx.read_parquet(path.into(), options) + .await + .unwrap() + .collect() + .await + .unwrap() +} + +pub fn write_batches( + path: PathBuf, + props: WriterProperties, + batches: impl IntoIterator, +) -> datafusion_common::Result { + let mut batches = batches.into_iter(); + let first_batch = batches.next().expect("need at least one record batch"); + let schema = first_batch.schema(); + + let file = File::create(&path)?; + let mut writer = ArrowWriter::try_new(file, Arc::clone(&schema), Some(props))?; + + writer.write(&first_batch)?; + let mut num_rows = first_batch.num_rows(); + + for batch in batches { + writer.write(&batch)?; + num_rows += batch.num_rows(); + } + writer.close()?; + Ok(num_rows) +} + +#[tokio::test] +async fn round_trip_encryption() { + let ctx: SessionContext = SessionContext::new(); + + let options = ParquetReadOptions::default(); + let batches = read_parquet_test_data( + "tests/data/filter_pushdown/single_file.gz.parquet", + &ctx, + options, + ) + .await; + + let schema = batches[0].schema(); + let footer_key = b"0123456789012345".to_vec(); // 128bit/16 + let column_key = b"1234567890123450".to_vec(); // 128bit/16 + + let mut encrypt = FileEncryptionProperties::builder(footer_key.clone()); + let mut decrypt = FileDecryptionProperties::builder(footer_key.clone()); + + for field in schema.fields.iter() { + encrypt = encrypt.with_column_key(field.name().as_str(), column_key.clone()); + decrypt = decrypt.with_column_key(field.name().as_str(), column_key.clone()); + } + let encrypt = encrypt.build().unwrap(); + let decrypt = decrypt.build().unwrap(); + + // Write encrypted parquet + let props = WriterProperties::builder() + .with_file_encryption_properties(encrypt) + .build(); + + let tempdir = TempDir::new_in(Path::new(".")).unwrap(); + let tempfile = tempdir.path().join("data.parquet"); + let num_rows_written = write_batches(tempfile.clone(), props, batches).unwrap(); + + // Read encrypted parquet + let ctx: SessionContext = SessionContext::new(); + let options = ParquetReadOptions::default().file_decryption_properties(decrypt); + + let encrypted_batches = read_parquet_test_data( + tempfile.into_os_string().into_string().unwrap(), + &ctx, + options, + ) + .await; + + let num_rows_read = encrypted_batches + .iter() + .fold(0, |acc, x| acc + x.num_rows()); + + assert_eq!(num_rows_written, num_rows_read); +} diff --git a/datafusion/core/tests/parquet/mod.rs b/datafusion/core/tests/parquet/mod.rs index 87a5ed33f127..835e8baedbb2 100644 --- a/datafusion/core/tests/parquet/mod.rs +++ b/datafusion/core/tests/parquet/mod.rs @@ -43,6 +43,7 @@ use std::sync::Arc; use tempfile::NamedTempFile; mod custom_reader; +mod encryption; mod external_access_plan; mod file_statistics; mod filter_pushdown; diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs index 647fbc8d051e..59663fe5100a 100644 --- a/datafusion/datasource-parquet/src/file_format.rs +++ 
b/datafusion/datasource-parquet/src/file_format.rs @@ -78,6 +78,7 @@ use parquet::arrow::arrow_writer::{ use parquet::arrow::async_reader::MetadataFetch; use parquet::arrow::{parquet_to_arrow_schema, ArrowSchemaConverter, AsyncArrowWriter}; use parquet::basic::Type; +use parquet::encryption::decrypt::FileDecryptionProperties; use parquet::errors::ParquetError; use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader, RowGroupMetaData}; use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder}; @@ -303,10 +304,18 @@ async fn fetch_schema_with_location( store: &dyn ObjectStore, file: &ObjectMeta, metadata_size_hint: Option, + file_decryption_properties: Option<&FileDecryptionProperties>, coerce_int96: Option, ) -> Result<(Path, Schema)> { let loc_path = file.location.clone(); - let schema = fetch_schema(store, file, metadata_size_hint, coerce_int96).await?; + let schema = fetch_schema( + store, + file, + metadata_size_hint, + file_decryption_properties, + coerce_int96, + ) + .await?; Ok((loc_path, schema)) } @@ -341,12 +350,22 @@ impl FileFormat for ParquetFormat { Some(time_unit) => Some(parse_coerce_int96_string(time_unit.as_str())?), None => None, }; + let config_file_decryption_properties = &self.options.crypto.file_decryption; + let file_decryption_properties: Option = + match config_file_decryption_properties { + Some(cfd) => { + let fd: FileDecryptionProperties = cfd.clone().into(); + Some(fd) + } + None => None, + }; let mut schemas: Vec<_> = futures::stream::iter(objects) .map(|object| { fetch_schema_with_location( store.as_ref(), object, self.metadata_size_hint(), + file_decryption_properties.as_ref(), coerce_int96, ) }) @@ -396,11 +415,21 @@ impl FileFormat for ParquetFormat { table_schema: SchemaRef, object: &ObjectMeta, ) -> Result { + let config_file_decryption_properties = &self.options.crypto.file_decryption; + let file_decryption_properties: Option = + match config_file_decryption_properties { + Some(cfd) => { + let fd: FileDecryptionProperties = cfd.clone().into(); + Some(fd) + } + None => None, + }; let stats = fetch_statistics( store.as_ref(), table_schema, object, self.metadata_size_hint(), + file_decryption_properties.as_ref(), ) .await?; Ok(stats) @@ -930,12 +959,14 @@ pub async fn fetch_parquet_metadata( store: &dyn ObjectStore, meta: &ObjectMeta, size_hint: Option, + decryption_properties: Option<&FileDecryptionProperties>, ) -> Result { let file_size = meta.size; let fetch = ObjectStoreFetch::new(store, meta); ParquetMetaDataReader::new() .with_prefetch_hint(size_hint) + .with_decryption_properties(decryption_properties) .load_and_finish(fetch, file_size) .await .map_err(DataFusionError::from) @@ -946,9 +977,16 @@ async fn fetch_schema( store: &dyn ObjectStore, file: &ObjectMeta, metadata_size_hint: Option, + file_decryption_properties: Option<&FileDecryptionProperties>, coerce_int96: Option, ) -> Result { - let metadata = fetch_parquet_metadata(store, file, metadata_size_hint).await?; + let metadata = fetch_parquet_metadata( + store, + file, + metadata_size_hint, + file_decryption_properties, + ) + .await?; let file_metadata = metadata.file_metadata(); let schema = parquet_to_arrow_schema( file_metadata.schema_descr(), @@ -970,8 +1008,11 @@ pub async fn fetch_statistics( table_schema: SchemaRef, file: &ObjectMeta, metadata_size_hint: Option, + decryption_properties: Option<&FileDecryptionProperties>, ) -> Result { - let metadata = fetch_parquet_metadata(store, file, metadata_size_hint).await?; + let metadata = + 
fetch_parquet_metadata(store, file, metadata_size_hint, decryption_properties) + .await?; statistics_from_parquet_meta_calc(&metadata, table_schema) } @@ -1261,9 +1302,15 @@ impl FileSink for ParquetSink { object_store: Arc, ) -> Result { let parquet_opts = &self.parquet_options; - let allow_single_file_parallelism = + let mut allow_single_file_parallelism = parquet_opts.global.allow_single_file_parallelism; + if parquet_opts.crypto.file_encryption.is_some() { + // For now, arrow-rs does not support parallel writes with encryption + // See https://github.com/apache/arrow-rs/issues/7359 + allow_single_file_parallelism = false; + } + let mut file_write_tasks: JoinSet< std::result::Result<(Path, FileMetaData), DataFusionError>, > = JoinSet::new(); diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index 9e14425074f7..61790d5768e5 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -41,6 +41,7 @@ use log::debug; use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions}; use parquet::arrow::async_reader::AsyncFileReader; use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask}; +use parquet::encryption::decrypt::FileDecryptionProperties; use parquet::file::metadata::ParquetMetaDataReader; /// Implements [`FileOpener`] for a parquet file @@ -82,6 +83,8 @@ pub(super) struct ParquetOpener { pub enable_row_group_stats_pruning: bool, /// Coerce INT96 timestamps to specific TimeUnit pub coerce_int96: Option, + /// Optional parquet FileDecryptionProperties + pub file_decryption_properties: Option>, } impl FileOpener for ParquetOpener { @@ -122,7 +125,14 @@ impl FileOpener for ParquetOpener { let predicate_creation_errors = MetricBuilder::new(&self.metrics) .global_counter("num_predicate_creation_errors"); - let enable_page_index = self.enable_page_index; + let mut enable_page_index = self.enable_page_index; + let file_decryption_properties = self.file_decryption_properties.clone(); + + // For now, page index does not work with encrypted files. See: + // https://github.com/apache/arrow-rs/issues/7629 + if file_decryption_properties.is_some() { + enable_page_index = false; + } Ok(Box::pin(async move { // Don't load the page index yet. Since it is not stored inline in @@ -131,6 +141,9 @@ impl FileOpener for ParquetOpener { // pruning predicates. Thus default to not requesting if from the // underlying reader. 
let mut options = ArrowReaderOptions::new().with_page_index(false); + if let Some(fd_val) = file_decryption_properties { + options = options.with_file_decryption_properties((*fd_val).clone()); + } let mut metadata_timer = file_metrics.metadata_load_time.timer(); // Begin by loading the metadata from the underlying reader (note diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs index 90f78a92d171..90aa3267e6ea 100644 --- a/datafusion/datasource-parquet/src/source.rs +++ b/datafusion/datasource-parquet/src/source.rs @@ -475,6 +475,13 @@ impl FileSource for ParquetSource { Arc::new(DefaultParquetFileReaderFactory::new(object_store)) as _ }); + let file_decryption_properties = self + .table_parquet_options() + .crypto + .file_decryption + .as_ref() + .map(|props| Arc::new(props.clone().into())); + let coerce_int96 = self .table_parquet_options .global @@ -501,6 +508,7 @@ impl FileSource for ParquetSource { enable_row_group_stats_pruning: self.table_parquet_options.global.pruning, schema_adapter_factory, coerce_int96, + file_decryption_properties, }) } diff --git a/datafusion/proto-common/src/from_proto/mod.rs b/datafusion/proto-common/src/from_proto/mod.rs index bd969db31687..0823e150268d 100644 --- a/datafusion/proto-common/src/from_proto/mod.rs +++ b/datafusion/proto-common/src/from_proto/mod.rs @@ -1066,6 +1066,7 @@ impl TryFrom<&protobuf::TableParquetOptions> for TableParquetOptions { .unwrap(), column_specific_options, key_value_metadata: Default::default(), + crypto: Default::default(), }) } } diff --git a/datafusion/proto/src/logical_plan/file_formats.rs b/datafusion/proto/src/logical_plan/file_formats.rs index d3f6511ec98f..620442c79e72 100644 --- a/datafusion/proto/src/logical_plan/file_formats.rs +++ b/datafusion/proto/src/logical_plan/file_formats.rs @@ -576,6 +576,7 @@ impl From<&TableParquetOptionsProto> for TableParquetOptions { .iter() .map(|(k, v)| (k.clone(), Some(v.clone()))) .collect(), + crypto: Default::default(), } } }
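Since the proto conversions above default `crypto` for now, encryption settings are supplied at the session or table level. As a rough sketch of the string-key path enabled by the new `crypto.` prefix handling in `TableParquetOptions::set` (the hex values are the same illustrative keys used in the docs and tests above, and the import paths assume the public re-exports from `datafusion::common`):

```rust
use datafusion::common::config::{ConfigField, TableParquetOptions};
use datafusion::common::Result;

fn encrypted_write_options() -> Result<TableParquetOptions> {
    let mut opts = TableParquetOptions::default();

    // Routed to ConfigFileEncryptionProperties via the "crypto." prefix
    opts.set("crypto.file_encryption.encrypt_footer", "true")?;
    opts.set(
        "crypto.file_encryption.footer_key_as_hex",
        "30313233343536373839303132333435", // hex::encode(b"0123456789012345")
    )?;
    // Per-column keys reuse the same `::column_name` suffix as other column options
    opts.set(
        "crypto.file_encryption.column_key_as_hex::double_field",
        "31323334353637383930313233343530", // hex::encode(b"1234567890123450")
    )?;

    Ok(opts)
}
```

The resulting options can then be passed to `DataFrame::write_parquet`, or the same keys can be set with the `format.crypto.` prefix in `CREATE EXTERNAL TABLE ... OPTIONS (...)`, as the new datafusion-cli SQL test demonstrates.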