From 26662443f1b82edc9931829493103992e849a0b3 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 26 Sep 2025 09:54:10 -0400 Subject: [PATCH] Refactor: Use `Arc` to reduce size of ParquetMetadata and avoid copying --- parquet/src/arrow/arrow_reader/mod.rs | 11 ++++---- parquet/src/arrow/async_reader/mod.rs | 2 +- parquet/src/arrow/async_reader/store.rs | 5 ++-- parquet/src/encryption/decrypt.rs | 18 ++++++------- parquet/src/encryption/encrypt.rs | 5 ++-- parquet/src/file/metadata/mod.rs | 6 +++-- parquet/src/file/metadata/parser.rs | 4 +-- parquet/src/file/metadata/reader.rs | 8 +++--- parquet/src/file/properties.rs | 8 +++--- parquet/src/file/writer.rs | 10 +++---- parquet/tests/encryption/encryption.rs | 28 ++++++++++---------- parquet/tests/encryption/encryption_async.rs | 18 ++++++------- parquet/tests/encryption/encryption_util.rs | 15 ++++++----- 13 files changed, 72 insertions(+), 66 deletions(-) diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs index 17bc5a298e61..a04ffb93ae7f 100644 --- a/parquet/src/arrow/arrow_reader/mod.rs +++ b/parquet/src/arrow/arrow_reader/mod.rs @@ -387,7 +387,7 @@ pub struct ArrowReaderOptions { pub(crate) page_index_policy: PageIndexPolicy, /// If encryption is enabled, the file decryption properties can be provided #[cfg(feature = "encryption")] - pub(crate) file_decryption_properties: Option, + pub(crate) file_decryption_properties: Option>, } impl ArrowReaderOptions { @@ -508,7 +508,7 @@ impl ArrowReaderOptions { #[cfg(feature = "encryption")] pub fn with_file_decryption_properties( self, - file_decryption_properties: FileDecryptionProperties, + file_decryption_properties: Arc, ) -> Self { Self { file_decryption_properties: Some(file_decryption_properties), @@ -528,7 +528,7 @@ impl ArrowReaderOptions { /// This can be set via /// [`file_decryption_properties`][Self::with_file_decryption_properties]. #[cfg(feature = "encryption")] - pub fn file_decryption_properties(&self) -> Option<&FileDecryptionProperties> { + pub fn file_decryption_properties(&self) -> Option<&Arc> { self.file_decryption_properties.as_ref() } } @@ -572,8 +572,9 @@ impl ArrowReaderMetadata { let metadata = ParquetMetaDataReader::new().with_page_index_policy(options.page_index_policy); #[cfg(feature = "encryption")] - let metadata = - metadata.with_decryption_properties(options.file_decryption_properties.as_ref()); + let metadata = metadata.with_decryption_properties( + options.file_decryption_properties.as_ref().map(Arc::clone), + ); let metadata = metadata.parse_and_finish(reader)?; Self::try_new(Arc::new(metadata), options) } diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs index 33b03fbbca95..fca823ff1634 100644 --- a/parquet/src/arrow/async_reader/mod.rs +++ b/parquet/src/arrow/async_reader/mod.rs @@ -181,7 +181,7 @@ impl AsyncFileReader for T { #[cfg(feature = "encryption")] let metadata_reader = metadata_reader.with_decryption_properties( - options.and_then(|o| o.file_decryption_properties.as_ref()), + options.and_then(|o| o.file_decryption_properties.as_ref().map(Arc::clone)), ); let parquet_metadata = metadata_reader.load_via_suffix_and_finish(self).await?; diff --git a/parquet/src/arrow/async_reader/store.rs b/parquet/src/arrow/async_reader/store.rs index ce1398b56d37..d313a110418c 100644 --- a/parquet/src/arrow/async_reader/store.rs +++ b/parquet/src/arrow/async_reader/store.rs @@ -206,8 +206,9 @@ impl AsyncFileReader for ParquetObjectReader { #[cfg(feature = "encryption")] if let Some(options) = options { - metadata = metadata - .with_decryption_properties(options.file_decryption_properties.as_ref()); + metadata = metadata.with_decryption_properties( + options.file_decryption_properties.as_ref().map(Arc::clone), + ); } let metadata = if let Some(file_size) = self.file_size { diff --git a/parquet/src/encryption/decrypt.rs b/parquet/src/encryption/decrypt.rs index d9b9ff0326b4..03518f11bcfb 100644 --- a/parquet/src/encryption/decrypt.rs +++ b/parquet/src/encryption/decrypt.rs @@ -438,16 +438,16 @@ impl DecryptionPropertiesBuilder { } /// Finalize the builder and return created [`FileDecryptionProperties`] - pub fn build(self) -> Result { + pub fn build(self) -> Result> { let keys = DecryptionKeys::Explicit(ExplicitDecryptionKeys { footer_key: self.footer_key, column_keys: self.column_keys, }); - Ok(FileDecryptionProperties { + Ok(Arc::new(FileDecryptionProperties { keys, aad_prefix: self.aad_prefix, footer_signature_verification: self.footer_signature_verification, - }) + })) } /// Specify the expected AAD prefix to be used for decryption. @@ -509,13 +509,13 @@ impl DecryptionPropertiesBuilderWithRetriever { } /// Finalize the builder and return created [`FileDecryptionProperties`] - pub fn build(self) -> Result { + pub fn build(self) -> Result> { let keys = DecryptionKeys::ViaRetriever(self.key_retriever); - Ok(FileDecryptionProperties { + Ok(Arc::new(FileDecryptionProperties { keys, aad_prefix: self.aad_prefix, footer_signature_verification: self.footer_signature_verification, - }) + })) } /// Specify the expected AAD prefix to be used for decryption. @@ -536,7 +536,7 @@ impl DecryptionPropertiesBuilderWithRetriever { #[derive(Clone, Debug)] pub(crate) struct FileDecryptor { - decryption_properties: FileDecryptionProperties, + decryption_properties: Arc, footer_decryptor: Arc, file_aad: Vec, } @@ -549,7 +549,7 @@ impl PartialEq for FileDecryptor { impl FileDecryptor { pub(crate) fn new( - decryption_properties: &FileDecryptionProperties, + decryption_properties: &Arc, footer_key_metadata: Option<&[u8]>, aad_file_unique: Vec, aad_prefix: Vec, @@ -565,7 +565,7 @@ impl FileDecryptor { Ok(Self { footer_decryptor: Arc::new(footer_decryptor), - decryption_properties: decryption_properties.clone(), + decryption_properties: Arc::clone(decryption_properties), file_aad, }) } diff --git a/parquet/src/encryption/encrypt.rs b/parquet/src/encryption/encrypt.rs index c8d3ffc0eef4..7e1dd3e4fd90 100644 --- a/parquet/src/encryption/encrypt.rs +++ b/parquet/src/encryption/encrypt.rs @@ -27,6 +27,7 @@ use crate::thrift::TSerializable; use ring::rand::{SecureRandom, SystemRandom}; use std::collections::{HashMap, HashSet}; use std::io::Write; +use std::sync::Arc; use thrift::protocol::TCompactOutputProtocol; #[derive(Debug, Clone, PartialEq)] @@ -289,13 +290,13 @@ impl EncryptionPropertiesBuilder { #[derive(Debug)] /// The encryption configuration for a single Parquet file pub(crate) struct FileEncryptor { - properties: FileEncryptionProperties, + properties: Arc, aad_file_unique: Vec, file_aad: Vec, } impl FileEncryptor { - pub(crate) fn new(properties: FileEncryptionProperties) -> Result { + pub(crate) fn new(properties: Arc) -> Result { // Generate unique AAD for file let rng = SystemRandom::new(); let mut aad_file_unique = vec![0u8; 8]; diff --git a/parquet/src/file/metadata/mod.rs b/parquet/src/file/metadata/mod.rs index e04b8c9c8e4d..c6dfa46d29a1 100644 --- a/parquet/src/file/metadata/mod.rs +++ b/parquet/src/file/metadata/mod.rs @@ -1974,7 +1974,8 @@ mod tests { #[cfg(not(feature = "encryption"))] let base_expected_size = 2312; #[cfg(feature = "encryption")] - let base_expected_size = 2648; + // Not as accurate as it should be: https://github.com/apache/arrow-rs/issues/8472 + let base_expected_size = 2552; assert_eq!(parquet_meta.memory_size(), base_expected_size); @@ -2004,7 +2005,8 @@ mod tests { #[cfg(not(feature = "encryption"))] let bigger_expected_size = 2816; #[cfg(feature = "encryption")] - let bigger_expected_size = 3152; + // Not as accurate as it should be: https://github.com/apache/arrow-rs/issues/8472 + let bigger_expected_size = 3056; // more set fields means more memory usage assert!(bigger_expected_size > base_expected_size); diff --git a/parquet/src/file/metadata/parser.rs b/parquet/src/file/metadata/parser.rs index a68f14d4d7aa..26f611884c33 100644 --- a/parquet/src/file/metadata/parser.rs +++ b/parquet/src/file/metadata/parser.rs @@ -291,7 +291,7 @@ fn parse_single_offset_index( pub(crate) fn decode_metadata_with_encryption( buf: &[u8], encrypted_footer: bool, - file_decryption_properties: Option<&FileDecryptionProperties>, + file_decryption_properties: Option<&Arc>, ) -> crate::errors::Result { let mut prot = TCompactSliceInputProtocol::new(buf); let mut file_decryptor = None; @@ -390,7 +390,7 @@ pub(crate) fn decode_metadata_with_encryption( fn get_file_decryptor( encryption_algorithm: EncryptionAlgorithm, footer_key_metadata: Option<&[u8]>, - file_decryption_properties: &FileDecryptionProperties, + file_decryption_properties: &Arc, ) -> crate::errors::Result { match encryption_algorithm { EncryptionAlgorithm::AESGCMV1(algo) => { diff --git a/parquet/src/file/metadata/reader.rs b/parquet/src/file/metadata/reader.rs index 4b8c57175d4e..68f381864446 100644 --- a/parquet/src/file/metadata/reader.rs +++ b/parquet/src/file/metadata/reader.rs @@ -76,7 +76,7 @@ pub struct ParquetMetaDataReader { // `self.parse_metadata` is called. metadata_size: Option, #[cfg(feature = "encryption")] - file_decryption_properties: Option, + file_decryption_properties: Option>, } /// Describes the policy for reading page indexes @@ -184,9 +184,9 @@ impl ParquetMetaDataReader { #[cfg(feature = "encryption")] pub fn with_decryption_properties( mut self, - properties: Option<&FileDecryptionProperties>, + properties: Option>, ) -> Self { - self.file_decryption_properties = properties.cloned(); + self.file_decryption_properties = properties; self } @@ -1176,7 +1176,7 @@ mod async_tests { // just make sure the metadata is properly decrypted and read let expected = ParquetMetaDataReader::new() - .with_decryption_properties(Some(&decryption_properties)) + .with_decryption_properties(Some(decryption_properties)) .load_via_suffix_and_finish(input) .await .unwrap(); diff --git a/parquet/src/file/properties.rs b/parquet/src/file/properties.rs index 603db6660f45..16b12d4fcc99 100644 --- a/parquet/src/file/properties.rs +++ b/parquet/src/file/properties.rs @@ -170,7 +170,7 @@ pub struct WriterProperties { statistics_truncate_length: Option, coerce_types: bool, #[cfg(feature = "encryption")] - pub(crate) file_encryption_properties: Option, + pub(crate) file_encryption_properties: Option>, } impl Default for WriterProperties { @@ -433,7 +433,7 @@ impl WriterProperties { /// /// For more details see [`WriterPropertiesBuilder::with_file_encryption_properties`] #[cfg(feature = "encryption")] - pub fn file_encryption_properties(&self) -> Option<&FileEncryptionProperties> { + pub fn file_encryption_properties(&self) -> Option<&Arc> { self.file_encryption_properties.as_ref() } } @@ -507,7 +507,7 @@ impl WriterPropertiesBuilder { statistics_truncate_length: self.statistics_truncate_length, coerce_types: self.coerce_types, #[cfg(feature = "encryption")] - file_encryption_properties: self.file_encryption_properties, + file_encryption_properties: self.file_encryption_properties.map(Arc::new), } } @@ -960,7 +960,7 @@ impl From for WriterPropertiesBuilder { statistics_truncate_length: props.statistics_truncate_length, coerce_types: props.coerce_types, #[cfg(feature = "encryption")] - file_encryption_properties: props.file_encryption_properties, + file_encryption_properties: props.file_encryption_properties.map(Arc::unwrap_or_clone), } } } diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs index fa72b060ea84..8fce35c11668 100644 --- a/parquet/src/file/writer.rs +++ b/parquet/src/file/writer.rs @@ -215,12 +215,12 @@ impl SerializedFileWriter { properties: &WriterPropertiesPtr, schema_descriptor: &SchemaDescriptor, ) -> Result>> { - if let Some(file_encryption_properties) = &properties.file_encryption_properties { + if let Some(file_encryption_properties) = properties.file_encryption_properties() { file_encryption_properties.validate_encrypted_column_names(schema_descriptor)?; - Ok(Some(Arc::new(FileEncryptor::new( - file_encryption_properties.clone(), - )?))) + Ok(Some(Arc::new(FileEncryptor::new(Arc::clone( + file_encryption_properties, + ))?))) } else { Ok(None) } @@ -320,7 +320,7 @@ impl SerializedFileWriter { /// Writes magic bytes at the beginning of the file. #[cfg(feature = "encryption")] fn start_file(properties: &WriterPropertiesPtr, buf: &mut TrackedWrite) -> Result<()> { - let magic = get_file_magic(properties.file_encryption_properties.as_ref()); + let magic = get_file_magic(properties.file_encryption_properties.as_deref()); buf.write_all(magic)?; Ok(()) diff --git a/parquet/tests/encryption/encryption.rs b/parquet/tests/encryption/encryption.rs index 96dd8654cd76..47ca0aa87b0a 100644 --- a/parquet/tests/encryption/encryption.rs +++ b/parquet/tests/encryption/encryption.rs @@ -89,8 +89,8 @@ fn test_plaintext_footer_signature_verification() { .build() .unwrap(); - let options = ArrowReaderOptions::default() - .with_file_decryption_properties(decryption_properties.clone()); + let options = + ArrowReaderOptions::default().with_file_decryption_properties(decryption_properties); let result = ArrowReaderMetadata::load(&file, options.clone()); assert!(result.is_err()); assert!(result @@ -146,8 +146,8 @@ fn test_non_uniform_encryption_disabled_aad_storage() { .unwrap(); let file = File::open(path).unwrap(); - let options = ArrowReaderOptions::default() - .with_file_decryption_properties(decryption_properties.clone()); + let options = + ArrowReaderOptions::default().with_file_decryption_properties(decryption_properties); let result = ArrowReaderMetadata::load(&file, options.clone()); assert!(result.is_err()); assert_eq!( @@ -277,8 +277,8 @@ fn test_uniform_encryption_plaintext_footer_with_key_retriever() { .build() .unwrap(); - let options = ArrowReaderOptions::default() - .with_file_decryption_properties(decryption_properties.clone()); + let options = + ArrowReaderOptions::default().with_file_decryption_properties(decryption_properties); let metadata = ArrowReaderMetadata::load(&file, options.clone()).unwrap(); // Write data into temporary file with plaintext footer and footer key metadata @@ -318,8 +318,8 @@ fn test_uniform_encryption_plaintext_footer_with_key_retriever() { .build() .unwrap(); - let options = ArrowReaderOptions::default() - .with_file_decryption_properties(decryption_properties.clone()); + let options = + ArrowReaderOptions::default().with_file_decryption_properties(decryption_properties); let _ = ArrowReaderMetadata::load(&temp_file, options.clone()).unwrap(); // Read temporary file with plaintext metadata using key retriever with invalid key @@ -332,8 +332,8 @@ fn test_uniform_encryption_plaintext_footer_with_key_retriever() { let decryption_properties = FileDecryptionProperties::with_key_retriever(key_retriever) .build() .unwrap(); - let options = ArrowReaderOptions::default() - .with_file_decryption_properties(decryption_properties.clone()); + let options = + ArrowReaderOptions::default().with_file_decryption_properties(decryption_properties); let result = ArrowReaderMetadata::load(&temp_file, options.clone()); assert!(result.is_err()); assert!(result @@ -668,7 +668,7 @@ fn test_write_uniform_encryption_plaintext_footer() { // Try writing plaintext footer and then reading it with the correct footer key read_and_roundtrip_to_encrypted_file( &file, - decryption_properties.clone(), + Arc::clone(&decryption_properties), file_encryption_properties.clone(), ); @@ -922,8 +922,8 @@ fn test_write_encrypted_struct_field() { .with_column_key("struct_col.float64_col", column_key_2) .build() .unwrap(); - let options = ArrowReaderOptions::default() - .with_file_decryption_properties(decryption_properties.clone()); + let options = + ArrowReaderOptions::default().with_file_decryption_properties(decryption_properties); let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(temp_file, options).unwrap(); @@ -1036,7 +1036,7 @@ fn test_decrypt_page_index_non_uniform() { fn test_decrypt_page_index( path: &str, - decryption_properties: FileDecryptionProperties, + decryption_properties: Arc, ) -> Result<(), ParquetError> { let file = File::open(path)?; let options = ArrowReaderOptions::default() diff --git a/parquet/tests/encryption/encryption_async.rs b/parquet/tests/encryption/encryption_async.rs index 9c1e0c00a3f6..05f2bcb9c086 100644 --- a/parquet/tests/encryption/encryption_async.rs +++ b/parquet/tests/encryption/encryption_async.rs @@ -432,7 +432,7 @@ async fn test_decrypt_page_index_non_uniform() { async fn test_decrypt_page_index( path: &str, - decryption_properties: FileDecryptionProperties, + decryption_properties: Arc, ) -> Result<(), ParquetError> { let mut file = File::open(&path).await?; @@ -449,7 +449,7 @@ async fn test_decrypt_page_index( async fn verify_encryption_test_file_read_async( file: &mut tokio::fs::File, - decryption_properties: FileDecryptionProperties, + decryption_properties: Arc, ) -> Result<(), ParquetError> { let options = ArrowReaderOptions::new().with_file_decryption_properties(decryption_properties); @@ -469,14 +469,14 @@ async fn verify_encryption_test_file_read_async( async fn read_and_roundtrip_to_encrypted_file_async( path: &str, - decryption_properties: FileDecryptionProperties, + decryption_properties: Arc, encryption_properties: FileEncryptionProperties, ) -> Result<(), ParquetError> { let temp_file = tempfile::tempfile().unwrap(); let mut file = File::open(&path).await.unwrap(); - let options = - ArrowReaderOptions::new().with_file_decryption_properties(decryption_properties.clone()); + let options = ArrowReaderOptions::new() + .with_file_decryption_properties(Arc::clone(&decryption_properties)); let arrow_metadata = ArrowReaderMetadata::load_async(&mut file, options).await?; let record_reader = ParquetRecordBatchStreamBuilder::new_with_metadata( file.try_clone().await?, @@ -754,7 +754,7 @@ async fn test_multi_threaded_encrypted_writing() { .unwrap(); let (record_batches, metadata) = - read_encrypted_file(&file, decryption_properties.clone()).unwrap(); + read_encrypted_file(&file, Arc::clone(&decryption_properties)).unwrap(); let schema = metadata.schema().clone(); let props = Some( @@ -823,7 +823,7 @@ async fn test_multi_threaded_encrypted_writing() { // Check that the file was written correctly let (read_record_batches, read_metadata) = - read_encrypted_file(&temp_file, decryption_properties.clone()).unwrap(); + read_encrypted_file(&temp_file, decryption_properties).unwrap(); verify_encryption_test_data(read_record_batches, read_metadata.metadata()); // Check that file was encrypted @@ -853,7 +853,7 @@ async fn test_multi_threaded_encrypted_writing_deprecated() { .unwrap(); let (record_batches, metadata) = - read_encrypted_file(&file, decryption_properties.clone()).unwrap(); + read_encrypted_file(&file, Arc::clone(&decryption_properties)).unwrap(); let to_write: Vec<_> = record_batches .iter() .flat_map(|rb| rb.columns().to_vec()) @@ -914,7 +914,7 @@ async fn test_multi_threaded_encrypted_writing_deprecated() { // Check that the file was written correctly let (read_record_batches, read_metadata) = - read_encrypted_file(&temp_file, decryption_properties.clone()).unwrap(); + read_encrypted_file(&temp_file, decryption_properties).unwrap(); verify_encryption_double_test_data(read_record_batches, read_metadata.metadata()); // Check that file was encrypted diff --git a/parquet/tests/encryption/encryption_util.rs b/parquet/tests/encryption/encryption_util.rs index f53e12adb720..ec3c5774b6fe 100644 --- a/parquet/tests/encryption/encryption_util.rs +++ b/parquet/tests/encryption/encryption_util.rs @@ -28,7 +28,7 @@ use parquet::file::metadata::ParquetMetaData; use parquet::file::properties::WriterProperties; use std::collections::HashMap; use std::fs::File; -use std::sync::Mutex; +use std::sync::{Arc, Mutex}; pub(crate) fn verify_encryption_double_test_data( record_batches: Vec, @@ -217,10 +217,10 @@ pub(crate) fn verify_column_indexes(metadata: &ParquetMetaData) { pub(crate) fn read_encrypted_file( file: &File, - decryption_properties: FileDecryptionProperties, + decryption_properties: Arc, ) -> std::result::Result<(Vec, ArrowReaderMetadata), ParquetError> { - let options = ArrowReaderOptions::default() - .with_file_decryption_properties(decryption_properties.clone()); + let options = + ArrowReaderOptions::default().with_file_decryption_properties(decryption_properties); let metadata = ArrowReaderMetadata::load(file, options.clone())?; let builder = @@ -232,11 +232,12 @@ pub(crate) fn read_encrypted_file( pub(crate) fn read_and_roundtrip_to_encrypted_file( file: &File, - decryption_properties: FileDecryptionProperties, + decryption_properties: Arc, encryption_properties: FileEncryptionProperties, ) { // read example data - let (batches, metadata) = read_encrypted_file(file, decryption_properties.clone()).unwrap(); + let (batches, metadata) = + read_encrypted_file(file, Arc::clone(&decryption_properties)).unwrap(); // write example data to a temporary file let temp_file = tempfile::tempfile().unwrap(); @@ -262,7 +263,7 @@ pub(crate) fn read_and_roundtrip_to_encrypted_file( pub(crate) fn verify_encryption_test_file_read( file: File, - decryption_properties: FileDecryptionProperties, + decryption_properties: Arc, ) { let options = ArrowReaderOptions::default().with_file_decryption_properties(decryption_properties);