@@ -6,6 +6,7 @@ use arrow::array_data::ArrayDataRef;
 use arrow::csv::Reader as CsvReader;
 use arrow::csv::ReaderBuilder as CsvReaderBuilder;
 use arrow::datatypes::*;
+use arrow::error::ArrowError;
 use arrow::record_batch::RecordBatch;
 use std::fs::File;
 use std::sync::Arc;
@@ -112,6 +113,11 @@ impl DataFrame {
         self.columns.len()
     }
 
+    /// Returns the number of chunks per column, based on the first column
+    pub fn num_chunks(&self) -> usize {
+        self.column(0).data().num_chunks()
+    }
+
     pub fn num_rows(&self) -> usize {
         self.columns[0].data.num_rows()
     }
@@ -155,12 +161,21 @@ impl DataFrame {
     /// Returns dataframe as an Arrow `RecordBatch`
     /// TODO: add a method to break into smaller batches
     fn to_record_batches(&self) -> Vec<RecordBatch> {
-        let batches: Vec<RecordBatch> = Vec::with_capacity(self.column(0).data().num_chunks());
-        for i in 0..self.num_columns() {
-            unimplemented!("We currently do not get batches, this should live in dataframe")
+        let num_chunks = self.column(0).data().num_chunks();
+        let num_columns = self.num_columns();
+        let mut batches: Vec<RecordBatch> = Vec::with_capacity(num_chunks);
+
+        // Build one batch per chunk: batch `i` collects the `i`-th chunk of
+        // every column, so all arrays in a batch have the same length.
+        for i in 0..num_chunks {
+            let mut arrays: Vec<ArrayRef> = Vec::with_capacity(num_columns);
+            for j in 0..num_columns {
+                arrays.push(self.column(j).data().chunk(i).to_owned());
+            }
+            batches.push(RecordBatch::new(self.schema.clone(), arrays));
         }
+
         batches
-        // RecordBatch::new(self.schema.clone(), self.columns)
     }
 
     /// Returns dataframe with the first n records selected
@@ -300,7 +315,7 @@ impl DataFrame {
             let builder = CsvReaderBuilder::new()
                 .infer_schema(None)
                 .has_headers(true)
-                .with_batch_size(6);
+                .with_batch_size(1024);
             builder.build(file).unwrap()
         }
     };
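
For context on what the new `to_record_batches` provides, a minimal sketch of the intended invariants, under stated assumptions: `DataFrame::from_csv` and the file name are hypothetical illustrations not shown in this diff, and `to_record_batches` is private, so this would have to live inside the crate.

```rust
// Sketch only: `from_csv` and "cities.csv" are assumed for illustration.
let df = DataFrame::from_csv("cities.csv", None);

// With `.with_batch_size(1024)`, the CSV reader yields up to 1024 rows per
// chunk, so e.g. a 3,000-row file produces 3 chunks per column.
let batches = df.to_record_batches();

// One `RecordBatch` per chunk, and each batch holds one array per column.
assert_eq!(batches.len(), df.num_chunks());
for batch in &batches {
    assert_eq!(batch.num_columns(), df.num_columns());
}
```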