-
Notifications
You must be signed in to change notification settings - Fork 979
Convert JSON to VariantArray without copying (8 - 32% faster) #7911
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
fce19eb
236fb5e
23be455
99ea0c4
9920006
65714e5
f23e029
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,7 +20,7 @@ | |
use crate::VariantArray; | ||
use arrow::array::{ArrayRef, BinaryViewArray, BinaryViewBuilder, NullBufferBuilder, StructArray}; | ||
use arrow_schema::{DataType, Field, Fields}; | ||
use parquet_variant::{Variant, VariantBuilder}; | ||
use parquet_variant::{ListBuilder, ObjectBuilder, Variant, VariantBuilder, VariantBuilderExt}; | ||
use std::sync::Arc; | ||
|
||
/// A builder for [`VariantArray`] | ||
|
@@ -37,23 +37,21 @@ use std::sync::Arc; | |
/// ## Example: | ||
/// ``` | ||
/// # use arrow::array::Array; | ||
/// # use parquet_variant::{Variant, VariantBuilder}; | ||
/// # use parquet_variant::{Variant, VariantBuilder, VariantBuilderExt}; | ||
/// # use parquet_variant_compute::VariantArrayBuilder; | ||
/// // Create a new VariantArrayBuilder with a capacity of 100 rows | ||
/// let mut builder = VariantArrayBuilder::new(100); | ||
/// // append variant values | ||
/// builder.append_variant(Variant::from(42)); | ||
/// // append a null row | ||
/// // append a null row (note not a Variant::Null) | ||
/// builder.append_null(); | ||
/// // append a pre-constructed metadata and value buffers | ||
/// let (metadata, value) = { | ||
/// let mut vb = VariantBuilder::new(); | ||
/// let mut obj = vb.new_object(); | ||
/// obj.insert("foo", "bar"); | ||
/// obj.finish().unwrap(); | ||
/// vb.finish() | ||
/// }; | ||
/// builder.append_variant_buffers(&metadata, &value); | ||
/// // append an object to the builder | ||
/// let mut vb = builder.variant_builder(); | ||
/// vb.new_object() | ||
/// .with_field("foo", "bar") | ||
/// .finish() | ||
/// .unwrap(); | ||
/// vb.finish(); // must call finish to write the variant to the buffers | ||
/// | ||
/// // create the final VariantArray | ||
/// let variant_array = builder.build(); | ||
|
@@ -66,7 +64,9 @@ use std::sync::Arc; | |
/// assert!(variant_array.is_null(1)); | ||
/// // row 2 is not null and is an object | ||
/// assert!(!variant_array.is_null(2)); | ||
/// assert!(variant_array.value(2).as_object().is_some()); | ||
/// let value = variant_array.value(2); | ||
/// let obj = value.as_object().expect("expected object"); | ||
/// assert_eq!(obj.get("foo"), Some(Variant::from("bar"))); | ||
/// ``` | ||
#[derive(Debug)] | ||
pub struct VariantArrayBuilder { | ||
|
@@ -147,28 +147,195 @@ impl VariantArrayBuilder { | |
|
||
/// Append the [`Variant`] to the builder as the next row | ||
pub fn append_variant(&mut self, variant: Variant) { | ||
// TODO make this more efficient by avoiding the intermediate buffers | ||
let mut variant_builder = VariantBuilder::new(); | ||
variant_builder.append_value(variant); | ||
let (metadata, value) = variant_builder.finish(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The whole point of this PR is to avoid this copy here and instead write directly into the output |
||
self.append_variant_buffers(&metadata, &value); | ||
let mut direct_builder = self.variant_builder(); | ||
direct_builder.variant_builder.append_value(variant); | ||
direct_builder.finish() | ||
} | ||
|
||
/// Append a metadata and values buffer to the builder | ||
pub fn append_variant_buffers(&mut self, metadata: &[u8], value: &[u8]) { | ||
self.nulls.append_non_null(); | ||
let metadata_length = metadata.len(); | ||
let metadata_offset = self.metadata_buffer.len(); | ||
self.metadata_locations | ||
.push((metadata_offset, metadata_length)); | ||
self.metadata_buffer.extend_from_slice(metadata); | ||
let value_length = value.len(); | ||
let value_offset = self.value_buffer.len(); | ||
self.value_locations.push((value_offset, value_length)); | ||
self.value_buffer.extend_from_slice(value); | ||
/// Return a `VariantArrayVariantBuilder` that writes directly to the | ||
/// buffers of this builder. | ||
/// | ||
/// You must call [`VariantArrayVariantBuilder::finish`] to complete the builder | ||
/// | ||
/// # Example | ||
/// ``` | ||
/// # use parquet_variant::{Variant, VariantBuilder, VariantBuilderExt}; | ||
/// # use parquet_variant_compute::{VariantArray, VariantArrayBuilder}; | ||
/// let mut array_builder = VariantArrayBuilder::new(10); | ||
/// | ||
/// // First row has a string | ||
/// let mut variant_builder = array_builder.variant_builder(); | ||
/// variant_builder.append_value("Hello, World!"); | ||
/// // must call finish to write the variant to the buffers | ||
/// variant_builder.finish(); | ||
/// | ||
/// // Second row is an object | ||
/// let mut variant_builder = array_builder.variant_builder(); | ||
/// variant_builder | ||
/// .new_object() | ||
/// .with_field("my_field", 42i64) | ||
/// .finish() | ||
/// .unwrap(); | ||
/// variant_builder.finish(); | ||
/// | ||
/// // finalize the array | ||
/// let variant_array: VariantArray = array_builder.build(); | ||
/// | ||
/// // verify what we wrote is still there | ||
/// assert_eq!(variant_array.value(0), Variant::from("Hello, World!")); | ||
/// assert!(variant_array.value(1).as_object().is_some()); | ||
/// ``` | ||
pub fn variant_builder(&mut self) -> VariantArrayVariantBuilder { | ||
// append directly into the metadata and value buffers | ||
let metadata_buffer = std::mem::take(&mut self.metadata_buffer); | ||
let value_buffer = std::mem::take(&mut self.value_buffer); | ||
VariantArrayVariantBuilder::new(self, metadata_buffer, value_buffer) | ||
} | ||
} | ||
|
||
/// A `VariantBuilderExt` that writes directly to the buffers of a `VariantArrayBuilder`. | ||
/// | ||
// This struct implements [`VariantBuilderExt`], so in most cases it can be used as a | ||
// [`VariantBuilder`] to perform variant-related operations for [`VariantArrayBuilder`]. | ||
/// | ||
/// If [`Self::finish`] is not called, any changes will be rolled back | ||
/// | ||
/// See [`VariantArrayBuilder::variant_builder`] for an example | ||
pub struct VariantArrayVariantBuilder<'a> { | ||
/// was finish called? | ||
finished: bool, | ||
/// starting offset in the variant_builder's `metadata` buffer | ||
metadata_offset: usize, | ||
/// starting offset in the variant_builder's `value` buffer | ||
value_offset: usize, | ||
/// Parent array builder that this variant builder writes to. Buffers | ||
/// have been moved into the variant builder, and must be returned on | ||
/// drop | ||
array_builder: &'a mut VariantArrayBuilder, | ||
/// Builder for the in progress variant value, temporarily owns the buffers | ||
/// from `array_builder` | ||
variant_builder: VariantBuilder, | ||
} | ||
|
||
impl<'a> VariantBuilderExt for VariantArrayVariantBuilder<'a> { | ||
fn append_value<'m, 'v>(&mut self, value: impl Into<Variant<'m, 'v>>) { | ||
self.variant_builder.append_value(value); | ||
} | ||
|
||
fn new_list(&mut self) -> ListBuilder { | ||
self.variant_builder.new_list() | ||
} | ||
|
||
fn new_object(&mut self) -> ObjectBuilder { | ||
self.variant_builder.new_object() | ||
} | ||
} | ||
|
||
impl<'a> VariantArrayVariantBuilder<'a> { | ||
/// Constructs a new VariantArrayVariantBuilder | ||
/// | ||
/// Note this is not public as this is a structure that is logically | ||
/// part of the [`VariantArrayBuilder`] and relies on its internal structure | ||
fn new( | ||
array_builder: &'a mut VariantArrayBuilder, | ||
metadata_buffer: Vec<u8>, | ||
value_buffer: Vec<u8>, | ||
) -> Self { | ||
let metadata_offset = metadata_buffer.len(); | ||
let value_offset = value_buffer.len(); | ||
VariantArrayVariantBuilder { | ||
finished: false, | ||
metadata_offset, | ||
value_offset, | ||
variant_builder: VariantBuilder::new_with_buffers(metadata_buffer, value_buffer), | ||
array_builder, | ||
} | ||
} | ||
|
||
/// Return a reference to the underlying `VariantBuilder` | ||
pub fn inner(&self) -> &VariantBuilder { | ||
&self.variant_builder | ||
} | ||
|
||
/// Return a mutable reference to the underlying `VariantBuilder` | ||
pub fn inner_mut(&mut self) -> &mut VariantBuilder { | ||
&mut self.variant_builder | ||
} | ||
|
||
/// Called to finish the in progress variant and write it to the underlying | ||
/// buffers | ||
/// | ||
/// Note if you do not call finish, on drop any changes made to the | ||
/// underlying buffers will be rolled back. | ||
pub fn finish(mut self) { | ||
self.finished = true; | ||
|
||
let metadata_offset = self.metadata_offset; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The code for finishing / finalizing is moved to |
||
let value_offset = self.value_offset; | ||
// get the buffers back from the variant builder | ||
let (metadata_buffer, value_buffer) = std::mem::take(&mut self.variant_builder).finish(); | ||
|
||
// Sanity Check: if the buffers got smaller, something went wrong (previous data was lost) | ||
let metadata_len = metadata_buffer | ||
.len() | ||
.checked_sub(metadata_offset) | ||
.expect("metadata length decreased unexpectedly"); | ||
let value_len = value_buffer | ||
.len() | ||
.checked_sub(value_offset) | ||
.expect("value length decreased unexpectedly"); | ||
|
||
// commit the changes by putting the | ||
// offsets and lengths into the parent array builder. | ||
self.array_builder | ||
.metadata_locations | ||
.push((metadata_offset, metadata_len)); | ||
self.array_builder | ||
.value_locations | ||
.push((value_offset, value_len)); | ||
self.array_builder.nulls.append_non_null(); | ||
// put the buffers back into the array builder | ||
self.array_builder.metadata_buffer = metadata_buffer; | ||
self.array_builder.value_buffer = value_buffer; | ||
} | ||
} | ||
|
||
impl<'a> Drop for VariantArrayVariantBuilder<'a> { | ||
/// If the builder was not finished, roll back any changes made to the | ||
/// underlying buffers (by truncating them) | ||
fn drop(&mut self) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I really like this approach. I was thinking over the weekend that we may want to rework the other builders to follow a similar approach:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
That is an excellent point
That sounds like a great way to avoid the extra allocation
This is also a great idea 🤯 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As a follow up, @klion26 has a PR up to implement this: There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That other PR is nice improvement, but the In order to not shift bytes at all, we'd have to pre-allocate exactly the right number of header bytes before recursing into the field values. And then the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think the only way to do this is add some API in the ObjectBuilder somehow to pre-allocate this space ( |
||
if self.finished { | ||
return; | ||
} | ||
|
||
// if the object was not finished, need to rollback any changes by | ||
// truncating the buffers to the original offsets | ||
let metadata_offset = self.metadata_offset; | ||
let value_offset = self.value_offset; | ||
|
||
// get the buffers back from the variant builder | ||
let (mut metadata_buffer, mut value_buffer) = | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Cool, we can transfer the ownership with this way. |
||
std::mem::take(&mut self.variant_builder).into_buffers(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Note this now calls |
||
|
||
// Sanity Check: if the buffers got smaller, something went wrong (previous data was lost) so panic immediately | ||
metadata_buffer | ||
.len() | ||
.checked_sub(metadata_offset) | ||
.expect("metadata length decreased unexpectedly"); | ||
value_buffer | ||
.len() | ||
.checked_sub(value_offset) | ||
.expect("value length decreased unexpectedly"); | ||
|
||
// Note this truncate is fast because truncate doesn't free any memory: | ||
// it just has to drop elements (and u8 doesn't have a destructor) | ||
metadata_buffer.truncate(metadata_offset); | ||
value_buffer.truncate(value_offset); | ||
|
||
// TODO: Return a Variant builder that will write to the underlying buffers (TODO) | ||
// put the buffers back into the array builder | ||
self.array_builder.metadata_buffer = metadata_buffer; | ||
self.array_builder.value_buffer = value_buffer; | ||
} | ||
} | ||
|
||
fn binary_view_array_from_buffers( | ||
|
@@ -220,4 +387,91 @@ mod test { | |
); | ||
} | ||
} | ||
|
||
/// Test using sub builders to append variants | ||
#[test] | ||
fn test_variant_array_builder_variant_builder() { | ||
let mut builder = VariantArrayBuilder::new(10); | ||
builder.append_null(); // should not panic | ||
builder.append_variant(Variant::from(42i32)); | ||
|
||
// let's make a sub-object in the next row | ||
let mut sub_builder = builder.variant_builder(); | ||
sub_builder | ||
.new_object() | ||
.with_field("foo", "bar") | ||
.finish() | ||
.unwrap(); | ||
sub_builder.finish(); // must call finish to write the variant to the buffers | ||
|
||
// append a new list | ||
let mut sub_builder = builder.variant_builder(); | ||
sub_builder | ||
.new_list() | ||
.with_value(Variant::from(1i32)) | ||
.with_value(Variant::from(2i32)) | ||
.finish(); | ||
sub_builder.finish(); | ||
let variant_array = builder.build(); | ||
|
||
assert_eq!(variant_array.len(), 4); | ||
assert!(variant_array.is_null(0)); | ||
assert!(!variant_array.is_null(1)); | ||
assert_eq!(variant_array.value(1), Variant::from(42i32)); | ||
assert!(!variant_array.is_null(2)); | ||
let variant = variant_array.value(2); | ||
let variant = variant.as_object().expect("variant to be an object"); | ||
assert_eq!(variant.get("foo").unwrap(), Variant::from("bar")); | ||
assert!(!variant_array.is_null(3)); | ||
let variant = variant_array.value(3); | ||
let list = variant.as_list().expect("variant to be a list"); | ||
assert_eq!(list.len(), 2); | ||
} | ||
|
||
/// Test using non-finished sub builders to append variants | ||
#[test] | ||
fn test_variant_array_builder_variant_builder_reset() { | ||
let mut builder = VariantArrayBuilder::new(10); | ||
|
||
// make a sub-object in the first row | ||
let mut sub_builder = builder.variant_builder(); | ||
sub_builder | ||
.new_object() | ||
.with_field("foo", 1i32) | ||
.finish() | ||
.unwrap(); | ||
sub_builder.finish(); // must call finish to write the variant to the buffers | ||
|
||
// start appending an object but don't finish | ||
let mut sub_builder = builder.variant_builder(); | ||
sub_builder | ||
.new_object() | ||
.with_field("bar", 2i32) | ||
.finish() | ||
.unwrap(); | ||
drop(sub_builder); // drop the sub builder without finishing it | ||
|
||
// make a third sub-object (this should reset the previous unfinished object) | ||
let mut sub_builder = builder.variant_builder(); | ||
sub_builder | ||
.new_object() | ||
.with_field("baz", 3i32) | ||
.finish() | ||
.unwrap(); | ||
sub_builder.finish(); // must call finish to write the variant to the buffers | ||
|
||
let variant_array = builder.build(); | ||
|
||
// only the two finished objects should be present | ||
assert_eq!(variant_array.len(), 2); | ||
assert!(!variant_array.is_null(0)); | ||
let variant = variant_array.value(0); | ||
let variant = variant.as_object().expect("variant to be an object"); | ||
assert_eq!(variant.get("foo").unwrap(), Variant::from(1i32)); | ||
|
||
assert!(!variant_array.is_null(1)); | ||
let variant = variant_array.value(1); | ||
let variant = variant.as_object().expect("variant to be an object"); | ||
assert_eq!(variant.get("baz").unwrap(), Variant::from(3i32)); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The whole point if this PR is to avoid this copy / append