Denester is a proof-of-concept implementation of the Dremel record-shredding algorithm for nested data structures. It is a zero-dependency library with a lightweight type system capable of expressing nested data structures.
Shredding nested data structures using a library like arrow-rs can be complex, often requiring you to manually manage the state of the column array builders for every field in the schema. Denester eliminates this complexity by internally handling value traversal, schema validation, and a state machine for columnar shredding.
See the detailed API comparison with arrow-rs below.
- TODO: Link to blog on record shredding.
use denester::schema::{optional_string, repeated_group};
use denester::{Schema, SchemaBuilder};
/// Builds the `Contact` schema used throughout the examples.
///
/// The schema has two top-level fields: an optional string `name` and a
/// repeated group `phones` whose members are the optional strings `number`
/// and `phone_type`.
fn contact_schema() -> Schema {
    // Describe the nested group first so the builder chain below stays flat.
    let phone_members = vec![optional_string("number"), optional_string("phone_type")];
    let phones = repeated_group("phones", phone_members);

    SchemaBuilder::new("Contact")
        .field(optional_string("name"))
        .field(phones)
        .build()
}
use denester::{Value, ValueBuilder};
/// Builds a sample contact value: the name "Alice" with two entries in the
/// repeated `phones` field (a "Home" number followed by a "Work" number).
fn contact_value() -> Value {
    let home_phone = ValueBuilder::default()
        .field("number", "555-1234")
        .field("phone_type", "Home")
        .build();
    let work_phone = ValueBuilder::default()
        .field("number", "555-5678")
        .field("phone_type", "Work")
        .build();

    // Order matters: the entries are emitted in the order they are listed here.
    ValueBuilder::default()
        .field("name", "Alice")
        .repeated("phones", vec![home_phone, work_phone])
        .build()
}
use denester::ValueParser;
/// Shreds the sample contact against the contact schema and prints every
/// successfully shredded column value, one per line.
fn main() {
    let schema = contact_schema();
    let value = contact_value();
    let parser = ValueParser::new(&schema, value.iter_depth_first());

    println!("--- Shredding New Value ---");
    // `flatten` on an iterator of `Result`s yields only the `Ok` payloads, so
    // items that failed to shred are skipped without being reported.
    for column_value in parser.into_iter().flatten() {
        println!("{column_value}");
    }
}
--- Shredding New Value ---
| String(Some("Alice")) | name | def=1 | rep=0 |
| String(Some("555-1234")) | phones.number | def=2 | rep=0 |
| String(Some("Home")) | phones.phone_type | def=2 | rep=0 |
| String(Some("555-5678")) | phones.number | def=2 | rep=1 |
| String(Some("Work")) | phones.phone_type | def=2 | rep=1 |
cargo build
cargo test
cargo bench --bench shredder
cargo run --example dremel
/// A contact record with an optional name and an optional list of phones.
struct Contact {
/// The contact's name, if one is present.
name: Option<String>,
/// The contact's phones; `None` means the whole list is absent, which is
/// distinct from `Some` of an empty list.
phones: Option<Vec<Phone>>,
}
/// A single phone entry belonging to a `Contact`; both fields are optional.
struct Phone {
/// The phone number as text, if present.
number: Option<String>,
/// The kind of phone (see `PhoneType`), if present.
phone_type: Option<PhoneType>,
}
/// The kinds of phone a `Phone` entry can be tagged with.
enum PhoneType {
Home,
Work,
Mobile,
}
use arrow_schema::{DataType, Field, Schema, SchemaRef};
use std::sync::Arc;
/// Builds the Arrow schema for a contact.
///
/// Layout (every field is nullable):
/// - `name`: `Utf8`
/// - `phones`: `List` of `item: Struct { number: Utf8, phone_type: Dictionary<UInt8, Utf8> }`
fn contact_schema() -> SchemaRef {
    // Contact.phones.number
    let number = Field::new("number", DataType::Utf8, true);
    // Contact.phones.phone_type
    let phone_type = Field::new(
        "phone_type",
        DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
        true,
    );

    // Element type of the `phones` list: one struct per phone entry.
    let phone_struct = DataType::Struct(vec![Arc::new(number), Arc::new(phone_type)].into());
    let phone_item = Field::new("item", phone_struct, true);

    // Contact.name
    let name = Field::new("name", DataType::Utf8, true);
    // Contact.phones
    let phones = Field::new("phones", DataType::List(Arc::new(phone_item)), true);

    Arc::new(Schema::new(vec![Arc::new(name), Arc::new(phones)]))
}
use arrow::array::{
ListBuilder, RecordBatch, StringBuilder, StringDictionaryBuilder, StructBuilder,
};
use arrow::datatypes::{SchemaRef, UInt8Type};
use std::error::Error;
use std::sync::Arc;
/// Position of the phone `number` child builder inside the phone `StructBuilder`
/// (the first entry of the builder list passed to `StructBuilder::new`).
pub const PHONE_NUMBER_FIELD_INDEX: usize = 0;
/// Position of the `phone_type` child builder inside the phone `StructBuilder`
/// (the second entry of the builder list passed to `StructBuilder::new`).
pub const PHONE_TYPE_FIELD_INDEX: usize = 1;
/// Converts a slice of contacts into an Arrow `RecordBatch` with two columns:
/// the contact names and the list-of-struct `phones` column.
///
/// `schema` is handed to `RecordBatch::try_new` unchanged, so it must describe
/// exactly those two columns in that order.
///
/// # Errors
///
/// Returns the error from `RecordBatch::try_new`, boxed, if the finished
/// arrays do not match `schema`.
///
/// # Panics
///
/// Panics if a child builder cannot be retrieved at
/// `PHONE_NUMBER_FIELD_INDEX` / `PHONE_TYPE_FIELD_INDEX` with the requested type.
pub fn contacts_to_record_batch(
    schema: SchemaRef,
    contacts: &[Contact],
) -> Result<RecordBatch, Box<dyn Error>> {
    let mut names = StringBuilder::new();
    // Child builders are listed in the order the *_FIELD_INDEX constants expect:
    // number first, phone type second.
    let mut phone_lists = ListBuilder::new(StructBuilder::new(
        get_contact_phone_fields(),
        vec![
            Box::new(StringBuilder::new()),
            Box::new(StringDictionaryBuilder::<UInt8Type>::new()),
        ],
    ));

    for contact in contacts {
        names.append_option(contact.name());

        match contact.phones() {
            // No phone list at all: record a null list slot for this contact.
            None => phone_lists.append_null(),
            Some(entries) => {
                let entry_builder = phone_lists.values();
                for entry in entries {
                    // Mark the struct slot valid, then fill both of its children.
                    entry_builder.append(true);
                    entry_builder
                        .field_builder::<StringBuilder>(PHONE_NUMBER_FIELD_INDEX)
                        .unwrap()
                        .append_option(entry.number());
                    entry_builder
                        .field_builder::<StringDictionaryBuilder<UInt8Type>>(PHONE_TYPE_FIELD_INDEX)
                        .unwrap()
                        .append_option(entry.phone_type().map(AsRef::as_ref));
                }
                // Close this contact's list after all of its entries were appended.
                phone_lists.append(true);
            }
        }
    }

    Ok(RecordBatch::try_new(
        schema,
        vec![Arc::new(names.finish()), Arc::new(phone_lists.finish())],
    )?)
}
use denester::schema::{optional_string, repeated_group};
use denester::{Schema, SchemaBuilder, Value, ValueBuilder, ValueParser};
/// Builds the `Contact` schema for the Denester side of the comparison.
///
/// The schema has an optional string `name` and a repeated group `phones`
/// whose members are the optional strings `number` and `phone_type`.
fn contact_schema() -> Schema {
    // Describe the nested group first so the builder chain below stays flat.
    let phone_members = vec![optional_string("number"), optional_string("phone_type")];
    let phones = repeated_group("phones", phone_members);

    SchemaBuilder::new("Contact")
        .field(optional_string("name"))
        .field(phones)
        .build()
}
/// Shreds every value in `values` against the contact schema and prints each
/// successfully shredded column value, one per line.
///
/// The schema is built once and shared by all values. Items that fail to
/// shred are skipped without being reported.
fn shred_contacts(values: &[Value]) {
    let schema = contact_schema();
    for value in values {
        let parser = ValueParser::new(&schema, value.iter_depth_first());
        // `flatten` on an iterator of `Result`s yields only the `Ok` payloads.
        for shredded in parser.into_iter().flatten() {
            println!("{shredded}");
        }
    }
}
/// Expected Output
/// | String(Some("Alice")) | name | def=1 | rep=0 |
/// | String(Some("555-1234")) | phones.number | def=2 | rep=0 |
/// | String(Some("Home")) | phones.phone_type | def=2 | rep=0 |
/// | String(Some("555-5678")) | phones.number | def=2 | rep=1 |
/// | String(Some("Work")) | phones.phone_type | def=2 | rep=1 |