Skip to content

Commit 50f5562

Browse files
alambklion26viirya
authored
Convert JSON to VariantArray without copying (8 - 32% faster) (#7911)
# Which issue does this PR close? - Part of #6736 - Closes #7964 - Follow on to #7905 # Rationale for this change In a quest to have the fastest and most efficient Variant implementation, I would like to avoid copies if at all possible. Right now, making a VariantArray requires first completing an individual buffer and then appending (copying) it to the array. Let's make that faster by having the VariantBuilder append directly into the array's buffers. # What changes are included in this PR? 1. Add `VariantBuilder::new_with_buffers` 2. Add a `VariantArrayBuilder::variant_builder` that reuses the buffers # Are these changes tested? 1. New unit tests 2. Yes, by existing tests # Are there any user-facing changes? Hopefully faster performance --------- Co-authored-by: Congxian Qiu <qcx978132955@gmail.com> Co-authored-by: Liang-Chi Hsieh <viirya@gmail.com>
1 parent ec81db3 commit 50f5562

File tree

5 files changed

+327
-54
lines changed

5 files changed

+327
-54
lines changed

parquet-variant-compute/src/from_json.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
use crate::{VariantArray, VariantArrayBuilder};
2222
use arrow::array::{Array, ArrayRef, StringArray};
2323
use arrow_schema::ArrowError;
24-
use parquet_variant::VariantBuilder;
2524
use parquet_variant_json::json_to_variant;
2625

2726
/// Parse a batch of JSON strings into a batch of Variants represented as
@@ -41,10 +40,10 @@ pub fn batch_json_string_to_variant(input: &ArrayRef) -> Result<VariantArray, Ar
4140
// The subfields are expected to be non-nullable according to the parquet variant spec.
4241
variant_array_builder.append_null();
4342
} else {
44-
let mut vb = VariantBuilder::new();
43+
let mut vb = variant_array_builder.variant_builder();
44+
// parse JSON directly to the variant builder
4545
json_to_variant(input_string_array.value(i), &mut vb)?;
46-
let (metadata, value) = vb.finish();
47-
variant_array_builder.append_variant_buffers(&metadata, &value);
46+
vb.finish()
4847
}
4948
}
5049
Ok(variant_array_builder.build())

parquet-variant-compute/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ mod variant_array_builder;
2222
pub mod variant_get;
2323

2424
pub use variant_array::VariantArray;
25-
pub use variant_array_builder::VariantArrayBuilder;
25+
pub use variant_array_builder::{VariantArrayBuilder, VariantArrayVariantBuilder};
2626

2727
pub use from_json::batch_json_string_to_variant;
2828
pub use to_json::batch_variant_to_json_string;

parquet-variant-compute/src/variant_array_builder.rs

Lines changed: 285 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
use crate::VariantArray;
2121
use arrow::array::{ArrayRef, BinaryViewArray, BinaryViewBuilder, NullBufferBuilder, StructArray};
2222
use arrow_schema::{DataType, Field, Fields};
23-
use parquet_variant::{Variant, VariantBuilder};
23+
use parquet_variant::{ListBuilder, ObjectBuilder, Variant, VariantBuilder, VariantBuilderExt};
2424
use std::sync::Arc;
2525

2626
/// A builder for [`VariantArray`]
@@ -37,23 +37,21 @@ use std::sync::Arc;
3737
/// ## Example:
3838
/// ```
3939
/// # use arrow::array::Array;
40-
/// # use parquet_variant::{Variant, VariantBuilder};
40+
/// # use parquet_variant::{Variant, VariantBuilder, VariantBuilderExt};
4141
/// # use parquet_variant_compute::VariantArrayBuilder;
4242
/// // Create a new VariantArrayBuilder with a capacity of 100 rows
4343
/// let mut builder = VariantArrayBuilder::new(100);
4444
/// // append variant values
4545
/// builder.append_variant(Variant::from(42));
46-
/// // append a null row
46+
/// // append a null row (note: this is a null row, not a `Variant::Null` value)
4747
/// builder.append_null();
48-
/// // append a pre-constructed metadata and value buffers
49-
/// let (metadata, value) = {
50-
/// let mut vb = VariantBuilder::new();
51-
/// let mut obj = vb.new_object();
52-
/// obj.insert("foo", "bar");
53-
/// obj.finish().unwrap();
54-
/// vb.finish()
55-
/// };
56-
/// builder.append_variant_buffers(&metadata, &value);
48+
/// // append an object to the builder
49+
/// let mut vb = builder.variant_builder();
50+
/// vb.new_object()
51+
/// .with_field("foo", "bar")
52+
/// .finish()
53+
/// .unwrap();
54+
/// vb.finish(); // must call finish to write the variant to the buffers
5755
///
5856
/// // create the final VariantArray
5957
/// let variant_array = builder.build();
@@ -66,7 +64,9 @@ use std::sync::Arc;
6664
/// assert!(variant_array.is_null(1));
6765
/// // row 2 is not null and is an object
6866
/// assert!(!variant_array.is_null(2));
69-
/// assert!(variant_array.value(2).as_object().is_some());
67+
/// let value = variant_array.value(2);
68+
/// let obj = value.as_object().expect("expected object");
69+
/// assert_eq!(obj.get("foo"), Some(Variant::from("bar")));
7070
/// ```
7171
#[derive(Debug)]
7272
pub struct VariantArrayBuilder {
@@ -147,28 +147,195 @@ impl VariantArrayBuilder {
147147

148148
/// Append the [`Variant`] to the builder as the next row
149149
pub fn append_variant(&mut self, variant: Variant) {
150-
// TODO make this more efficient by avoiding the intermediate buffers
151-
let mut variant_builder = VariantBuilder::new();
152-
variant_builder.append_value(variant);
153-
let (metadata, value) = variant_builder.finish();
154-
self.append_variant_buffers(&metadata, &value);
150+
let mut direct_builder = self.variant_builder();
151+
direct_builder.variant_builder.append_value(variant);
152+
direct_builder.finish()
155153
}
156154

157-
/// Append a metadata and values buffer to the builder
158-
pub fn append_variant_buffers(&mut self, metadata: &[u8], value: &[u8]) {
159-
self.nulls.append_non_null();
160-
let metadata_length = metadata.len();
161-
let metadata_offset = self.metadata_buffer.len();
162-
self.metadata_locations
163-
.push((metadata_offset, metadata_length));
164-
self.metadata_buffer.extend_from_slice(metadata);
165-
let value_length = value.len();
166-
let value_offset = self.value_buffer.len();
167-
self.value_locations.push((value_offset, value_length));
168-
self.value_buffer.extend_from_slice(value);
155+
/// Return a `VariantArrayVariantBuilder` that writes directly to the
156+
/// buffers of this builder.
157+
///
158+
/// You must call [`VariantArrayVariantBuilder::finish`] to complete the builder
159+
///
160+
/// # Example
161+
/// ```
162+
/// # use parquet_variant::{Variant, VariantBuilder, VariantBuilderExt};
163+
/// # use parquet_variant_compute::{VariantArray, VariantArrayBuilder};
164+
/// let mut array_builder = VariantArrayBuilder::new(10);
165+
///
166+
/// // First row has a string
167+
/// let mut variant_builder = array_builder.variant_builder();
168+
/// variant_builder.append_value("Hello, World!");
169+
/// // must call finish to write the variant to the buffers
170+
/// variant_builder.finish();
171+
///
172+
/// // Second row is an object
173+
/// let mut variant_builder = array_builder.variant_builder();
174+
/// variant_builder
175+
/// .new_object()
176+
/// .with_field("my_field", 42i64)
177+
/// .finish()
178+
/// .unwrap();
179+
/// variant_builder.finish();
180+
///
181+
/// // finalize the array
182+
/// let variant_array: VariantArray = array_builder.build();
183+
///
184+
/// // verify what we wrote is still there
185+
/// assert_eq!(variant_array.value(0), Variant::from("Hello, World!"));
186+
/// assert!(variant_array.value(1).as_object().is_some());
187+
/// ```
188+
pub fn variant_builder(&mut self) -> VariantArrayVariantBuilder {
189+
// append directly into the metadata and value buffers
190+
let metadata_buffer = std::mem::take(&mut self.metadata_buffer);
191+
let value_buffer = std::mem::take(&mut self.value_buffer);
192+
VariantArrayVariantBuilder::new(self, metadata_buffer, value_buffer)
193+
}
194+
}
195+
196+
/// A `VariantBuilderExt` that writes directly to the buffers of a `VariantArrayBuilder`.
197+
///
198+
/// This struct implements [`VariantBuilderExt`], so in most cases it can be used as a
199+
/// [`VariantBuilder`] to perform variant-related operations for [`VariantArrayBuilder`].
200+
///
201+
/// If [`Self::finish`] is not called, any changes will be rolled back
202+
///
203+
/// See [`VariantArrayBuilder::variant_builder`] for an example
204+
pub struct VariantArrayVariantBuilder<'a> {
205+
/// was finish called?
206+
finished: bool,
207+
/// starting offset in the variant_builder's `metadata` buffer
208+
metadata_offset: usize,
209+
/// starting offset in the variant_builder's `value` buffer
210+
value_offset: usize,
211+
/// Parent array builder that this variant builder writes to. Buffers
212+
/// have been moved into the variant builder, and must be returned on
213+
/// drop
214+
array_builder: &'a mut VariantArrayBuilder,
215+
/// Builder for the in progress variant value, temporarily owns the buffers
216+
/// from `array_builder`
217+
variant_builder: VariantBuilder,
218+
}
219+
220+
impl<'a> VariantBuilderExt for VariantArrayVariantBuilder<'a> {
221+
fn append_value<'m, 'v>(&mut self, value: impl Into<Variant<'m, 'v>>) {
222+
self.variant_builder.append_value(value);
223+
}
224+
225+
fn new_list(&mut self) -> ListBuilder {
226+
self.variant_builder.new_list()
227+
}
228+
229+
fn new_object(&mut self) -> ObjectBuilder {
230+
self.variant_builder.new_object()
231+
}
232+
}
233+
234+
impl<'a> VariantArrayVariantBuilder<'a> {
235+
/// Constructs a new VariantArrayVariantBuilder
236+
///
237+
/// Note this is not public as this is a structure that is logically
238+
/// part of the [`VariantArrayBuilder`] and relies on its internal structure
239+
fn new(
240+
array_builder: &'a mut VariantArrayBuilder,
241+
metadata_buffer: Vec<u8>,
242+
value_buffer: Vec<u8>,
243+
) -> Self {
244+
let metadata_offset = metadata_buffer.len();
245+
let value_offset = value_buffer.len();
246+
VariantArrayVariantBuilder {
247+
finished: false,
248+
metadata_offset,
249+
value_offset,
250+
variant_builder: VariantBuilder::new_with_buffers(metadata_buffer, value_buffer),
251+
array_builder,
252+
}
253+
}
254+
255+
/// Return a reference to the underlying `VariantBuilder`
256+
pub fn inner(&self) -> &VariantBuilder {
257+
&self.variant_builder
258+
}
259+
260+
/// Return a mutable reference to the underlying `VariantBuilder`
261+
pub fn inner_mut(&mut self) -> &mut VariantBuilder {
262+
&mut self.variant_builder
263+
}
264+
265+
/// Called to finish the in progress variant and write it to the underlying
266+
/// buffers
267+
///
268+
/// Note if you do not call finish, on drop any changes made to the
269+
/// underlying buffers will be rolled back.
270+
pub fn finish(mut self) {
271+
self.finished = true;
272+
273+
let metadata_offset = self.metadata_offset;
274+
let value_offset = self.value_offset;
275+
// get the buffers back from the variant builder
276+
let (metadata_buffer, value_buffer) = std::mem::take(&mut self.variant_builder).finish();
277+
278+
// Sanity Check: if the buffers got smaller, something went wrong (previous data was lost)
279+
let metadata_len = metadata_buffer
280+
.len()
281+
.checked_sub(metadata_offset)
282+
.expect("metadata length decreased unexpectedly");
283+
let value_len = value_buffer
284+
.len()
285+
.checked_sub(value_offset)
286+
.expect("value length decreased unexpectedly");
287+
288+
// commit the changes by putting the
289+
// offsets and lengths into the parent array builder.
290+
self.array_builder
291+
.metadata_locations
292+
.push((metadata_offset, metadata_len));
293+
self.array_builder
294+
.value_locations
295+
.push((value_offset, value_len));
296+
self.array_builder.nulls.append_non_null();
297+
// put the buffers back into the array builder
298+
self.array_builder.metadata_buffer = metadata_buffer;
299+
self.array_builder.value_buffer = value_buffer;
169300
}
301+
}
302+
303+
impl<'a> Drop for VariantArrayVariantBuilder<'a> {
304+
/// If the builder was not finished, roll back any changes made to the
305+
/// underlying buffers (by truncating them)
306+
fn drop(&mut self) {
307+
if self.finished {
308+
return;
309+
}
310+
311+
// if the builder was not finished, roll back any changes by
312+
// truncating the buffers to their original offsets
313+
let metadata_offset = self.metadata_offset;
314+
let value_offset = self.value_offset;
315+
316+
// get the buffers back from the variant builder
317+
let (mut metadata_buffer, mut value_buffer) =
318+
std::mem::take(&mut self.variant_builder).into_buffers();
319+
320+
// Sanity Check: if the buffers got smaller, something went wrong (previous data was lost) so panic immediately
321+
metadata_buffer
322+
.len()
323+
.checked_sub(metadata_offset)
324+
.expect("metadata length decreased unexpectedly");
325+
value_buffer
326+
.len()
327+
.checked_sub(value_offset)
328+
.expect("value length decreased unexpectedly");
329+
330+
// Note this truncate is fast because truncate doesn't free any memory:
331+
// it just has to drop elements (and u8 doesn't have a destructor)
332+
metadata_buffer.truncate(metadata_offset);
333+
value_buffer.truncate(value_offset);
170334

171-
// TODO: Return a Variant builder that will write to the underlying buffers (TODO)
335+
// put the buffers back into the array builder
336+
self.array_builder.metadata_buffer = metadata_buffer;
337+
self.array_builder.value_buffer = value_buffer;
338+
}
172339
}
173340

174341
fn binary_view_array_from_buffers(
@@ -220,4 +387,91 @@ mod test {
220387
);
221388
}
222389
}
390+
391+
/// Test using sub builders to append variants
392+
#[test]
393+
fn test_variant_array_builder_variant_builder() {
394+
let mut builder = VariantArrayBuilder::new(10);
395+
builder.append_null(); // should not panic
396+
builder.append_variant(Variant::from(42i32));
397+
398+
// let's make a sub-object in the next row
399+
let mut sub_builder = builder.variant_builder();
400+
sub_builder
401+
.new_object()
402+
.with_field("foo", "bar")
403+
.finish()
404+
.unwrap();
405+
sub_builder.finish(); // must call finish to write the variant to the buffers
406+
407+
// append a new list
408+
let mut sub_builder = builder.variant_builder();
409+
sub_builder
410+
.new_list()
411+
.with_value(Variant::from(1i32))
412+
.with_value(Variant::from(2i32))
413+
.finish();
414+
sub_builder.finish();
415+
let variant_array = builder.build();
416+
417+
assert_eq!(variant_array.len(), 4);
418+
assert!(variant_array.is_null(0));
419+
assert!(!variant_array.is_null(1));
420+
assert_eq!(variant_array.value(1), Variant::from(42i32));
421+
assert!(!variant_array.is_null(2));
422+
let variant = variant_array.value(2);
423+
let variant = variant.as_object().expect("variant to be an object");
424+
assert_eq!(variant.get("foo").unwrap(), Variant::from("bar"));
425+
assert!(!variant_array.is_null(3));
426+
let variant = variant_array.value(3);
427+
let list = variant.as_list().expect("variant to be a list");
428+
assert_eq!(list.len(), 2);
429+
}
430+
431+
/// Test using non-finished sub builders to append variants
432+
#[test]
433+
fn test_variant_array_builder_variant_builder_reset() {
434+
let mut builder = VariantArrayBuilder::new(10);
435+
436+
// make a sub-object in the first row
437+
let mut sub_builder = builder.variant_builder();
438+
sub_builder
439+
.new_object()
440+
.with_field("foo", 1i32)
441+
.finish()
442+
.unwrap();
443+
sub_builder.finish(); // must call finish to write the variant to the buffers
444+
445+
// start appending an object but don't finish
446+
let mut sub_builder = builder.variant_builder();
447+
sub_builder
448+
.new_object()
449+
.with_field("bar", 2i32)
450+
.finish()
451+
.unwrap();
452+
drop(sub_builder); // drop the sub builder without finishing it
453+
454+
// make a third sub-object (this should reset the previous unfinished object)
455+
let mut sub_builder = builder.variant_builder();
456+
sub_builder
457+
.new_object()
458+
.with_field("baz", 3i32)
459+
.finish()
460+
.unwrap();
461+
sub_builder.finish(); // must call finish to write the variant to the buffers
462+
463+
let variant_array = builder.build();
464+
465+
// only the two finished objects should be present
466+
assert_eq!(variant_array.len(), 2);
467+
assert!(!variant_array.is_null(0));
468+
let variant = variant_array.value(0);
469+
let variant = variant.as_object().expect("variant to be an object");
470+
assert_eq!(variant.get("foo").unwrap(), Variant::from(1i32));
471+
472+
assert!(!variant_array.is_null(1));
473+
let variant = variant_array.value(1);
474+
let variant = variant.as_object().expect("variant to be an object");
475+
assert_eq!(variant.get("baz").unwrap(), Variant::from(3i32));
476+
}
223477
}

0 commit comments

Comments
 (0)