Skip to content
This repository was archived by the owner on Dec 29, 2021. It is now read-only.

Commit c35ff3e

Browse files
committed
initial impl of lazy evaluation
ref #11
1 parent f48ce45 commit c35ff3e

File tree

5 files changed

+301
-1
lines changed

5 files changed

+301
-1
lines changed

Cargo.toml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,4 +14,7 @@ byteorder = "1"
1414
flatbuffers = "0.5"
1515
array_tool = "1"
1616
postgres = {version = "0.16.0-rc.1", features = ["default", "with-chrono-0_4", "with-uuid-0_7"]}
17-
chrono = "0.4"
17+
chrono = "0.4"
18+
# for lazy evaluation
19+
serde = { version = "1.0", features = ["derive"] }
20+
serde_json = { version = "1.0", features = ["preserve_order"] }

src/evaluation.rs

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
//!
2+
//! Lazy evaluation for DataFrames
3+
//!
4+
//! This is an experimental initial implementation
5+
6+
use crate::dataframe::DataFrame;
7+
use crate::expression::*;
8+
use crate::functions::scalar::ScalarFunctions as Scalar;
9+
use crate::table;
10+
use ::std::sync::Arc;
11+
use arrow::array::*;
12+
use arrow::datatypes::DataType;
13+
14+
/// Lazy evaluation of an `Operation` against a data source.
pub trait Evaluate {
    /// Evaluate an operation against a data source
    ///
    /// Consumes `self` and returns a new instance with the operation's
    /// output column applied.
    fn evaluate(self, operation: &Operation) -> Self;
}
18+
19+
impl Evaluate for DataFrame {
20+
fn evaluate(self, operation: &Operation) -> Self {
21+
// get the input columns from the dataframe
22+
let columns: Vec<&table::Column> = operation
23+
.inputs
24+
.clone()
25+
.into_iter()
26+
.map(|col: Column| self.column_by_name(&col.name))
27+
.collect();
28+
match &operation.expression {
29+
Expression::Scalar(expr) => match expr {
30+
ScalarExpression::Add => {
31+
// we are adding 2 columns together to create a third
32+
let column: Vec<ArrayRef> =
33+
if let ColumnType::Scalar(dtype) = &operation.output.column_type {
34+
match dtype {
35+
DataType::Int16 => Scalar::add(
36+
table::column_to_arrays_i16(columns.get(0).unwrap()),
37+
table::column_to_arrays_i16(columns.get(1).unwrap()),
38+
)
39+
.unwrap()
40+
.into_iter()
41+
.map(|arr| Arc::new(arr) as ArrayRef)
42+
.collect(),
43+
DataType::Float64 => Scalar::add(
44+
table::column_to_arrays_f64(columns.get(0).unwrap()),
45+
table::column_to_arrays_f64(columns.get(1).unwrap()),
46+
)
47+
.unwrap()
48+
.into_iter()
49+
.map(|arr| Arc::new(arr) as ArrayRef)
50+
.collect(),
51+
_ => panic!("Unsupported operation"),
52+
}
53+
} else {
54+
unreachable!()
55+
};
56+
self.with_column(
57+
&operation.output.name,
58+
table::Column::from_arrays(column, operation.output.clone().into()),
59+
)
60+
}
61+
_ => panic!("Scalar Expression {:?} not supported", expr),
62+
},
63+
// expr @ _ => panic!("Expression {:?} not supported", expr),
64+
}
65+
}
66+
}
67+
68+
#[cfg(test)]
mod tests {
    use super::*;

    use crate::operation::{AddOperation, ScalarOperation};

    /// End-to-end check: build an add operation and evaluate it against a
    /// CSV-backed dataframe, verifying the resulting schema.
    #[test]
    fn test_evaluation() {
        // Fixture with `city` (Utf8), `lat` and `lng` (Float64) columns.
        let dataframe = DataFrame::from_csv("./test/data/uk_cities_with_headers.csv", None);
        let a = Column {
            name: "lat".to_owned(),
            column_type: ColumnType::Scalar(DataType::Float64),
        };
        let b = Column {
            name: "lng".to_owned(),
            column_type: ColumnType::Scalar(DataType::Float64),
        };

        // Explicit output name overrides the generated "add(lat, lng)".
        let add = AddOperation::transform(vec![a, b], Some("lat_lng".to_owned())).unwrap();

        let out_dataframe = dataframe.evaluate(&add);

        // The schema should gain a nullable Float64 `lat_lng` column at the end.
        assert_eq!(
            "Schema { fields: [Field { name: \"city\", data_type: Utf8, nullable: false }, Field { name: \"lat\", data_type: Float64, nullable: false }, Field { name: \"lng\", data_type: Float64, nullable: false }, Field { name: \"lat_lng\", data_type: Float64, nullable: true }] }",
            format!("{:?}", out_dataframe.schema())
        );
    }
}

src/expression.rs

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
//! Expressions that generate operations and computations
2+
3+
use arrow::datatypes::DataType;
4+
use serde::{Deserialize, Serialize};
5+
6+
/// The shape of a column's values: a list (array) of some inner type, or a
/// plain scalar type.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum ColumnType {
    // the inner data type of a list column
    Array(DataType),
    // a primitive (non-nested) data type
    Scalar(DataType),
}
11+
12+
impl From<DataType> for ColumnType {
13+
fn from(dtype: DataType) -> Self {
14+
match dtype {
15+
DataType::Struct(_) => panic!("struct array conversion not yet supported"),
16+
DataType::List(inner) => ColumnType::Array(*inner.clone()),
17+
_ => ColumnType::Scalar(dtype.clone()),
18+
}
19+
}
20+
}
21+
22+
impl From<ColumnType> for DataType {
23+
fn from(from: ColumnType) -> Self {
24+
match from {
25+
ColumnType::Array(dtype) => DataType::List(Box::new(dtype)),
26+
ColumnType::Scalar(dtype) => dtype.clone(),
27+
}
28+
}
29+
}
30+
31+
/// A named column descriptor (name + type); it describes operation inputs
/// and outputs without holding any data.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct Column {
    pub(crate) name: String,
    pub(crate) column_type: ColumnType,
}

impl From<arrow::datatypes::Field> for Column {
    /// Build a `Column` descriptor from an Arrow schema `Field`.
    fn from(field: arrow::datatypes::Field) -> Self {
        Column {
            name: field.name().clone(),
            column_type: field.data_type().clone().into(),
        }
    }
}

impl From<Column> for arrow::datatypes::Field {
    /// Convert a `Column` back to an Arrow `Field`.
    ///
    /// NOTE(review): the resulting field is always marked nullable, even when
    /// the source field was not — the `Field -> Column -> Field` round trip
    /// is lossy. Confirm this is intended.
    fn from(column: Column) -> Self {
        arrow::datatypes::Field::new(column.name.as_str(), column.column_type.into(), true)
    }
}
51+
52+
/// A named logical table: a name plus its column descriptors.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct Frame {
    pub(crate) name: String,
    pub(crate) columns: Vec<Column>,
}

/// The kinds of transformations that can be applied to a frame.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum Transformation {
    Aggregate,
    // a calculation that derives an output column from input columns
    Calculate(Operation),
    Join,
    Group,
    // can add other transform types here
}

/// A single named computation step: the input columns it reads, the output
/// column it produces, and the expression that produces it.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct Operation {
    pub(crate) name: String,
    pub(crate) inputs: Vec<Column>,
    pub(crate) output: Column,
    pub(crate) expression: Expression,
}

/// A full computation plan: input frames, the operations to run, and the
/// shape of the resulting output frame.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct Computation {
    pub(crate) input: Vec<Frame>,
    pub(crate) operations: Vec<Operation>,
    pub(crate) output: Frame,
}

/// An expression that an operation evaluates.
/// Currently only scalar expressions exist.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum Expression {
    Scalar(ScalarExpression),
}

/// Element-wise arithmetic over scalar columns.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum ScalarExpression {
    Add,
    Subtract,
    Divide,
    Multiply,
}
94+
95+
#[cfg(test)]
mod tests {
    use super::*;

    /// Check the `Debug` output and the serde JSON serialization of a frame.
    #[test]
    fn debug() {
        let frame = Frame {
            name: "Input Table 1".to_owned(),
            columns: vec![Column {
                name: "id".to_owned(),
                column_type: ColumnType::Scalar(DataType::Int64),
            }],
        };

        // Debug formatting exposes the full structure.
        assert_eq!("Frame { name: \"Input Table 1\", columns: [Column { name: \"id\", column_type: Scalar(Int64) }] }", format!("{:?}", frame));
        // JSON serialization keeps field order (serde_json `preserve_order`
        // feature is enabled in Cargo.toml).
        let as_json = serde_json::to_string(&frame).unwrap();
        assert_eq!("{\"name\":\"Input Table 1\",\"columns\":[{\"name\":\"id\",\"column_type\":{\"Scalar\":\"Int64\"}}]}", as_json);
    }
}

src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,10 @@
66

77
pub mod dataframe;
88
pub mod error;
9+
pub mod evaluation;
10+
pub mod expression;
911
pub mod functions;
1012
pub mod io;
13+
pub mod operation;
1114
pub mod table;
1215
pub mod utils;

src/operation.rs

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
//! Operations
2+
3+
use crate::evaluation::*;
4+
use crate::expression::*;
5+
use arrow::datatypes::DataType;
6+
use arrow::error::ArrowError;
7+
8+
/// A scalar operation that can be turned into a concrete `Operation`.
pub trait ScalarOperation {
    /// The operation's canonical name (used for generated output column names)
    fn name() -> &'static str;
    /// Validate `inputs` and build an `Operation`; `name`, when provided,
    /// overrides the generated output column name.
    fn transform(inputs: Vec<Column>, name: Option<String>) -> Result<Operation, ArrowError>;
}
12+
13+
/// Operation to add two numeric columns together
14+
pub struct AddOperation;
15+
16+
impl ScalarOperation for AddOperation {
17+
fn name() -> &'static str {
18+
"add"
19+
}
20+
21+
fn transform(inputs: Vec<Column>, name: Option<String>) -> Result<Operation, ArrowError> {
22+
// add n columns together provided that they are of the same data type
23+
// for now we support 2 inputs at a time
24+
if inputs.len() != 2 {
25+
Err(ArrowError::ComputeError(
26+
"Add operation expects 2 inputs".to_string(),
27+
))
28+
} else {
29+
let a = &inputs[0];
30+
let b = &inputs[1];
31+
match (&a.column_type, &b.column_type) {
32+
(ColumnType::Array(_), _) | (_, ColumnType::Array(_)) => {
33+
Err(ArrowError::ComputeError(
34+
"Add operation only works on scalar columns".to_string(),
35+
))
36+
}
37+
(ColumnType::Scalar(from_type), ColumnType::Scalar(to_type)) => {
38+
if from_type != to_type {
39+
Err(ArrowError::ComputeError(
40+
"Add operation currently only works on the same data types".to_string(),
41+
))
42+
} else {
43+
Ok(Operation {
44+
name: Self::name().to_string(),
45+
inputs: inputs.clone(),
46+
output: Column {
47+
name: name.unwrap_or(format!(
48+
"{}({}, {})",
49+
Self::name(),
50+
&a.name,
51+
&b.name
52+
)),
53+
column_type: ColumnType::Scalar(from_type.clone()),
54+
},
55+
expression: Expression::Scalar(ScalarExpression::Add),
56+
})
57+
}
58+
}
59+
}
60+
}
61+
}
62+
}
63+
64+
#[cfg(test)]
mod tests {
    use super::*;

    /// Check that a valid two-input transform produces the expected
    /// `Operation`, including the generated output column name.
    #[test]
    fn scalar_operations() {
        let a = Column {
            name: "a".to_owned(),
            column_type: ColumnType::Scalar(DataType::Int64),
        };
        let b = Column {
            name: "b".to_owned(),
            column_type: ColumnType::Scalar(DataType::Int64),
        };

        // No explicit output name: expect the generated "add(a, b)".
        let add = AddOperation::transform(vec![a, b], None).unwrap();

        assert_eq!(
            "Operation { name: \"add\", inputs: [Column { name: \"a\", column_type: Scalar(Int64) }, Column { name: \"b\", column_type: Scalar(Int64) }], output: Column { name: \"add(a, b)\", column_type: Scalar(Int64) }, expression: Scalar(Add) }",
            format!("{:?}", add)
        );
    }
}

0 commit comments

Comments
 (0)