Support multi-threaded writing of Parquet files with modular encryption #7818

Open · wants to merge 5 commits into base: main · Changes from 1 commit
56 changes: 37 additions & 19 deletions parquet/src/arrow/arrow_writer/mod.rs
@@ -141,7 +141,7 @@ pub struct ArrowWriter<W: Write> {
arrow_schema: SchemaRef,

/// Creates new [`ArrowRowGroupWriter`] instances as required
row_group_writer_factory: ArrowRowGroupWriterFactory,
pub row_group_writer_factory: ArrowRowGroupWriterFactory,

/// The length of arrays to write to each row group
max_row_group_size: usize,
@@ -755,8 +755,9 @@ impl ArrowColumnWriter {
}

/// Encodes [`RecordBatch`] to a parquet row group
struct ArrowRowGroupWriter {
writers: Vec<ArrowColumnWriter>,
pub struct ArrowRowGroupWriter {
[Review comment] @alamb (Contributor):
Is there a reason we can't use the existing low-level APIs documented here: https://docs.rs/parquet/latest/parquet/arrow/arrow_writer/struct.ArrowColumnWriter.html#example-encoding-two-arrow-arrays-in-parallel
In other words, do we really need to make what has previously been an implementation detail part of the public API?

[Reply] @rok (Member, Author), Jul 10, 2025:
Thanks for the quick response @alamb! :)
Looking at the example - we have to pass a single private FileEncryptor object (it has random state) to all the ArrowColumnWriters, SerializedRowGroupWriters and the SerializedFileWriter. Perhaps we could hide everything inside ArrowWriter; I'll have to do a quick check. If you have a cleaner idea we'd be most interested!

[Reply] @alamb (Contributor), Jul 11, 2025:
I do think avoiding exposing ArrowRowGroupWriter and the associated machinery would be good.
The rationale is that @XiangpengHao, @zhuqi-lucas and I are likely to be reworking it in the next few releases, and once it is part of the public API, changing it becomes harder as it requires more coordination and raises backwards-compatibility concerns.

/// [`ArrowColumnWriter`] for each column in a row group
pub writers: Vec<ArrowColumnWriter>,
schema: SchemaRef,
buffered_rows: usize,
}
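
To make the thread above concrete, here is a minimal sketch of how the single FileEncryptor flows from the file writer into every column writer. It assumes the public surface proposed in this diff (names and visibility may change per the review):

use parquet::arrow::arrow_writer::{ArrowColumnWriter, ArrowRowGroupWriterFactory};
use parquet::errors::Result;
use parquet::file::properties::WriterPropertiesPtr;
use parquet::file::writer::SerializedFileWriter;
use parquet::schema::types::{SchemaDescriptor, TypePtr};

// Sketch: the SerializedFileWriter owns the one FileEncryptor (it holds the
// random per-file AAD state); the factory captures it via file_encryptor(),
// so every ArrowColumnWriter it hands out encrypts with the same encryptor
// and the correct row-group index.
fn encrypted_column_writers(
    output: std::fs::File,
    root_schema: TypePtr,
    parquet_schema: &SchemaDescriptor,
    arrow_schema: &arrow_schema::SchemaRef,
    props: WriterPropertiesPtr,
) -> Result<(SerializedFileWriter<std::fs::File>, Vec<ArrowColumnWriter>)> {
    let file_writer = SerializedFileWriter::new(output, root_schema, props.clone())?;
    let factory = ArrowRowGroupWriterFactory::new(&file_writer);
    let row_group = factory.create_row_group_writer(parquet_schema, &props, arrow_schema, 0)?;
    Ok((file_writer, row_group.writers)) // fan the writers out across threads
}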
@@ -789,44 +790,54 @@ impl ArrowRowGroupWriter {
}
}

struct ArrowRowGroupWriterFactory {
/// Factory for creating [`ArrowRowGroupWriter`] instances.
/// This is used by [`ArrowWriter`] to create row group writers, but it can
/// also be used directly with the lower-level API.
pub struct ArrowRowGroupWriterFactory {
#[cfg(feature = "encryption")]
file_encryptor: Option<Arc<FileEncryptor>>,
}

impl ArrowRowGroupWriterFactory {
/// Creates a new [`ArrowRowGroupWriterFactory`] using the provided [`SerializedFileWriter`].
#[cfg(feature = "encryption")]
fn new<W: Write + Send>(file_writer: &SerializedFileWriter<W>) -> Self {
pub fn new<W: Write + Send>(file_writer: &SerializedFileWriter<W>) -> Self {
Self {
file_encryptor: file_writer.file_encryptor(),
}
}

#[cfg(not(feature = "encryption"))]
fn new<W: Write + Send>(_file_writer: &SerializedFileWriter<W>) -> Self {
pub fn new<W: Write + Send>(_file_writer: &SerializedFileWriter<W>) -> Self {
Self {}
}

/// Creates a new [`ArrowRowGroupWriter`] for the given parquet schema and writer properties.
#[cfg(feature = "encryption")]
fn create_row_group_writer(
pub fn create_row_group_writer(
&self,
parquet: &SchemaDescriptor,
props: &WriterPropertiesPtr,
arrow: &SchemaRef,
row_group_index: usize,
) -> Result<ArrowRowGroupWriter> {
let writers = get_column_writers_with_encryptor(
parquet,
props,
arrow,
self.file_encryptor.clone(),
row_group_index,
)?;
let mut writers = Vec::with_capacity(arrow.fields.len());
let mut leaves = parquet.columns().iter();
let column_factory = ArrowColumnWriterFactory::new()
.with_file_encryptor(row_group_index, self.file_encryptor.clone());
for field in &arrow.fields {
column_factory.get_arrow_column_writer(
field.data_type(),
props,
&mut leaves,
&mut writers,
)?;
}
Ok(ArrowRowGroupWriter::new(writers, arrow))
}

#[cfg(not(feature = "encryption"))]
fn create_row_group_writer(
pub fn create_row_group_writer(
&self,
parquet: &SchemaDescriptor,
props: &WriterPropertiesPtr,
@@ -860,7 +871,7 @@ pub fn get_column_writers(

/// Returns the [`ArrowColumnWriter`] for a given schema and supports columnar encryption
#[cfg(feature = "encryption")]
fn get_column_writers_with_encryptor(
pub fn get_column_writers_with_encryptor(
parquet: &SchemaDescriptor,
props: &WriterPropertiesPtr,
arrow: &SchemaRef,
@@ -883,14 +894,21 @@ fn get_column_writers_with_encryptor(
}

/// Gets [`ArrowColumnWriter`] instances for different data types
struct ArrowColumnWriterFactory {
pub struct ArrowColumnWriterFactory {
#[cfg(feature = "encryption")]
row_group_index: usize,
#[cfg(feature = "encryption")]
file_encryptor: Option<Arc<FileEncryptor>>,
}

impl Default for ArrowColumnWriterFactory {
fn default() -> Self {
Self::new()
}
}

impl ArrowColumnWriterFactory {
/// Create a new [`ArrowColumnWriterFactory`]
pub fn new() -> Self {
Self {
#[cfg(feature = "encryption")]
@@ -901,7 +919,7 @@ impl ArrowColumnWriterFactory {
}

#[cfg(feature = "encryption")]
pub fn with_file_encryptor(
fn with_file_encryptor(
mut self,
row_group_index: usize,
file_encryptor: Option<Arc<FileEncryptor>>,
@@ -939,7 +957,7 @@ impl ArrowColumnWriterFactory {
}

/// Gets the [`ArrowColumnWriter`] for the given `data_type`
fn get_arrow_column_writer(
pub fn get_arrow_column_writer(
&self,
data_type: &ArrowDataType,
props: &WriterPropertiesPtr,
4 changes: 2 additions & 2 deletions parquet/src/encryption/encrypt.rs
@@ -288,14 +288,14 @@ impl EncryptionPropertiesBuilder {

#[derive(Debug)]
/// The encryption configuration for a single Parquet file
pub(crate) struct FileEncryptor {
pub struct FileEncryptor {
properties: FileEncryptionProperties,
aad_file_unique: Vec<u8>,
file_aad: Vec<u8>,
}

impl FileEncryptor {
pub(crate) fn new(properties: FileEncryptionProperties) -> Result<Self> {
pub fn new(properties: FileEncryptionProperties) -> Result<Self> {
// Generate unique AAD for file
let rng = SystemRandom::new();
let mut aad_file_unique = vec![0u8; 8];
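
Since FileEncryptor and its constructor become pub here, code outside the crate could construct one directly. A minimal sketch, assuming this diff's visibility (not a pattern used elsewhere in this PR):

use std::sync::Arc;
use parquet::encryption::encrypt::{FileEncryptionProperties, FileEncryptor};
use parquet::errors::Result;

// Sketch: new() seeds a random 8-byte file-unique AAD, which is why a single
// instance must be shared by all writers of a file, not built per column.
fn build_shared_encryptor(props: FileEncryptionProperties) -> Result<Arc<FileEncryptor>> {
    Ok(Arc::new(FileEncryptor::new(props)?))
}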
2 changes: 1 addition & 1 deletion parquet/src/file/properties.rs
@@ -457,7 +457,7 @@ pub struct WriterPropertiesBuilder {

impl WriterPropertiesBuilder {
/// Returns default state of the builder.
fn with_defaults() -> Self {
pub fn with_defaults() -> Self {
[Review comment] Contributor:
We don't need to make this pub - you can call WriterProperties::builder instead. I think it's best to keep this private rather than exposing two ways to do the same thing.

[Reply] Member:
Or impl Default for WriterPropertiesBuilder?

Self {
data_page_size_limit: DEFAULT_PAGE_SIZE,
data_page_row_count_limit: DEFAULT_DATA_PAGE_ROW_COUNT_LIMIT,
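
For reference, a hypothetical sketch of the two alternatives raised in the thread above; neither is part of this diff:

// Option 1 (existing public path, no API change needed):
//     let props = WriterProperties::builder().build();

// Option 2 (in-crate): a Default impl instead of a pub with_defaults():
impl Default for WriterPropertiesBuilder {
    fn default() -> Self {
        WriterProperties::builder() // delegate to the existing entry point
    }
}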
2 changes: 1 addition & 1 deletion parquet/src/file/writer.rs
@@ -433,7 +433,7 @@ impl<W: Write + Send> SerializedFileWriter<W> {

/// Get the file encryptor used by this instance to encrypt data
#[cfg(feature = "encryption")]
pub(crate) fn file_encryptor(&self) -> Option<Arc<FileEncryptor>> {
pub fn file_encryptor(&self) -> Option<Arc<FileEncryptor>> {
self.file_encryptor.clone()
}
}
132 changes: 124 additions & 8 deletions parquet/tests/encryption/encryption.rs
@@ -28,13 +28,16 @@ use parquet::arrow::arrow_reader::{
ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReaderBuilder, RowSelection,
RowSelector,
};
use parquet::arrow::ArrowWriter;
use parquet::arrow::arrow_writer::{
compute_leaves, ArrowColumnChunk, ArrowLeafColumn, ArrowRowGroupWriterFactory,
};
use parquet::arrow::{ArrowSchemaConverter, ArrowWriter};
use parquet::data_type::{ByteArray, ByteArrayType};
use parquet::encryption::decrypt::FileDecryptionProperties;
use parquet::encryption::encrypt::FileEncryptionProperties;
use parquet::errors::ParquetError;
use parquet::file::metadata::ParquetMetaData;
use parquet::file::properties::WriterProperties;
use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder};
use parquet::file::writer::SerializedFileWriter;
use parquet::schema::parser::parse_message_type;
use std::fs::File;
@@ -1062,14 +1065,10 @@ fn test_decrypt_page_index(
Ok(())
}

fn read_and_roundtrip_to_encrypted_file(
fn read_encrypted_file(
path: &str,
decryption_properties: FileDecryptionProperties,
encryption_properties: FileEncryptionProperties,
) {
let temp_file = tempfile::tempfile().unwrap();

// read example data
) -> Result<(Vec<RecordBatch>, ArrowReaderMetadata), ParquetError> {
let file = File::open(path).unwrap();
let options = ArrowReaderOptions::default()
.with_file_decryption_properties(decryption_properties.clone());
@@ -1080,7 +1079,18 @@ fn read_and_roundtrip_to_encrypted_file(
let batches = batch_reader
.collect::<parquet::errors::Result<Vec<RecordBatch>, _>>()
.unwrap();
Ok((batches, metadata))
}

fn read_and_roundtrip_to_encrypted_file(
path: &str,
decryption_properties: FileDecryptionProperties,
encryption_properties: FileEncryptionProperties,
) {
// read example data
let (batches, metadata) = read_encrypted_file(path, decryption_properties.clone()).unwrap();

let temp_file = tempfile::tempfile().unwrap();
// write example data
let props = WriterProperties::builder()
.with_file_encryption_properties(encryption_properties)
Expand All @@ -1101,3 +1111,109 @@ fn read_and_roundtrip_to_encrypted_file(
// check re-written example data
verify_encryption_test_file_read(temp_file, decryption_properties);
}

#[tokio::test]
async fn test_multi_threaded_encrypted_writing() {
// Read example data and set up encryption/decryption properties
let testdata = arrow::util::test_util::parquet_test_data();
let path = format!("{testdata}/encrypt_columns_and_footer.parquet.encrypted");

let file_encryption_properties = FileEncryptionProperties::builder(b"0123456789012345".into())
.with_column_key("double_field", b"1234567890123450".into())
.with_column_key("float_field", b"1234567890123451".into())
.build()
.unwrap();
let decryption_properties = FileDecryptionProperties::builder(b"0123456789012345".into())
.with_column_key("double_field", b"1234567890123450".into())
.with_column_key("float_field", b"1234567890123451".into())
.build()
.unwrap();

let (record_batches, metadata) =
read_encrypted_file(&path, decryption_properties.clone()).unwrap();
let to_write: Vec<_> = record_batches
.iter()
.flat_map(|rb| rb.columns().to_vec())
.collect();
let schema = metadata.schema().clone();

let props = Arc::new(
WriterPropertiesBuilder::with_defaults()
.with_file_encryption_properties(file_encryption_properties)
.build(),
);
let parquet_schema = ArrowSchemaConverter::new()
.with_coerce_types(props.coerce_types())
.convert(&schema)
.unwrap();
let root_schema = parquet_schema.root_schema_ptr();

// Create a temporary file to write the encrypted data
let temp_file = tempfile::NamedTempFile::new().unwrap();
let mut file_writer =
SerializedFileWriter::new(&temp_file, root_schema.clone(), props.clone()).unwrap();

let arrow_row_group_writer_factory = ArrowRowGroupWriterFactory::new(&file_writer);
let arrow_row_group_writer = arrow_row_group_writer_factory
.create_row_group_writer(&parquet_schema, &props.clone(), &schema, 0)
.unwrap();

// Get column writers with encryptor from ArrowRowGroupWriter
let col_writers = arrow_row_group_writer.writers;

let mut workers: Vec<_> = col_writers
.into_iter()
.map(|mut col_writer| {
let (send, recv) = std::sync::mpsc::channel::<ArrowLeafColumn>();
let handle = std::thread::spawn(move || {
// receive Arrays to encode via the channel
for col in recv {
col_writer.write(&col)?;
}
// once the input is complete, close the writer
// to return the newly created ArrowColumnChunk
col_writer.close()
});
(handle, send)
})
.collect();

let mut worker_iter = workers.iter_mut();
for (arr, field) in to_write.iter().zip(&schema.fields) {
for leaves in compute_leaves(field, arr).unwrap() {
worker_iter.next().unwrap().1.send(leaves).unwrap();
}
}

// Wait for the workers to complete encoding, and append
// the resulting column chunks to the row group (and the file)
let mut row_group_writer = file_writer.next_row_group().unwrap();

for (handle, send) in workers {
drop(send); // Drop send side to signal termination
// wait for the worker to send the completed chunk
let chunk: ArrowColumnChunk = handle.join().unwrap().unwrap();
chunk.append_to_row_group(&mut row_group_writer).unwrap();
}
// Close the row group which writes to the underlying file
row_group_writer.close().unwrap();

// Close the file writer which writes the footer
let metadata = file_writer.close().unwrap();
assert_eq!(metadata.num_rows, 50);

// Check that the file was written correctly
let (read_record_batches, read_metadata) = read_encrypted_file(
temp_file.path().to_str().unwrap(),
decryption_properties.clone(),
)
.unwrap();
verify_encryption_test_data(read_record_batches, read_metadata.metadata());

// Check that file was encrypted
let result = ArrowReaderMetadata::load(&temp_file.into_file(), ArrowReaderOptions::default());
assert_eq!(
result.unwrap_err().to_string(),
"Parquet error: Parquet file has an encrypted footer but decryption properties were not provided"
);
}
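
Note on the pattern above: it follows the documented low-level ArrowColumnWriter example - one worker thread per leaf column, each fed through an mpsc channel, with the sender dropped to signal end of input - while the new ArrowRowGroupWriterFactory ensures every worker shares the file's single FileEncryptor. Each joined worker returns an ArrowColumnChunk, and the chunks are appended to the row group in column (schema) order.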
26 changes: 13 additions & 13 deletions parquet/tests/encryption/encryption_async.rs
@@ -20,6 +20,7 @@
use crate::encryption_util::{
verify_column_indexes, verify_encryption_test_data, TestKeyRetriever,
};
use arrow_array::RecordBatch;
use futures::TryStreamExt;
use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
use parquet::arrow::arrow_writer::ArrowWriterOptions;
@@ -436,23 +437,29 @@ async fn test_decrypt_page_index(
Ok(())
}

async fn verify_encryption_test_file_read_async(
async fn read_encrypted_file_async(
file: &mut tokio::fs::File,
decryption_properties: FileDecryptionProperties,
) -> Result<(), ParquetError> {
) -> Result<(Vec<RecordBatch>, ArrowReaderMetadata), ParquetError> {
let options = ArrowReaderOptions::new().with_file_decryption_properties(decryption_properties);

let arrow_metadata = ArrowReaderMetadata::load_async(file, options).await?;
let metadata = arrow_metadata.metadata();

let record_reader = ParquetRecordBatchStreamBuilder::new_with_metadata(
file.try_clone().await?,
arrow_metadata.clone(),
)
.build()?;
let record_batches = record_reader.try_collect::<Vec<_>>().await?;
Ok((record_batches, arrow_metadata.clone()))
}

verify_encryption_test_data(record_batches, metadata);
async fn verify_encryption_test_file_read_async(
file: &mut tokio::fs::File,
decryption_properties: FileDecryptionProperties,
) -> Result<(), ParquetError> {
let (record_batches, metadata) = read_encrypted_file_async(file, decryption_properties).await?;
verify_encryption_test_data(record_batches, metadata.metadata());
Ok(())
}

@@ -464,15 +471,8 @@ async fn read_and_roundtrip_to_encrypted_file_async(
let temp_file = tempfile::tempfile().unwrap();
let mut file = File::open(&path).await.unwrap();

let options =
ArrowReaderOptions::new().with_file_decryption_properties(decryption_properties.clone());
let arrow_metadata = ArrowReaderMetadata::load_async(&mut file, options).await?;
let record_reader = ParquetRecordBatchStreamBuilder::new_with_metadata(
file.try_clone().await?,
arrow_metadata.clone(),
)
.build()?;
let record_batches = record_reader.try_collect::<Vec<_>>().await?;
let (record_batches, arrow_metadata) =
read_encrypted_file_async(&mut file, decryption_properties.clone()).await?;

let props = WriterProperties::builder()
.with_file_encryption_properties(encryption_properties)