@@ -141,7 +141,7 @@ pub struct ArrowWriter<W: Write> {
141
141
arrow_schema : SchemaRef ,
142
142
143
143
/// Creates new [`ArrowRowGroupWriter`] instances as required
144
- pub row_group_writer_factory : ArrowRowGroupWriterFactory ,
144
+ row_group_writer_factory : ArrowRowGroupWriterFactory ,
145
145
146
146
/// The length of arrays to write to each row group
147
147
max_row_group_size : usize ,
@@ -199,9 +199,14 @@ impl<W: Write + Send> ArrowWriter<W> {
199
199
let max_row_group_size = props. max_row_group_size ( ) ;
200
200
201
201
let file_writer =
202
- SerializedFileWriter :: new ( writer, schema. root_schema_ptr ( ) , Arc :: new ( props) ) ?;
202
+ SerializedFileWriter :: new ( writer, schema. root_schema_ptr ( ) , Arc :: new ( props. clone ( ) ) ) ?;
203
203
204
- let row_group_writer_factory = ArrowRowGroupWriterFactory :: new ( & file_writer) ;
204
+ let row_group_writer_factory = ArrowRowGroupWriterFactory :: new (
205
+ & file_writer,
206
+ schema,
207
+ arrow_schema. clone ( ) ,
208
+ props. into ( ) ,
209
+ ) ;
205
210
206
211
Ok ( Self {
207
212
writer : file_writer,
@@ -272,12 +277,10 @@ impl<W: Write + Send> ArrowWriter<W> {
272
277
273
278
let in_progress = match & mut self . in_progress {
274
279
Some ( in_progress) => in_progress,
275
- x => x. insert ( self . row_group_writer_factory . create_row_group_writer (
276
- self . writer . schema_descr ( ) ,
277
- self . writer . properties ( ) ,
278
- & self . arrow_schema ,
279
- self . writer . flushed_row_groups ( ) . len ( ) ,
280
- ) ?) ,
280
+ x => x. insert (
281
+ self . row_group_writer_factory
282
+ . create_row_group_writer ( self . writer . flushed_row_groups ( ) . len ( ) ) ?,
283
+ ) ,
281
284
} ;
282
285
283
286
// If would exceed max_row_group_size, split batch
@@ -794,58 +797,61 @@ impl ArrowRowGroupWriter {
794
797
/// This is used by [`ArrowWriter`] to create row group writers, but can be used
795
798
/// directly for lower level API.
796
799
pub struct ArrowRowGroupWriterFactory {
800
+ schema : SchemaDescriptor ,
801
+ arrow_schema : SchemaRef ,
802
+ props : WriterPropertiesPtr ,
797
803
#[ cfg( feature = "encryption" ) ]
798
804
file_encryptor : Option < Arc < FileEncryptor > > ,
799
805
}
800
806
801
807
impl ArrowRowGroupWriterFactory {
802
808
/// Creates a new [`ArrowRowGroupWriterFactory`] from the provided [`SerializedFileWriter`], parquet schema, arrow schema and writer properties.
803
809
#[ cfg( feature = "encryption" ) ]
804
- pub fn new < W : Write + Send > ( file_writer : & SerializedFileWriter < W > ) -> Self {
810
+ pub fn new < W : Write + Send > (
811
+ file_writer : & SerializedFileWriter < W > ,
812
+ schema : SchemaDescriptor ,
813
+ arrow_schema : SchemaRef ,
814
+ props : WriterPropertiesPtr ,
815
+ ) -> Self {
805
816
Self {
817
+ schema,
818
+ arrow_schema,
819
+ props,
806
820
file_encryptor : file_writer. file_encryptor ( ) ,
807
821
}
808
822
}
809
823
810
824
#[ cfg( not( feature = "encryption" ) ) ]
811
- pub fn new < W : Write + Send > ( _file_writer : & SerializedFileWriter < W > ) -> Self {
812
- Self { }
825
+ pub fn new < W : Write + Send > (
826
+ _file_writer : & SerializedFileWriter < W > ,
827
+ schema : SchemaDescriptor ,
828
+ arrow_schema : SchemaRef ,
829
+ props : WriterPropertiesPtr ,
830
+ ) -> Self {
831
+ Self {
832
+ schema,
833
+ arrow_schema,
834
+ props,
835
+ }
813
836
}
814
837
815
838
/// Creates a new [`ArrowRowGroupWriter`] for the row group at `row_group_index`, using the parquet schema, arrow schema and writer properties held by this factory.
816
839
#[ cfg( feature = "encryption" ) ]
817
- pub fn create_row_group_writer (
818
- & self ,
819
- parquet : & SchemaDescriptor ,
820
- props : & WriterPropertiesPtr ,
821
- arrow : & SchemaRef ,
822
- row_group_index : usize ,
823
- ) -> Result < ArrowRowGroupWriter > {
824
- let mut writers = Vec :: with_capacity ( arrow. fields . len ( ) ) ;
825
- let mut leaves = parquet. columns ( ) . iter ( ) ;
826
- let column_factory = ArrowColumnWriterFactory :: new ( )
827
- . with_file_encryptor ( row_group_index, self . file_encryptor . clone ( ) ) ;
828
- for field in & arrow. fields {
829
- column_factory. get_arrow_column_writer (
830
- field. data_type ( ) ,
831
- props,
832
- & mut leaves,
833
- & mut writers,
834
- ) ?;
835
- }
836
- Ok ( ArrowRowGroupWriter :: new ( writers, arrow) )
840
+ pub fn create_row_group_writer ( & self , row_group_index : usize ) -> Result < ArrowRowGroupWriter > {
841
+ let writers = get_column_writers_with_encryptor (
842
+ & self . schema ,
843
+ & self . props ,
844
+ & self . arrow_schema ,
845
+ self . file_encryptor . clone ( ) ,
846
+ row_group_index,
847
+ ) ?;
848
+ Ok ( ArrowRowGroupWriter :: new ( writers, & self . arrow_schema ) )
837
849
}
838
850
839
851
#[ cfg( not( feature = "encryption" ) ) ]
840
- pub fn create_row_group_writer (
841
- & self ,
842
- parquet : & SchemaDescriptor ,
843
- props : & WriterPropertiesPtr ,
844
- arrow : & SchemaRef ,
845
- _row_group_index : usize ,
846
- ) -> Result < ArrowRowGroupWriter > {
847
- let writers = get_column_writers ( parquet, props, arrow) ?;
848
- Ok ( ArrowRowGroupWriter :: new ( writers, arrow) )
852
+ pub fn create_row_group_writer ( & self , _row_group_index : usize ) -> Result < ArrowRowGroupWriter > {
853
+ let writers = get_column_writers ( & self . schema , & self . props , & self . arrow_schema ) ?;
854
+ Ok ( ArrowRowGroupWriter :: new ( writers, & self . arrow_schema ) )
849
855
}
850
856
}
851
857
@@ -871,7 +877,7 @@ pub fn get_column_writers(
871
877
872
878
/// Returns the [`ArrowColumnWriter`]s for a given schema, with support for columnar encryption
873
879
#[ cfg( feature = "encryption" ) ]
874
- pub fn get_column_writers_with_encryptor (
880
+ fn get_column_writers_with_encryptor (
875
881
parquet : & SchemaDescriptor ,
876
882
props : & WriterPropertiesPtr ,
877
883
arrow : & SchemaRef ,
@@ -894,7 +900,7 @@ pub fn get_column_writers_with_encryptor(
894
900
}
895
901
896
902
/// Gets [`ArrowColumnWriter`] instances for different data types
897
- pub struct ArrowColumnWriterFactory {
903
+ struct ArrowColumnWriterFactory {
898
904
#[ cfg( feature = "encryption" ) ]
899
905
row_group_index : usize ,
900
906
#[ cfg( feature = "encryption" ) ]
0 commit comments