Skip to content

Convert JSON to VariantArray without copying (8 - 32% faster) #7911

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Jul 23, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions parquet-variant-compute/src/from_json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
use crate::{VariantArray, VariantArrayBuilder};
use arrow::array::{Array, ArrayRef, StringArray};
use arrow_schema::ArrowError;
use parquet_variant::VariantBuilder;
use parquet_variant_json::json_to_variant;

/// Parse a batch of JSON strings into a batch of Variants represented as
Expand All @@ -41,10 +40,10 @@ pub fn batch_json_string_to_variant(input: &ArrayRef) -> Result<VariantArray, Ar
// The subfields are expected to be non-nullable according to the parquet variant spec.
variant_array_builder.append_null();
} else {
let mut vb = VariantBuilder::new();
let mut vb = variant_array_builder.variant_builder();
// parse JSON directly to the variant builder
json_to_variant(input_string_array.value(i), &mut vb)?;
let (metadata, value) = vb.finish();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The whole point if this PR is to avoid this copy / append

variant_array_builder.append_variant_buffers(&metadata, &value);
vb.finish()
}
}
Ok(variant_array_builder.build())
Expand Down
2 changes: 1 addition & 1 deletion parquet-variant-compute/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ mod variant_array_builder;
pub mod variant_get;

pub use variant_array::VariantArray;
pub use variant_array_builder::VariantArrayBuilder;
pub use variant_array_builder::{VariantArrayBuilder, VariantArrayVariantBuilder};

pub use from_json::batch_json_string_to_variant;
pub use to_json::batch_variant_to_json_string;
316 changes: 285 additions & 31 deletions parquet-variant-compute/src/variant_array_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
use crate::VariantArray;
use arrow::array::{ArrayRef, BinaryViewArray, BinaryViewBuilder, NullBufferBuilder, StructArray};
use arrow_schema::{DataType, Field, Fields};
use parquet_variant::{Variant, VariantBuilder};
use parquet_variant::{ListBuilder, ObjectBuilder, Variant, VariantBuilder, VariantBuilderExt};
use std::sync::Arc;

/// A builder for [`VariantArray`]
Expand All @@ -37,23 +37,21 @@ use std::sync::Arc;
/// ## Example:
/// ```
/// # use arrow::array::Array;
/// # use parquet_variant::{Variant, VariantBuilder};
/// # use parquet_variant::{Variant, VariantBuilder, VariantBuilderExt};
/// # use parquet_variant_compute::VariantArrayBuilder;
/// // Create a new VariantArrayBuilder with a capacity of 100 rows
/// let mut builder = VariantArrayBuilder::new(100);
/// // append variant values
/// builder.append_variant(Variant::from(42));
/// // append a null row
/// // append a null row (note not a Variant::Null)
/// builder.append_null();
/// // append a pre-constructed metadata and value buffers
/// let (metadata, value) = {
/// let mut vb = VariantBuilder::new();
/// let mut obj = vb.new_object();
/// obj.insert("foo", "bar");
/// obj.finish().unwrap();
/// vb.finish()
/// };
/// builder.append_variant_buffers(&metadata, &value);
/// // append an object to the builder
/// let mut vb = builder.variant_builder();
/// vb.new_object()
/// .with_field("foo", "bar")
/// .finish()
/// .unwrap();
/// vb.finish(); // must call finish to write the variant to the buffers
///
/// // create the final VariantArray
/// let variant_array = builder.build();
Expand All @@ -66,7 +64,9 @@ use std::sync::Arc;
/// assert!(variant_array.is_null(1));
/// // row 2 is not null and is an object
/// assert!(!variant_array.is_null(2));
/// assert!(variant_array.value(2).as_object().is_some());
/// let value = variant_array.value(2);
/// let obj = value.as_object().expect("expected object");
/// assert_eq!(obj.get("foo"), Some(Variant::from("bar")));
/// ```
#[derive(Debug)]
pub struct VariantArrayBuilder {
Expand Down Expand Up @@ -147,28 +147,195 @@ impl VariantArrayBuilder {

/// Append the [`Variant`] to the builder as the next row
pub fn append_variant(&mut self, variant: Variant) {
// TODO make this more efficient by avoiding the intermediate buffers
let mut variant_builder = VariantBuilder::new();
variant_builder.append_value(variant);
let (metadata, value) = variant_builder.finish();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The whole point of this PR is to avoid this copy here and instead write directly into the output

self.append_variant_buffers(&metadata, &value);
let mut direct_builder = self.variant_builder();
direct_builder.variant_builder.append_value(variant);
direct_builder.finish()
}

/// Append a metadata and values buffer to the builder
pub fn append_variant_buffers(&mut self, metadata: &[u8], value: &[u8]) {
self.nulls.append_non_null();
let metadata_length = metadata.len();
let metadata_offset = self.metadata_buffer.len();
self.metadata_locations
.push((metadata_offset, metadata_length));
self.metadata_buffer.extend_from_slice(metadata);
let value_length = value.len();
let value_offset = self.value_buffer.len();
self.value_locations.push((value_offset, value_length));
self.value_buffer.extend_from_slice(value);
/// Return a `VariantArrayVariantBuilder` that writes directly to the
/// buffers of this builder.
///
/// You must call [`VariantArrayVariantBuilder::finish`] to complete the builder
///
/// # Example
/// ```
/// # use parquet_variant::{Variant, VariantBuilder, VariantBuilderExt};
/// # use parquet_variant_compute::{VariantArray, VariantArrayBuilder};
/// let mut array_builder = VariantArrayBuilder::new(10);
///
/// // First row has a string
/// let mut variant_builder = array_builder.variant_builder();
/// variant_builder.append_value("Hello, World!");
/// // must call finish to write the variant to the buffers
/// variant_builder.finish();
///
/// // Second row is an object
/// let mut variant_builder = array_builder.variant_builder();
/// variant_builder
/// .new_object()
/// .with_field("my_field", 42i64)
/// .finish()
/// .unwrap();
/// variant_builder.finish();
///
/// // finalize the array
/// let variant_array: VariantArray = array_builder.build();
///
/// // verify what we wrote is still there
/// assert_eq!(variant_array.value(0), Variant::from("Hello, World!"));
/// assert!(variant_array.value(1).as_object().is_some());
/// ```
pub fn variant_builder(&mut self) -> VariantArrayVariantBuilder {
// append directly into the metadata and value buffers
let metadata_buffer = std::mem::take(&mut self.metadata_buffer);
let value_buffer = std::mem::take(&mut self.value_buffer);
VariantArrayVariantBuilder::new(self, metadata_buffer, value_buffer)
}
}

/// A `VariantBuilderExt` that writes directly to the buffers of a `VariantArrayBuilder`.
///
// This struct implements [`VariantBuilderExt`], so in most cases it can be used as a
// [`VariantBuilder`] to perform variant-related operations for [`VariantArrayBuilder`].
///
/// If [`Self::finish`] is not called, any changes will be rolled back
///
/// See [`VariantArrayBuilder::variant_builder`] for an example
pub struct VariantArrayVariantBuilder<'a> {
/// was finish called?
finished: bool,
/// starting offset in the variant_builder's `metadata` buffer
metadata_offset: usize,
/// starting offset in the variant_builder's `value` buffer
value_offset: usize,
/// Parent array builder that this variant builder writes to. Buffers
/// have been moved into the variant builder, and must be returned on
/// drop
array_builder: &'a mut VariantArrayBuilder,
/// Builder for the in progress variant value, temporarily owns the buffers
/// from `array_builder`
variant_builder: VariantBuilder,
}

impl<'a> VariantBuilderExt for VariantArrayVariantBuilder<'a> {
fn append_value<'m, 'v>(&mut self, value: impl Into<Variant<'m, 'v>>) {
self.variant_builder.append_value(value);
}

fn new_list(&mut self) -> ListBuilder {
self.variant_builder.new_list()
}

fn new_object(&mut self) -> ObjectBuilder {
self.variant_builder.new_object()
}
}

impl<'a> VariantArrayVariantBuilder<'a> {
/// Constructs a new VariantArrayVariantBuilder
///
/// Note this is not public as this is a structure that is logically
/// part of the [`VariantArrayBuilder`] and relies on its internal structure
fn new(
array_builder: &'a mut VariantArrayBuilder,
metadata_buffer: Vec<u8>,
value_buffer: Vec<u8>,
) -> Self {
let metadata_offset = metadata_buffer.len();
let value_offset = value_buffer.len();
VariantArrayVariantBuilder {
finished: false,
metadata_offset,
value_offset,
variant_builder: VariantBuilder::new_with_buffers(metadata_buffer, value_buffer),
array_builder,
}
}

/// Return a reference to the underlying `VariantBuilder`
pub fn inner(&self) -> &VariantBuilder {
&self.variant_builder
}

/// Return a mutable reference to the underlying `VariantBuilder`
pub fn inner_mut(&mut self) -> &mut VariantBuilder {
&mut self.variant_builder
}

/// Called to finish the in progress variant and write it to the underlying
/// buffers
///
/// Note if you do not call finish, on drop any changes made to the
/// underlying buffers will be rolled back.
pub fn finish(mut self) {
self.finished = true;

let metadata_offset = self.metadata_offset;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The code for finishing / finalizing is moved to finish()

let value_offset = self.value_offset;
// get the buffers back from the variant builder
let (metadata_buffer, value_buffer) = std::mem::take(&mut self.variant_builder).finish();

// Sanity Check: if the buffers got smaller, something went wrong (previous data was lost)
let metadata_len = metadata_buffer
.len()
.checked_sub(metadata_offset)
.expect("metadata length decreased unexpectedly");
let value_len = value_buffer
.len()
.checked_sub(value_offset)
.expect("value length decreased unexpectedly");

// commit the changes by putting the
// offsets and lengths into the parent array builder.
self.array_builder
.metadata_locations
.push((metadata_offset, metadata_len));
self.array_builder
.value_locations
.push((value_offset, value_len));
self.array_builder.nulls.append_non_null();
// put the buffers back into the array builder
self.array_builder.metadata_buffer = metadata_buffer;
self.array_builder.value_buffer = value_buffer;
}
}

impl<'a> Drop for VariantArrayVariantBuilder<'a> {
/// If the builder was not finished, roll back any changes made to the
/// underlying buffers (by truncating them)
fn drop(&mut self) {
Copy link
Contributor

@scovich scovich Jul 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I really like this approach. I was thinking over the weekend that we may want to rework the other builders to follow a similar approach:

  • They can truncate the metadata dictionary on rollback, which would eliminate the false allocations that survive a rollback today
  • We can allocate the value bytes directly in the base buffer (instead of using a separate Vec)
    • On rollback, just truncate (like here)
    • On success, use Vec::splice to insert value offset and field id arrays, which slides over all the other bytes
  • Once we're using splice, it opens the door to pre-allocate the space for the value offset and field arrays, in case the caller knows how many fields or array elements there are.
    • If the prediction was correct, splice just replaces the pre-allocated space.
    • If incorrect, the pre-allocation is wasted (but we're no worse off than before -- the bytes just inject in)
    • The main complication would be guessing how many bytes to encode each offset with.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They can truncate the metadata dictionary on rollback, which would eliminate the false allocations that survive a rollback today

That is an excellent point

We can allocate the value bytes directly in the base buffer (instead of using a separate Vec)

That sounds like a great way to avoid the extra allocation

Once we're using splice, it opens the door to pre-allocate the space for the value offset and field arrays, in case the caller knows how many fields or array elements there are.

This is also a great idea 🤯

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As a follow up, @klion26 has a PR up to implement this:

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That other PR is nice improvement, but the splice call still shifts bytes.

In order to not shift bytes at all, we'd have to pre-allocate exactly the right number of header bytes before recursing into the field values. And then the splice call would just replace the zero-filled header region with the actual header bytes, after they're known (shifting bytes only if the original guess was incorrect).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the only way to do this is add some API in the ObjectBuilder somehow to pre-allocate this space (new_object_with_capacity() perhaps 🤔 )

if self.finished {
return;
}

// if the object was not finished, need to rollback any changes by
// truncating the buffers to the original offsets
let metadata_offset = self.metadata_offset;
let value_offset = self.value_offset;

// get the buffers back from the variant builder
let (mut metadata_buffer, mut value_buffer) =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool, we can transfer the ownership with this way.

std::mem::take(&mut self.variant_builder).into_buffers();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note this now calls into_buffers to get ownership of the buffers back but doesn't call finish


// Sanity Check: if the buffers got smaller, something went wrong (previous data was lost) so panic immediately
metadata_buffer
.len()
.checked_sub(metadata_offset)
.expect("metadata length decreased unexpectedly");
value_buffer
.len()
.checked_sub(value_offset)
.expect("value length decreased unexpectedly");

// Note this truncate is fast because truncate doesn't free any memory:
// it just has to drop elements (and u8 doesn't have a destructor)
metadata_buffer.truncate(metadata_offset);
value_buffer.truncate(value_offset);

// TODO: Return a Variant builder that will write to the underlying buffers (TODO)
// put the buffers back into the array builder
self.array_builder.metadata_buffer = metadata_buffer;
self.array_builder.value_buffer = value_buffer;
}
}

fn binary_view_array_from_buffers(
Expand Down Expand Up @@ -220,4 +387,91 @@ mod test {
);
}
}

/// Test using sub builders to append variants
#[test]
fn test_variant_array_builder_variant_builder() {
let mut builder = VariantArrayBuilder::new(10);
builder.append_null(); // should not panic
builder.append_variant(Variant::from(42i32));

// let's make a sub-object in the next row
let mut sub_builder = builder.variant_builder();
sub_builder
.new_object()
.with_field("foo", "bar")
.finish()
.unwrap();
sub_builder.finish(); // must call finish to write the variant to the buffers

// append a new list
let mut sub_builder = builder.variant_builder();
sub_builder
.new_list()
.with_value(Variant::from(1i32))
.with_value(Variant::from(2i32))
.finish();
sub_builder.finish();
let variant_array = builder.build();

assert_eq!(variant_array.len(), 4);
assert!(variant_array.is_null(0));
assert!(!variant_array.is_null(1));
assert_eq!(variant_array.value(1), Variant::from(42i32));
assert!(!variant_array.is_null(2));
let variant = variant_array.value(2);
let variant = variant.as_object().expect("variant to be an object");
assert_eq!(variant.get("foo").unwrap(), Variant::from("bar"));
assert!(!variant_array.is_null(3));
let variant = variant_array.value(3);
let list = variant.as_list().expect("variant to be a list");
assert_eq!(list.len(), 2);
}

/// Test using non-finished sub builders to append variants
#[test]
fn test_variant_array_builder_variant_builder_reset() {
let mut builder = VariantArrayBuilder::new(10);

// make a sub-object in the first row
let mut sub_builder = builder.variant_builder();
sub_builder
.new_object()
.with_field("foo", 1i32)
.finish()
.unwrap();
sub_builder.finish(); // must call finish to write the variant to the buffers

// start appending an object but don't finish
let mut sub_builder = builder.variant_builder();
sub_builder
.new_object()
.with_field("bar", 2i32)
.finish()
.unwrap();
drop(sub_builder); // drop the sub builder without finishing it

// make a third sub-object (this should reset the previous unfinished object)
let mut sub_builder = builder.variant_builder();
sub_builder
.new_object()
.with_field("baz", 3i32)
.finish()
.unwrap();
sub_builder.finish(); // must call finish to write the variant to the buffers

let variant_array = builder.build();

// only the two finished objects should be present
assert_eq!(variant_array.len(), 2);
assert!(!variant_array.is_null(0));
let variant = variant_array.value(0);
let variant = variant.as_object().expect("variant to be an object");
assert_eq!(variant.get("foo").unwrap(), Variant::from(1i32));

assert!(!variant_array.is_null(1));
let variant = variant_array.value(1);
let variant = variant.as_object().expect("variant to be an object");
assert_eq!(variant.get("baz").unwrap(), Variant::from(3i32));
}
}
Loading
Loading