This repository was archived by the owner on Dec 29, 2021. It is now read-only.

Commit 02c565a

Merge pull request #12 from nevi-me/slices

Ability to slice table, column, chunked array

2 parents: c3980af + a438661

File tree: 6 files changed (+219 −203 lines)
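
The headline change is the ability to slice a table, column, or chunked array. As a rough sketch of the API shape (hypothetical usage, inferred from the `col.slice(0, Some(count))` call in the src/dataframe.rs diff below; the `Table` and `ChunkedArray` forms are not shown in this section and are assumed to mirror `Column`):

```rust
// Hypothetical sketch, inferred from the dataframe.rs hunk below.
// `slice(offset, length)` takes an optional length; `None` is assumed
// to mean "to the end". Arrow slices are offset/length views over the
// same buffers, so no data should be copied.
let df = DataFrame::from_csv("./test/data/uk_cities_with_headers.csv", None);
let lat = df.column_by_name("lat");
let first_ten = lat.slice(0, Some(10)); // rows [0, 10)
```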

Cargo.toml

Lines changed: 1 addition & 1 deletion

@@ -5,7 +5,7 @@ authors = ["Neville Dipale <nevilledips@gmail.com>"]
 edition = "2018"
 
 [dependencies]
-arrow = { git = "https://github.com/nevi-me/arrow", branch="arrow-4386"}
+arrow = { git = "https://github.com/apache/arrow"}
 # arrow = { path = "../../arrow/rust/arrow"}
 num = "0.2"
 num-traits = "0.2"

README.md

Lines changed: 3 additions & 5 deletions

@@ -37,9 +37,9 @@ To that end, we're trying to support CSV, JSON, and perhaps other simpler file f
 **Note on Feather:** The Feather file format support can be considered deprecated in favour of Arrow IPC. Though we have implemented Feather, it's meant to be a stop-gap measure until Arrow supports IPC (in Rust). We'll try to tackle this in the coming months.
 
 - IO Support
-  - [ ] CSV
+  - [X] CSV (using Arrow)
     - [X] Read
-    - [ ] Write
+    - [X] Write
   - [ ] JSON
     - [X] Read (submitted to Arrow)
     - [ ] Write
@@ -50,13 +50,11 @@ To that end, we're trying to support CSV, JSON, and perhaps other simpler file f
 ### Functionality
 
 - DataFrame Operations
-  <!-- - [x] Read CSV into dataframe -->
   - [X] Select single column
-  - [ ] Select subset of columns, drop columns
+  - [X] Select subset of columns, drop columns
   - [X] Add or remove columns
   - [X] Rename columns
   - [ ] Create dataframe from record batches (a `Vec<RecordBatch>` as well as an iterator)
-  - [ ] Write dataframe to CSV (and other formats as and when Arrow supports them)
   - [ ] Sort dataframes
   - [ ] Grouped operations
  - [ ] Filter dataframes

src/dataframe.rs

Lines changed: 142 additions & 151 deletions

@@ -17,50 +17,11 @@ use std::sync::Arc;
 
 use crate::error::DataFrameError;
 
-//impl From<&ArrayRef> for &PrimitiveArray<BooleanType> {
-//    fn from(array: &ArrayRef) -> Self {
-//        array.as_any().downcast_ref::<BooleanArray>().unwrap()
-//    }
-//}
-
-//impl<T: ArrowPrimitiveType> From<&Array> for &PrimitiveArray<T> {
-//    fn from(array: &Array) -> Self {
-//        match array.data_type() {
-//            DataType::Boolean => array.as_any().downcast_ref::<T>().unwrap()
-//        }
-////        _ => unimplemented!("Casting array to other primitive types is not implemented")
-//    }
-//}
-
-//fn array_to_primitive<T>(array: &Array) -> &PrimitiveArray<T>
-//    where
-//    T: ArrowPrimitiveType,
-//{
-//    match array.data_type() {
-//        DataType::Boolean => {
-//            array.as_any().downcast_ref::<BooleanArray>().unwrap()
-//        }
-//        _ => unimplemented!("Casting for other array types is not implemented")
-//    }
-//}
-
 pub struct DataFrame {
     schema: Arc<Schema>,
     columns: Vec<Column>,
 }
 
-// struct CsvDataSource {
-//     reader: CsvReader,
-// }
-
-// impl Iterator for CsvDataSource {
-//     type Item = Result<RecordBatch, DataFrameError>;
-
-//     fn next(&mut self) -> Result<Option<Self::Item>, arrow::error::ArrowError> {
-//         Some(Ok(self.reader.next()))
-//     }
-// }
-
 impl DataFrame {
     /// Create an empty `DataFrame`
     fn empty() -> Self {
@@ -173,38 +134,24 @@
 
         arrays.into_iter().for_each(|array| {
             dbg!(array.len());
-            batches.push(RecordBatch::new(self.schema.clone(), array));
+            // the unwrap is infallible as we're passing data that's already been verified
+            batches.push(RecordBatch::try_new(self.schema.clone(), array).unwrap());
         });
 
         batches
     }
 
     /// Returns dataframe with the first n records selected
-    ///
-    /// TODO: this should work through batches, and slice the last one that makes
-    /// the length match what we're taking.
-    // fn take(&self, count: usize) -> Self {
-    //     DataFrame::new(
-    //         self.schema.clone(),
-    //         self.columns
-    //             .into_iter()
-    //             .map(|col| {
-    //                 ArrayDataBuilder::new(col.data_type().clone())
-    //                     .child_data(
-    //                         col.data()
-    //                             .child_data()
-    //                             .iter()
-    //                             .take(count)
-    //                             .into_iter()
-    //                             .map(|x| x.clone())
-    //                             .collect(),
-    //                     )
-    //                     .build()
-    //             })
-    //             .map(|col| utils::make_array(col))
-    //             .collect(),
-    //     )
-    // }
+    fn take(&self, count: usize) -> Self {
+        DataFrame::new(
+            self.schema.clone(),
+            self.columns
+                .clone()
+                .into_iter()
+                .map(|col| col.slice(0, Some(count)))
+                .collect(),
+        )
+    }
 
     fn intersect(&self, other: &DataFrame) -> Self {
         unimplemented!("Intersect not yet implemented")
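
The new `take` is now a thin wrapper over column slicing. A minimal sketch of its effect (crate-internal, since `take` is not `pub` in this commit; the data path is the one used by the tests below):

```rust
// Minimal sketch: `take(n)` returns a DataFrame whose columns are
// slices (offset 0, length n) of the originals, so the first n rows
// are selected without copying the underlying Arrow buffers.
let df = DataFrame::from_csv("./test/data/uk_cities_with_headers.csv", None);
let head = df.take(5);
```
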
@@ -213,100 +160,96 @@
     /// Returns dataframe with specified columns selected.
     ///
     /// If a column name does not exist, it is omitted.
-    // pub fn select(&mut self, col_names: Vec<&str>) -> Self {
-    //     // get the names of columns from the schema, and match them with supplied
-    //     let mut col_num: i16 = -1;
-    //     let schema = &self.schema.clone();
-    //     let field_names: Vec<(usize, &str)> = schema
-    //         .fields()
-    //         .iter()
-    //         .map(|c| {
-    //             col_num += 1;
-    //             (col_num as usize, c.name().as_str())
-    //         })
-    //         .collect();
-
-    //     // filter names
-    //     let filter_cols: Vec<(usize, &str)> = if col_names.contains(&"*") {
-    //         field_names
-    //     } else {
-    //         // TODO follow the order of user-supplied column names
-    //         field_names
-    //             .into_iter()
-    //             .filter(|(col, name)| col_names.contains(name))
-    //             .collect()
-    //     };
-
-    //     // let columns = filter_cols.clone().iter().map(move |c| self.columns[c.0]).collect();
-
-    //     let mut columns = vec![];
-
-    //     for (i,u) in filter_cols.clone() {
-    //         let c = &self.columns[i];
-    //         columns.push(c);
-    //     }
-
-    //     let new_schema = Arc::new(Schema::new(
-    //         filter_cols
-    //             .iter()
-    //             .map(|c| schema.field(c.0).clone())
-    //             .collect(),
-    //     ));
-
-    //     dbg!(filter_cols);
-
-    //     DataFrame::from_columns(new_schema, columns)
-    // }
+    pub fn select(&mut self, col_names: Vec<&str>) -> Self {
+        // get the names of columns from the schema, and match them with supplied
+        let mut col_num: i16 = -1;
+        let schema = &self.schema.clone();
+        let field_names: Vec<(usize, &str)> = schema
+            .fields()
+            .iter()
+            .map(|c| {
+                col_num += 1;
+                (col_num as usize, c.name().as_str())
+            })
+            .collect();
+
+        // filter names
+        let filter_cols: Vec<(usize, &str)> = if col_names.contains(&"*") {
+            field_names
+        } else {
+            // TODO follow the order of user-supplied column names
+            field_names
+                .into_iter()
+                .filter(|(_, name)| col_names.contains(name))
+                .collect()
+        };
+
+        let mut columns = vec![];
+
+        for (i, _) in filter_cols.clone() {
+            let c = &self.columns[i];
+            columns.push(c.clone());
+        }
+
+        let new_schema = Arc::new(Schema::new(
+            filter_cols
+                .iter()
+                .map(|c| schema.field(c.0).clone())
+                .collect(),
+        ));
+
+        DataFrame::from_columns(new_schema, columns)
+    }
 
     /// Returns a dataframe with specified columns dropped.
     ///
     /// If a column name does not exist, it is omitted.
-    // pub fn drop(&self, col_names: Vec<&str>) -> Self {
-    //     // get the names of columns from the schema, and match them with supplied
-    //     let mut col_num: i16 = -1;
-    //     let schema = self.schema.clone();
-    //     let field_names: Vec<(usize, &str)> = schema
-    //         .fields()
-    //         .into_iter()
-    //         .map(|c| {
-    //             col_num += 1;
-    //             (col_num as usize, c.name().as_str())
-    //         })
-    //         .collect();
-
-    //     // filter names
-    //     let filter_cols: Vec<(usize, &str)> = {
-    //         // TODO follow the order of user-supplied column names
-    //         field_names
-    //             .into_iter()
-    //             .filter(|(col, name)| !col_names.contains(name))
-    //             .collect()
-    //     };
-
-    //     // construct dataframe with selected columns
-    //     DataFrame {
-    //         schema: Arc::new(Schema::new(
-    //             filter_cols
-    //                 .iter()
-    //                 .map(|c| schema.field(c.0).clone())
-    //                 .collect(),
-    //         )),
-    //         columns: filter_cols
-    //             .into_iter()
-    //             .map(move |c| self.columns[c.0])
-    //             .collect(),
-    //     }
-    // }
+    pub fn drop(&self, col_names: Vec<&str>) -> Self {
+        // get the names of columns from the schema, and match them with supplied
+        let mut col_num: i16 = -1;
+        let schema = self.schema.clone();
+        let field_names: Vec<(usize, &str)> = schema
+            .fields()
+            .into_iter()
+            .map(|c| {
+                col_num += 1;
+                (col_num as usize, c.name().as_str())
+            })
+            .collect();
+
+        // filter names
+        let filter_cols: Vec<(usize, &str)> = {
+            // TODO follow the order of user-supplied column names
+            field_names
+                .into_iter()
+                .filter(|(_, name)| !col_names.contains(name))
+                .collect()
+        };
+
+        // construct dataframe with selected columns
+        DataFrame {
+            schema: Arc::new(Schema::new(
+                filter_cols
+                    .iter()
+                    .map(|c| schema.field(c.0).clone())
+                    .collect(),
+            )),
+            columns: filter_cols
+                .iter()
+                .map(|c| self.columns[c.0].clone())
+                .collect(),
+        }
+    }
 
     /// Create a dataframe from an Arrow Table.
     ///
     /// Arrow Tables are not yet in the Rust library, and we are hashing them out here
-    // pub fn from_table(table: crate::table::Table) -> Self {
-    //     DataFrame {
-    //         schema: table.schema().clone(),
-    //         columns: *table.columns(),
-    //     }
-    // }
+    pub fn from_table(table: crate::table::Table) -> Self {
+        DataFrame {
+            schema: table.schema().clone(),
+            columns: table.columns().to_vec(),
+        }
+    }
 
     pub fn from_csv(path: &str, schema: Option<Arc<Schema>>) -> Self {
         let file = File::open(path).unwrap();
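
With `select` and `drop` now live rather than commented out, basic column projection looks like this. A hedged sketch, with column names taken from the uk_cities test data used elsewhere in this commit:

```rust
// `select` keeps the named columns ("*" keeps everything); note the
// TODO above: results follow schema order, not the caller's order.
let mut df = DataFrame::from_csv("./test/data/uk_cities_with_headers.csv", None);
let coords = df.select(vec!["lat", "lng"]);

// `drop` removes columns by name; unknown names are simply ignored.
let without_city = df.drop(vec!["city"]);
```
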
@@ -428,6 +371,21 @@
         Ok(())
     }
 
+    pub fn to_csv(&self, path: &str) -> Result<(), arrow::error::ArrowError> {
+        // use csv::error::Error;
+        use arrow::csv::Writer;
+
+        let file = File::create(path)?;
+
+        let mut wrt = Writer::new(file);
+
+        let batches = self.to_record_batches();
+        let batches_ref: Vec<&RecordBatch> = batches.iter().collect();
+
+        wrt.write(batches_ref)?;
+
+        Ok(())
+    }
 }
 
 mod tests {
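
A round-trip sketch for the new `to_csv`, mirroring the `csv_io` test below (the output path is illustrative):

```rust
// Read the bundled test CSV and write it back out through arrow's
// csv::Writer, which to_csv drives via to_record_batches().
let df = DataFrame::from_csv("./test/data/uk_cities_with_headers.csv", None);
df.to_csv("/tmp/uk_cities_roundtrip.csv").expect("CSV write failed");
```
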
@@ -536,4 +494,37 @@
         let write = dataframe.to_feather("./test/data/uk_cities");
         assert!(write.is_ok());
     }
+
+    #[test]
+    fn csv_io() {
+        let mut dataframe = DataFrame::from_csv("./test/data/uk_cities_with_headers.csv", None);
+        let a = dataframe.column_by_name("lat");
+        let b = dataframe.column_by_name("lng");
+        let sum = ScalarFunctions::add(column_to_arrays_f64(a), column_to_arrays_f64(b));
+        // TODO, make this better
+        let sum: Vec<ArrayRef> = sum
+            .unwrap()
+            .into_iter()
+            .map(|p| Arc::new(p) as ArrayRef)
+            .collect();
+        dataframe = dataframe.with_column(
+            "lat_lng_sum",
+            Column::from_arrays(sum, Field::new("lat_lng_sum", DataType::Float64, true)),
+        );
+
+        let city = dataframe.column_by_name("city");
+        let lowercase = ScalarFunctions::lower(column_to_arrays_str(city));
+        let lowercase: Vec<ArrayRef> = lowercase
+            .unwrap()
+            .into_iter()
+            .map(|p| Arc::new(p) as ArrayRef)
+            .collect();
+        dataframe = dataframe.with_column(
+            "city_lower",
+            Column::from_arrays(lowercase, Field::new("city_lower", DataType::Utf8, true)),
+        );
+
+        let write = dataframe.to_csv("/tmp/uk_cities_out.csv");
+        assert!(write.is_ok());
+    }
 }

src/functions/scalar.rs

Lines changed: 10 additions & 1 deletion

@@ -247,7 +247,16 @@ impl ScalarFunctions {
     pub fn greatest() {}
     pub fn hash() {}
     pub fn hex() {}
-    pub fn hour() {}
+    pub fn hour<T>(array: Vec<&PrimitiveArray<T>>) -> Result<Vec<Int32Array>, ArrowError>
+    where
+        T: ArrowNumericType + ArrowTemporalType,
+        i64: std::convert::From<T::Native>,
+    {
+        array
+            .iter()
+            .map(|a| compute::hour(a))
+            .collect()
+    }
     pub fn hypot<T>(
         a: &PrimitiveArray<T>,
         b: &PrimitiveArray<T>,
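
For the new `hour` kernel, a hedged usage sketch: it assumes arrow's `Date64Array` (milliseconds since the Unix epoch) satisfies the `ArrowNumericType + ArrowTemporalType` bounds that the underlying `compute::hour` kernel requires:

```rust
// Hedged sketch: 1_545_750_000_000 ms is 2018-12-25T15:00:00 UTC,
// so the extracted hour should be 15.
use arrow::array::Date64Array;

let dates = Date64Array::from(vec![1_545_750_000_000]);
let hours = ScalarFunctions::hour(vec![&dates]).unwrap();
assert_eq!(15, hours[0].value(0));
```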
