-
Notifications
You must be signed in to change notification settings - Fork 982
Support multi-threaded writing of Parquet files with modular encryption #7818
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 1 commit
c2ac2d9
68275fa
2d42ca0
b57a5d4
b9396cc
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -141,7 +141,7 @@ pub struct ArrowWriter<W: Write> { | |
arrow_schema: SchemaRef, | ||
|
||
/// Creates new [`ArrowRowGroupWriter`] instances as required | ||
row_group_writer_factory: ArrowRowGroupWriterFactory, | ||
pub row_group_writer_factory: ArrowRowGroupWriterFactory, | ||
|
||
/// The length of arrays to write to each row group | ||
max_row_group_size: usize, | ||
|
@@ -755,8 +755,9 @@ impl ArrowColumnWriter { | |
} | ||
|
||
/// Encodes [`RecordBatch`] to a parquet row group | ||
struct ArrowRowGroupWriter { | ||
writers: Vec<ArrowColumnWriter>, | ||
pub struct ArrowRowGroupWriter { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Is there a reason we can't use the existing low-level APIs documented here (https://docs.rs/parquet/latest/parquet/arrow/arrow_writer/struct.ArrowColumnWriter.html#example-encoding-two-arrow-arrays-in-parallel)? In other words, do we really need to make what has previously been an implementation detail part of the public API?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Thanks for the quick response, @alamb! :) There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I do think that avoiding exposing ArrowRowGroupWriter and the associated machinery would be good. The rationale is that @XiangpengHao, @zhuqi-lucas, and I are likely to be reworking it in the next few releases, and once it is part of the public API, changing it becomes harder because it requires more coordination and raises backwards-compatibility concerns. |
||
/// [`ArrowColumnWriter`] for each column in a row group | ||
pub writers: Vec<ArrowColumnWriter>, | ||
rok marked this conversation as resolved.
Show resolved
Hide resolved
|
||
schema: SchemaRef, | ||
buffered_rows: usize, | ||
} | ||
|
@@ -789,44 +790,54 @@ impl ArrowRowGroupWriter { | |
} | ||
} | ||
|
||
struct ArrowRowGroupWriterFactory { | ||
/// Factory for creating [`ArrowRowGroupWriter`] instances. | ||
/// This is used by [`ArrowWriter`] to create row group writers, but can be used | ||
/// directly as a lower-level API. | ||
pub struct ArrowRowGroupWriterFactory { | ||
#[cfg(feature = "encryption")] | ||
file_encryptor: Option<Arc<FileEncryptor>>, | ||
} | ||
|
||
impl ArrowRowGroupWriterFactory { | ||
/// Creates a new [`ArrowRowGroupWriterFactory`] using provided [`SerializedFileWriter`]. | ||
#[cfg(feature = "encryption")] | ||
fn new<W: Write + Send>(file_writer: &SerializedFileWriter<W>) -> Self { | ||
pub fn new<W: Write + Send>(file_writer: &SerializedFileWriter<W>) -> Self { | ||
Self { | ||
file_encryptor: file_writer.file_encryptor(), | ||
} | ||
} | ||
|
||
#[cfg(not(feature = "encryption"))] | ||
fn new<W: Write + Send>(_file_writer: &SerializedFileWriter<W>) -> Self { | ||
pub fn new<W: Write + Send>(_file_writer: &SerializedFileWriter<W>) -> Self { | ||
Self {} | ||
} | ||
|
||
/// Creates a new [`ArrowRowGroupWriter`] for the given parquet schema and writer properties. | ||
#[cfg(feature = "encryption")] | ||
fn create_row_group_writer( | ||
pub fn create_row_group_writer( | ||
rok marked this conversation as resolved.
Show resolved
Hide resolved
|
||
&self, | ||
parquet: &SchemaDescriptor, | ||
props: &WriterPropertiesPtr, | ||
arrow: &SchemaRef, | ||
row_group_index: usize, | ||
) -> Result<ArrowRowGroupWriter> { | ||
let writers = get_column_writers_with_encryptor( | ||
parquet, | ||
props, | ||
arrow, | ||
self.file_encryptor.clone(), | ||
row_group_index, | ||
)?; | ||
let mut writers = Vec::with_capacity(arrow.fields.len()); | ||
rok marked this conversation as resolved.
Show resolved
Hide resolved
|
||
let mut leaves = parquet.columns().iter(); | ||
let column_factory = ArrowColumnWriterFactory::new() | ||
.with_file_encryptor(row_group_index, self.file_encryptor.clone()); | ||
for field in &arrow.fields { | ||
column_factory.get_arrow_column_writer( | ||
field.data_type(), | ||
props, | ||
&mut leaves, | ||
&mut writers, | ||
)?; | ||
} | ||
Ok(ArrowRowGroupWriter::new(writers, arrow)) | ||
} | ||
|
||
#[cfg(not(feature = "encryption"))] | ||
fn create_row_group_writer( | ||
pub fn create_row_group_writer( | ||
&self, | ||
parquet: &SchemaDescriptor, | ||
props: &WriterPropertiesPtr, | ||
|
@@ -860,7 +871,7 @@ pub fn get_column_writers( | |
|
||
/// Returns the [`ArrowColumnWriter`]s for a given schema, with support for columnar encryption | ||
#[cfg(feature = "encryption")] | ||
fn get_column_writers_with_encryptor( | ||
pub fn get_column_writers_with_encryptor( | ||
rok marked this conversation as resolved.
Show resolved
Hide resolved
|
||
parquet: &SchemaDescriptor, | ||
props: &WriterPropertiesPtr, | ||
arrow: &SchemaRef, | ||
|
@@ -883,14 +894,21 @@ fn get_column_writers_with_encryptor( | |
} | ||
|
||
/// Gets [`ArrowColumnWriter`] instances for different data types | ||
struct ArrowColumnWriterFactory { | ||
pub struct ArrowColumnWriterFactory { | ||
rok marked this conversation as resolved.
Show resolved
Hide resolved
|
||
#[cfg(feature = "encryption")] | ||
row_group_index: usize, | ||
#[cfg(feature = "encryption")] | ||
file_encryptor: Option<Arc<FileEncryptor>>, | ||
} | ||
|
||
impl Default for ArrowColumnWriterFactory { | ||
fn default() -> Self { | ||
Self::new() | ||
} | ||
} | ||
|
||
impl ArrowColumnWriterFactory { | ||
/// Create a new [`ArrowColumnWriterFactory`] | ||
pub fn new() -> Self { | ||
Self { | ||
#[cfg(feature = "encryption")] | ||
|
@@ -901,7 +919,7 @@ impl ArrowColumnWriterFactory { | |
} | ||
|
||
#[cfg(feature = "encryption")] | ||
pub fn with_file_encryptor( | ||
fn with_file_encryptor( | ||
mut self, | ||
row_group_index: usize, | ||
file_encryptor: Option<Arc<FileEncryptor>>, | ||
|
@@ -939,7 +957,7 @@ impl ArrowColumnWriterFactory { | |
} | ||
|
||
/// Gets the [`ArrowColumnWriter`] for the given `data_type` | ||
fn get_arrow_column_writer( | ||
pub fn get_arrow_column_writer( | ||
&self, | ||
data_type: &ArrowDataType, | ||
props: &WriterPropertiesPtr, | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -457,7 +457,7 @@ pub struct WriterPropertiesBuilder { | |
|
||
impl WriterPropertiesBuilder { | ||
/// Returns default state of the builder. | ||
fn with_defaults() -> Self { | ||
pub fn with_defaults() -> Self { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We don't need to make this There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Or |
||
Self { | ||
data_page_size_limit: DEFAULT_PAGE_SIZE, | ||
data_page_row_count_limit: DEFAULT_DATA_PAGE_ROW_COUNT_LIMIT, | ||
|
Uh oh!
There was an error while loading. Please reload this page.