Skip to content

Commit 2d42ca0

Browse files
committed
Move schema and writer properties into ArrowRowGroupWriterFactory
1 parent 68275fa commit 2d42ca0

File tree

4 files changed

+60
-49
lines changed

4 files changed

+60
-49
lines changed

parquet/src/arrow/arrow_writer/mod.rs

Lines changed: 50 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ pub struct ArrowWriter<W: Write> {
141141
arrow_schema: SchemaRef,
142142

143143
/// Creates new [`ArrowRowGroupWriter`] instances as required
144-
pub row_group_writer_factory: ArrowRowGroupWriterFactory,
144+
row_group_writer_factory: ArrowRowGroupWriterFactory,
145145

146146
/// The length of arrays to write to each row group
147147
max_row_group_size: usize,
@@ -199,9 +199,14 @@ impl<W: Write + Send> ArrowWriter<W> {
199199
let max_row_group_size = props.max_row_group_size();
200200

201201
let file_writer =
202-
SerializedFileWriter::new(writer, schema.root_schema_ptr(), Arc::new(props))?;
202+
SerializedFileWriter::new(writer, schema.root_schema_ptr(), Arc::new(props.clone()))?;
203203

204-
let row_group_writer_factory = ArrowRowGroupWriterFactory::new(&file_writer);
204+
let row_group_writer_factory = ArrowRowGroupWriterFactory::new(
205+
&file_writer,
206+
schema,
207+
arrow_schema.clone(),
208+
props.into(),
209+
);
205210

206211
Ok(Self {
207212
writer: file_writer,
@@ -272,12 +277,10 @@ impl<W: Write + Send> ArrowWriter<W> {
272277

273278
let in_progress = match &mut self.in_progress {
274279
Some(in_progress) => in_progress,
275-
x => x.insert(self.row_group_writer_factory.create_row_group_writer(
276-
self.writer.schema_descr(),
277-
self.writer.properties(),
278-
&self.arrow_schema,
279-
self.writer.flushed_row_groups().len(),
280-
)?),
280+
x => x.insert(
281+
self.row_group_writer_factory
282+
.create_row_group_writer(self.writer.flushed_row_groups().len())?,
283+
),
281284
};
282285

283286
// If would exceed max_row_group_size, split batch
@@ -794,58 +797,61 @@ impl ArrowRowGroupWriter {
794797
/// This is used by [`ArrowWriter`] to create row group writers, but can be used
795798
/// directly for lower level API.
796799
pub struct ArrowRowGroupWriterFactory {
800+
schema: SchemaDescriptor,
801+
arrow_schema: SchemaRef,
802+
props: WriterPropertiesPtr,
797803
#[cfg(feature = "encryption")]
798804
file_encryptor: Option<Arc<FileEncryptor>>,
799805
}
800806

801807
impl ArrowRowGroupWriterFactory {
802808
/// Creates a new [`ArrowRowGroupWriterFactory`] using provided [`SerializedFileWriter`].
803809
#[cfg(feature = "encryption")]
804-
pub fn new<W: Write + Send>(file_writer: &SerializedFileWriter<W>) -> Self {
810+
pub fn new<W: Write + Send>(
811+
file_writer: &SerializedFileWriter<W>,
812+
schema: SchemaDescriptor,
813+
arrow_schema: SchemaRef,
814+
props: WriterPropertiesPtr,
815+
) -> Self {
805816
Self {
817+
schema,
818+
arrow_schema,
819+
props,
806820
file_encryptor: file_writer.file_encryptor(),
807821
}
808822
}
809823

810824
#[cfg(not(feature = "encryption"))]
811-
pub fn new<W: Write + Send>(_file_writer: &SerializedFileWriter<W>) -> Self {
812-
Self {}
825+
pub fn new<W: Write + Send>(
826+
_file_writer: &SerializedFileWriter<W>,
827+
schema: SchemaDescriptor,
828+
arrow_schema: SchemaRef,
829+
props: WriterPropertiesPtr,
830+
) -> Self {
831+
Self {
832+
schema,
833+
arrow_schema,
834+
props,
835+
}
813836
}
814837

815838
/// Creates a new [`ArrowRowGroupWriter`] for the given parquet schema and writer properties.
816839
#[cfg(feature = "encryption")]
817-
pub fn create_row_group_writer(
818-
&self,
819-
parquet: &SchemaDescriptor,
820-
props: &WriterPropertiesPtr,
821-
arrow: &SchemaRef,
822-
row_group_index: usize,
823-
) -> Result<ArrowRowGroupWriter> {
824-
let mut writers = Vec::with_capacity(arrow.fields.len());
825-
let mut leaves = parquet.columns().iter();
826-
let column_factory = ArrowColumnWriterFactory::new()
827-
.with_file_encryptor(row_group_index, self.file_encryptor.clone());
828-
for field in &arrow.fields {
829-
column_factory.get_arrow_column_writer(
830-
field.data_type(),
831-
props,
832-
&mut leaves,
833-
&mut writers,
834-
)?;
835-
}
836-
Ok(ArrowRowGroupWriter::new(writers, arrow))
840+
pub fn create_row_group_writer(&self, row_group_index: usize) -> Result<ArrowRowGroupWriter> {
841+
let writers = get_column_writers_with_encryptor(
842+
&self.schema,
843+
&self.props,
844+
&self.arrow_schema,
845+
self.file_encryptor.clone(),
846+
row_group_index,
847+
)?;
848+
Ok(ArrowRowGroupWriter::new(writers, &self.arrow_schema))
837849
}
838850

839851
#[cfg(not(feature = "encryption"))]
840-
pub fn create_row_group_writer(
841-
&self,
842-
parquet: &SchemaDescriptor,
843-
props: &WriterPropertiesPtr,
844-
arrow: &SchemaRef,
845-
_row_group_index: usize,
846-
) -> Result<ArrowRowGroupWriter> {
847-
let writers = get_column_writers(parquet, props, arrow)?;
848-
Ok(ArrowRowGroupWriter::new(writers, arrow))
852+
pub fn create_row_group_writer(&self, _row_group_index: usize) -> Result<ArrowRowGroupWriter> {
853+
let writers = get_column_writers(&self.schema, &self.props, &self.arrow_schema)?;
854+
Ok(ArrowRowGroupWriter::new(writers, &self.arrow_schema))
849855
}
850856
}
851857

@@ -871,7 +877,7 @@ pub fn get_column_writers(
871877

872878
/// Returns the [`ArrowColumnWriter`] for a given schema and supports columnar encryption
873879
#[cfg(feature = "encryption")]
874-
pub fn get_column_writers_with_encryptor(
880+
fn get_column_writers_with_encryptor(
875881
parquet: &SchemaDescriptor,
876882
props: &WriterPropertiesPtr,
877883
arrow: &SchemaRef,
@@ -894,7 +900,7 @@ pub fn get_column_writers_with_encryptor(
894900
}
895901

896902
/// Gets [`ArrowColumnWriter`] instances for different data types
897-
pub struct ArrowColumnWriterFactory {
903+
struct ArrowColumnWriterFactory {
898904
#[cfg(feature = "encryption")]
899905
row_group_index: usize,
900906
#[cfg(feature = "encryption")]
@@ -919,7 +925,7 @@ impl ArrowColumnWriterFactory {
919925
}
920926

921927
#[cfg(feature = "encryption")]
922-
fn with_file_encryptor(
928+
pub fn with_file_encryptor(
923929
mut self,
924930
row_group_index: usize,
925931
file_encryptor: Option<Arc<FileEncryptor>>,

parquet/src/encryption/encrypt.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -288,14 +288,14 @@ impl EncryptionPropertiesBuilder {
288288

289289
#[derive(Debug)]
290290
/// The encryption configuration for a single Parquet file
291-
pub struct FileEncryptor {
291+
pub(crate) struct FileEncryptor {
292292
properties: FileEncryptionProperties,
293293
aad_file_unique: Vec<u8>,
294294
file_aad: Vec<u8>,
295295
}
296296

297297
impl FileEncryptor {
298-
pub fn new(properties: FileEncryptionProperties) -> Result<Self> {
298+
pub(crate) fn new(properties: FileEncryptionProperties) -> Result<Self> {
299299
// Generate unique AAD for file
300300
let rng = SystemRandom::new();
301301
let mut aad_file_unique = vec![0u8; 8];

parquet/src/file/writer.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -433,7 +433,7 @@ impl<W: Write + Send> SerializedFileWriter<W> {
433433

434434
/// Get the file encryptor used by this instance to encrypt data
435435
#[cfg(feature = "encryption")]
436-
pub fn file_encryptor(&self) -> Option<Arc<FileEncryptor>> {
436+
pub(crate) fn file_encryptor(&self) -> Option<Arc<FileEncryptor>> {
437437
self.file_encryptor.clone()
438438
}
439439
}

parquet/tests/encryption/encryption.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1153,9 +1153,14 @@ async fn test_multi_threaded_encrypted_writing() {
11531153
let mut file_writer =
11541154
SerializedFileWriter::new(&temp_file, root_schema.clone(), props.clone()).unwrap();
11551155

1156-
let arrow_row_group_writer_factory = ArrowRowGroupWriterFactory::new(&file_writer);
1156+
let arrow_row_group_writer_factory = ArrowRowGroupWriterFactory::new(
1157+
&file_writer,
1158+
parquet_schema,
1159+
schema.clone(),
1160+
props.clone(),
1161+
);
11571162
let arrow_row_group_writer = arrow_row_group_writer_factory
1158-
.create_row_group_writer(&parquet_schema, &props, &schema, 0)
1163+
.create_row_group_writer(0)
11591164
.unwrap();
11601165

11611166
// Get column writers with encryptor from ArrowRowGroupWriter

0 commit comments

Comments
 (0)