Skip to content

Commit cd5196f

Browse files
committed
Initial commit
1 parent 674dc17 commit cd5196f

File tree

6 files changed

+179
-44
lines changed

6 files changed

+179
-44
lines changed

parquet/src/arrow/arrow_writer/mod.rs

Lines changed: 37 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ pub struct ArrowWriter<W: Write> {
141141
arrow_schema: SchemaRef,
142142

143143
/// Creates new [`ArrowRowGroupWriter`] instances as required
144-
row_group_writer_factory: ArrowRowGroupWriterFactory,
144+
pub row_group_writer_factory: ArrowRowGroupWriterFactory,
145145

146146
/// The length of arrays to write to each row group
147147
max_row_group_size: usize,
@@ -755,8 +755,9 @@ impl ArrowColumnWriter {
755755
}
756756

757757
/// Encodes [`RecordBatch`] to a parquet row group
758-
struct ArrowRowGroupWriter {
759-
writers: Vec<ArrowColumnWriter>,
758+
pub struct ArrowRowGroupWriter {
759+
/// [`ArrowColumnWriter`] for each column in a row group
760+
pub writers: Vec<ArrowColumnWriter>,
760761
schema: SchemaRef,
761762
buffered_rows: usize,
762763
}
@@ -789,44 +790,54 @@ impl ArrowRowGroupWriter {
789790
}
790791
}
791792

792-
struct ArrowRowGroupWriterFactory {
793+
/// Factory for creating [`ArrowRowGroupWriter`] instances.
794+
/// This is used by [`ArrowWriter`] to create row group writers, but can be used
795+
/// directly for lower level API.
796+
pub struct ArrowRowGroupWriterFactory {
793797
#[cfg(feature = "encryption")]
794798
file_encryptor: Option<Arc<FileEncryptor>>,
795799
}
796800

797801
impl ArrowRowGroupWriterFactory {
802+
/// Creates a new [`ArrowRowGroupWriterFactory`] using provided [`SerializedFileWriter`].
798803
#[cfg(feature = "encryption")]
799-
fn new<W: Write + Send>(file_writer: &SerializedFileWriter<W>) -> Self {
804+
pub fn new<W: Write + Send>(file_writer: &SerializedFileWriter<W>) -> Self {
800805
Self {
801806
file_encryptor: file_writer.file_encryptor(),
802807
}
803808
}
804809

805810
#[cfg(not(feature = "encryption"))]
806-
fn new<W: Write + Send>(_file_writer: &SerializedFileWriter<W>) -> Self {
811+
pub fn new<W: Write + Send>(_file_writer: &SerializedFileWriter<W>) -> Self {
807812
Self {}
808813
}
809814

815+
/// Creates a new [`ArrowRowGroupWriter`] for the given parquet schema and writer properties.
810816
#[cfg(feature = "encryption")]
811-
fn create_row_group_writer(
817+
pub fn create_row_group_writer(
812818
&self,
813819
parquet: &SchemaDescriptor,
814820
props: &WriterPropertiesPtr,
815821
arrow: &SchemaRef,
816822
row_group_index: usize,
817823
) -> Result<ArrowRowGroupWriter> {
818-
let writers = get_column_writers_with_encryptor(
819-
parquet,
820-
props,
821-
arrow,
822-
self.file_encryptor.clone(),
823-
row_group_index,
824-
)?;
824+
let mut writers = Vec::with_capacity(arrow.fields.len());
825+
let mut leaves = parquet.columns().iter();
826+
let column_factory = ArrowColumnWriterFactory::new()
827+
.with_file_encryptor(row_group_index, self.file_encryptor.clone());
828+
for field in &arrow.fields {
829+
column_factory.get_arrow_column_writer(
830+
field.data_type(),
831+
props,
832+
&mut leaves,
833+
&mut writers,
834+
)?;
835+
}
825836
Ok(ArrowRowGroupWriter::new(writers, arrow))
826837
}
827838

828839
#[cfg(not(feature = "encryption"))]
829-
fn create_row_group_writer(
840+
pub fn create_row_group_writer(
830841
&self,
831842
parquet: &SchemaDescriptor,
832843
props: &WriterPropertiesPtr,
@@ -860,7 +871,7 @@ pub fn get_column_writers(
860871

861872
/// Returns the [`ArrowColumnWriter`] for a given schema and supports columnar encryption
862873
#[cfg(feature = "encryption")]
863-
fn get_column_writers_with_encryptor(
874+
pub fn get_column_writers_with_encryptor(
864875
parquet: &SchemaDescriptor,
865876
props: &WriterPropertiesPtr,
866877
arrow: &SchemaRef,
@@ -883,14 +894,21 @@ fn get_column_writers_with_encryptor(
883894
}
884895

885896
/// Gets [`ArrowColumnWriter`] instances for different data types
886-
struct ArrowColumnWriterFactory {
897+
pub struct ArrowColumnWriterFactory {
887898
#[cfg(feature = "encryption")]
888899
row_group_index: usize,
889900
#[cfg(feature = "encryption")]
890901
file_encryptor: Option<Arc<FileEncryptor>>,
891902
}
892903

904+
impl Default for ArrowColumnWriterFactory {
905+
fn default() -> Self {
906+
Self::new()
907+
}
908+
}
909+
893910
impl ArrowColumnWriterFactory {
911+
/// Create a new [`ArrowColumnWriterFactory`]
894912
pub fn new() -> Self {
895913
Self {
896914
#[cfg(feature = "encryption")]
@@ -901,7 +919,7 @@ impl ArrowColumnWriterFactory {
901919
}
902920

903921
#[cfg(feature = "encryption")]
904-
pub fn with_file_encryptor(
922+
fn with_file_encryptor(
905923
mut self,
906924
row_group_index: usize,
907925
file_encryptor: Option<Arc<FileEncryptor>>,
@@ -939,7 +957,7 @@ impl ArrowColumnWriterFactory {
939957
}
940958

941959
/// Gets the [`ArrowColumnWriter`] for the given `data_type`
942-
fn get_arrow_column_writer(
960+
pub fn get_arrow_column_writer(
943961
&self,
944962
data_type: &ArrowDataType,
945963
props: &WriterPropertiesPtr,

parquet/src/encryption/encrypt.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -288,14 +288,14 @@ impl EncryptionPropertiesBuilder {
288288

289289
#[derive(Debug)]
290290
/// The encryption configuration for a single Parquet file
291-
pub(crate) struct FileEncryptor {
291+
pub struct FileEncryptor {
292292
properties: FileEncryptionProperties,
293293
aad_file_unique: Vec<u8>,
294294
file_aad: Vec<u8>,
295295
}
296296

297297
impl FileEncryptor {
298-
pub(crate) fn new(properties: FileEncryptionProperties) -> Result<Self> {
298+
pub fn new(properties: FileEncryptionProperties) -> Result<Self> {
299299
// Generate unique AAD for file
300300
let rng = SystemRandom::new();
301301
let mut aad_file_unique = vec![0u8; 8];

parquet/src/file/properties.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -457,7 +457,7 @@ pub struct WriterPropertiesBuilder {
457457

458458
impl WriterPropertiesBuilder {
459459
/// Returns default state of the builder.
460-
fn with_defaults() -> Self {
460+
pub fn with_defaults() -> Self {
461461
Self {
462462
data_page_size_limit: DEFAULT_PAGE_SIZE,
463463
data_page_row_count_limit: DEFAULT_DATA_PAGE_ROW_COUNT_LIMIT,

parquet/src/file/writer.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -433,7 +433,7 @@ impl<W: Write + Send> SerializedFileWriter<W> {
433433

434434
/// Get the file encryptor used by this instance to encrypt data
435435
#[cfg(feature = "encryption")]
436-
pub(crate) fn file_encryptor(&self) -> Option<Arc<FileEncryptor>> {
436+
pub fn file_encryptor(&self) -> Option<Arc<FileEncryptor>> {
437437
self.file_encryptor.clone()
438438
}
439439
}

parquet/tests/encryption/encryption.rs

Lines changed: 125 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,16 @@ use parquet::arrow::arrow_reader::{
2828
ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReaderBuilder, RowSelection,
2929
RowSelector,
3030
};
31-
use parquet::arrow::ArrowWriter;
31+
use parquet::arrow::arrow_writer::{
32+
compute_leaves, ArrowColumnChunk, ArrowLeafColumn, ArrowRowGroupWriterFactory,
33+
};
34+
use parquet::arrow::{ArrowSchemaConverter, ArrowWriter};
3235
use parquet::data_type::{ByteArray, ByteArrayType};
3336
use parquet::encryption::decrypt::FileDecryptionProperties;
3437
use parquet::encryption::encrypt::FileEncryptionProperties;
3538
use parquet::errors::ParquetError;
3639
use parquet::file::metadata::ParquetMetaData;
37-
use parquet::file::properties::WriterProperties;
40+
use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder};
3841
use parquet::file::writer::SerializedFileWriter;
3942
use parquet::schema::parser::parse_message_type;
4043
use std::fs::File;
@@ -1062,14 +1065,10 @@ fn test_decrypt_page_index(
10621065
Ok(())
10631066
}
10641067

1065-
fn read_and_roundtrip_to_encrypted_file(
1068+
fn read_encrypted_file(
10661069
path: &str,
10671070
decryption_properties: FileDecryptionProperties,
1068-
encryption_properties: FileEncryptionProperties,
1069-
) {
1070-
let temp_file = tempfile::tempfile().unwrap();
1071-
1072-
// read example data
1071+
) -> Result<(Vec<RecordBatch>, ArrowReaderMetadata), ParquetError> {
10731072
let file = File::open(path).unwrap();
10741073
let options = ArrowReaderOptions::default()
10751074
.with_file_decryption_properties(decryption_properties.clone());
@@ -1080,7 +1079,18 @@ fn read_and_roundtrip_to_encrypted_file(
10801079
let batches = batch_reader
10811080
.collect::<parquet::errors::Result<Vec<RecordBatch>, _>>()
10821081
.unwrap();
1082+
Ok((batches, metadata))
1083+
}
1084+
1085+
fn read_and_roundtrip_to_encrypted_file(
1086+
path: &str,
1087+
decryption_properties: FileDecryptionProperties,
1088+
encryption_properties: FileEncryptionProperties,
1089+
) {
1090+
// read example data
1091+
let (batches, metadata) = read_encrypted_file(path, decryption_properties.clone()).unwrap();
10831092

1093+
let temp_file = tempfile::tempfile().unwrap();
10841094
// write example data
10851095
let props = WriterProperties::builder()
10861096
.with_file_encryption_properties(encryption_properties)
@@ -1101,3 +1111,110 @@ fn read_and_roundtrip_to_encrypted_file(
11011111
// check re-written example data
11021112
verify_encryption_test_file_read(temp_file, decryption_properties);
11031113
}
1114+
1115+
#[tokio::test]
1116+
async fn test_multi_threaded_encrypted_writing() {
1117+
// Read example data and set up encryption/decryption properties
1118+
let testdata = arrow::util::test_util::parquet_test_data();
1119+
let path = format!("{testdata}/encrypt_columns_and_footer.parquet.encrypted");
1120+
1121+
let file_encryption_properties = FileEncryptionProperties::builder(b"0123456789012345".into())
1122+
.with_column_key("double_field", b"1234567890123450".into())
1123+
.with_column_key("float_field", b"1234567890123451".into())
1124+
.build()
1125+
.unwrap();
1126+
let decryption_properties = FileDecryptionProperties::builder(b"0123456789012345".into())
1127+
.with_column_key("double_field", b"1234567890123450".into())
1128+
.with_column_key("float_field", b"1234567890123451".into())
1129+
.build()
1130+
.unwrap();
1131+
1132+
let (record_batches, metadata) =
1133+
read_encrypted_file(&path, decryption_properties.clone()).unwrap();
1134+
let to_write: Vec<_> = record_batches
1135+
.iter()
1136+
.map(|rb| rb.columns().to_vec())
1137+
.flatten()
1138+
.collect();
1139+
let schema = metadata.schema().clone();
1140+
1141+
let props = Arc::new(
1142+
WriterPropertiesBuilder::with_defaults()
1143+
.with_file_encryption_properties(file_encryption_properties)
1144+
.build(),
1145+
);
1146+
let parquet_schema = ArrowSchemaConverter::new()
1147+
.with_coerce_types(props.coerce_types())
1148+
.convert(&schema)
1149+
.unwrap();
1150+
let root_schema = parquet_schema.root_schema_ptr();
1151+
1152+
// Create a temporary file to write the encrypted data
1153+
let temp_file = tempfile::NamedTempFile::new().unwrap();
1154+
let mut file_writer =
1155+
SerializedFileWriter::new(&temp_file, root_schema.clone(), props.clone()).unwrap();
1156+
1157+
let arrow_row_group_writer_factory = ArrowRowGroupWriterFactory::new(&file_writer);
1158+
let arrow_row_group_writer = arrow_row_group_writer_factory
1159+
.create_row_group_writer(&parquet_schema, &props.clone(), &schema, 0)
1160+
.unwrap();
1161+
1162+
// Get column writers with encryptor from ArrowRowGroupWriter
1163+
let col_writers = arrow_row_group_writer.writers;
1164+
1165+
let mut workers: Vec<_> = col_writers
1166+
.into_iter()
1167+
.map(|mut col_writer| {
1168+
let (send, recv) = std::sync::mpsc::channel::<ArrowLeafColumn>();
1169+
let handle = std::thread::spawn(move || {
1170+
// receive Arrays to encode via the channel
1171+
for col in recv {
1172+
col_writer.write(&col)?;
1173+
}
1174+
// once the input is complete, close the writer
1175+
// to return the newly created ArrowColumnChunk
1176+
col_writer.close()
1177+
});
1178+
(handle, send)
1179+
})
1180+
.collect();
1181+
1182+
let mut worker_iter = workers.iter_mut();
1183+
for (arr, field) in to_write.iter().zip(&schema.fields) {
1184+
for leaves in compute_leaves(field, arr).unwrap() {
1185+
worker_iter.next().unwrap().1.send(leaves).unwrap();
1186+
}
1187+
}
1188+
1189+
// Wait for the workers to complete encoding, and append
1190+
// the resulting column chunks to the row group (and the file)
1191+
let mut row_group_writer = file_writer.next_row_group().unwrap();
1192+
1193+
for (handle, send) in workers {
1194+
drop(send); // Drop send side to signal termination
1195+
// wait for the worker to send the completed chunk
1196+
let chunk: ArrowColumnChunk = handle.join().unwrap().unwrap();
1197+
chunk.append_to_row_group(&mut row_group_writer).unwrap();
1198+
}
1199+
// Close the row group which writes to the underlying file
1200+
row_group_writer.close().unwrap();
1201+
1202+
// Close the file writer which writes the footer
1203+
let metadata = file_writer.close().unwrap();
1204+
assert_eq!(metadata.num_rows, 50);
1205+
1206+
// Check that the file was written correctly
1207+
let (read_record_batches, read_metadata) = read_encrypted_file(
1208+
temp_file.path().to_str().unwrap(),
1209+
decryption_properties.clone(),
1210+
)
1211+
.unwrap();
1212+
verify_encryption_test_data(read_record_batches, read_metadata.metadata());
1213+
1214+
// Check that file was encrypted
1215+
let result = ArrowReaderMetadata::load(&temp_file.into_file(), ArrowReaderOptions::default());
1216+
assert_eq!(
1217+
result.unwrap_err().to_string(),
1218+
"Parquet error: Parquet file has an encrypted footer but decryption properties were not provided"
1219+
);
1220+
}

parquet/tests/encryption/encryption_async.rs

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
use crate::encryption_util::{
2121
verify_column_indexes, verify_encryption_test_data, TestKeyRetriever,
2222
};
23+
use arrow_array::RecordBatch;
2324
use futures::TryStreamExt;
2425
use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
2526
use parquet::arrow::arrow_writer::ArrowWriterOptions;
@@ -436,23 +437,29 @@ async fn test_decrypt_page_index(
436437
Ok(())
437438
}
438439

439-
async fn verify_encryption_test_file_read_async(
440+
async fn read_encrypted_file_async(
440441
file: &mut tokio::fs::File,
441442
decryption_properties: FileDecryptionProperties,
442-
) -> Result<(), ParquetError> {
443+
) -> Result<(Vec<RecordBatch>, ArrowReaderMetadata), ParquetError> {
443444
let options = ArrowReaderOptions::new().with_file_decryption_properties(decryption_properties);
444445

445446
let arrow_metadata = ArrowReaderMetadata::load_async(file, options).await?;
446-
let metadata = arrow_metadata.metadata();
447447

448448
let record_reader = ParquetRecordBatchStreamBuilder::new_with_metadata(
449449
file.try_clone().await?,
450450
arrow_metadata.clone(),
451451
)
452452
.build()?;
453453
let record_batches = record_reader.try_collect::<Vec<_>>().await?;
454+
Ok((record_batches, arrow_metadata.clone()))
455+
}
454456

455-
verify_encryption_test_data(record_batches, metadata);
457+
async fn verify_encryption_test_file_read_async(
458+
file: &mut tokio::fs::File,
459+
decryption_properties: FileDecryptionProperties,
460+
) -> Result<(), ParquetError> {
461+
let (record_batches, metadata) = read_encrypted_file_async(file, decryption_properties).await?;
462+
verify_encryption_test_data(record_batches, &metadata.metadata());
456463
Ok(())
457464
}
458465

@@ -464,15 +471,8 @@ async fn read_and_roundtrip_to_encrypted_file_async(
464471
let temp_file = tempfile::tempfile().unwrap();
465472
let mut file = File::open(&path).await.unwrap();
466473

467-
let options =
468-
ArrowReaderOptions::new().with_file_decryption_properties(decryption_properties.clone());
469-
let arrow_metadata = ArrowReaderMetadata::load_async(&mut file, options).await?;
470-
let record_reader = ParquetRecordBatchStreamBuilder::new_with_metadata(
471-
file.try_clone().await?,
472-
arrow_metadata.clone(),
473-
)
474-
.build()?;
475-
let record_batches = record_reader.try_collect::<Vec<_>>().await?;
474+
let (record_batches, arrow_metadata) =
475+
read_encrypted_file_async(&mut file, decryption_properties.clone()).await?;
476476

477477
let props = WriterProperties::builder()
478478
.with_file_encryption_properties(encryption_properties)

0 commit comments

Comments
 (0)