Skip to content

Commit c4b2874

Browse files
committed
Avoid a copy in VariantArrayBuilder
1 parent 86913cf commit c4b2874

File tree

2 files changed

+105
-11
lines changed

2 files changed

+105
-11
lines changed

parquet-variant-compute/src/from_json.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
use crate::{VariantArray, VariantArrayBuilder};
2222
use arrow::array::{Array, ArrayRef, StringArray};
2323
use arrow_schema::ArrowError;
24-
use parquet_variant::VariantBuilder;
2524
use parquet_variant_json::json_to_variant;
2625

2726
/// Parse a batch of JSON strings into a batch of Variants represented as
@@ -41,10 +40,9 @@ pub fn batch_json_string_to_variant(input: &ArrayRef) -> Result<VariantArray, Ar
4140
// The subfields are expected to be non-nullable according to the parquet variant spec.
4241
variant_array_builder.append_null();
4342
} else {
44-
let mut vb = VariantBuilder::new();
45-
json_to_variant(input_string_array.value(i), &mut vb)?;
46-
let (metadata, value) = vb.finish();
47-
variant_array_builder.append_variant_buffers(&metadata, &value);
43+
let mut vb = variant_array_builder.variant_builder();
44+
json_to_variant(input_string_array.value(i), vb.inner_mut())?;
45+
vb.finish()
4846
}
4947
}
5048
Ok(variant_array_builder.build())

parquet-variant-compute/src/variant_array_builder.rs

Lines changed: 102 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -147,11 +147,9 @@ impl VariantArrayBuilder {
147147

148148
/// Append the [`Variant`] to the builder as the next row
149149
pub fn append_variant(&mut self, variant: Variant) {
150-
// TODO make this more efficient by avoiding the intermediate buffers
151-
let mut variant_builder = VariantBuilder::new();
152-
variant_builder.append_value(variant);
153-
let (metadata, value) = variant_builder.finish();
154-
self.append_variant_buffers(&metadata, &value);
150+
let mut direct_builder = self.variant_builder();
151+
direct_builder.variant_builder.append_value(variant);
152+
direct_builder.finish()
155153
}
156154

157155
/// Append a metadata and values buffer to the builder
@@ -168,7 +166,105 @@ impl VariantArrayBuilder {
168166
self.value_buffer.extend_from_slice(value);
169167
}
170168

171-
// TODO: Return a Variant builder that will write to the underlying buffers (TODO)
169+
/// Return a DirectVariantBuilder that writes directly to the buffers of this builder.
170+
pub fn variant_builder(&mut self) -> DirectVariantBuilder {
171+
// append directly into the metadata and value buffers
172+
let metadata_buffer = std::mem::take(&mut self.metadata_buffer);
173+
let value_buffer = std::mem::take(&mut self.value_buffer);
174+
let metadata_offset = metadata_buffer.len();
175+
let value_offset = value_buffer.len();
176+
DirectVariantBuilder {
177+
finished: false,
178+
metadata_offset,
179+
value_offset,
180+
variant_builder: VariantBuilder::new_with_buffers(metadata_buffer, value_buffer),
181+
array_builder: self,
182+
}
183+
}
184+
}
185+
186+
/// A `VariantBuilder` that writes directly to the buffers of a `VariantArrayBuilder`.
187+
///
188+
/// See [`VariantArray::variant_builder`] for an example
189+
pub struct DirectVariantBuilder<'a> {
190+
/// was finish called?
191+
finished: bool,
192+
/// starting metadata offset
193+
metadata_offset: usize,
194+
/// starting value offset
195+
value_offset: usize,
196+
array_builder: &'a mut VariantArrayBuilder,
197+
variant_builder: VariantBuilder,
198+
}
199+
200+
impl DirectVariantBuilder<'_> {
201+
/// Return a reference to the underlying `VariantBuilder`
202+
pub fn inner(&self) -> &VariantBuilder {
203+
&self.variant_builder
204+
}
205+
206+
/// Return a mutable reference to the underlying `VariantBuilder`
207+
pub fn inner_mut(&mut self) -> &mut VariantBuilder {
208+
&mut self.variant_builder
209+
}
210+
211+
/// Called to finalize the variant and write it to the underlying buffers
212+
///
213+
/// Note if you do not call finish, the struct will be reset and the buffers
214+
/// will not be updated.
215+
///
216+
pub fn finish(mut self) {
217+
let metadata_offset = self.metadata_offset;
218+
let value_offset = self.value_offset;
219+
220+
// get the buffers back
221+
let (metadata_buffer, value_buffer) = std::mem::take(&mut self.variant_builder).finish();
222+
let metadata_len = metadata_buffer
223+
.len()
224+
.checked_sub(metadata_offset)
225+
.expect("metadata length decreased unexpectedly");
226+
let value_len = value_buffer
227+
.len()
228+
.checked_sub(value_offset)
229+
.expect("value length decreased unexpectedly");
230+
231+
// put the buffers back
232+
self.array_builder.metadata_buffer = metadata_buffer;
233+
self.array_builder.value_buffer = value_buffer;
234+
235+
// Append offsets and lengths for new nulls into the array builder
236+
self.array_builder
237+
.metadata_locations
238+
.push((metadata_offset, metadata_len));
239+
self.array_builder
240+
.value_locations
241+
.push((value_offset, value_len));
242+
self.array_builder.nulls.append_non_null();
243+
self.finished = true;
244+
}
245+
}
246+
247+
impl<'a> Drop for DirectVariantBuilder<'a> {
248+
fn drop(&mut self) {
249+
if self.finished {
250+
// if the object was finished, we do not need to do anything
251+
return;
252+
}
253+
// if the object was not finished, we need to reset any partial state put the buffers back
254+
println!("DirectVariantBuilder::drop");
255+
let variant_builder = std::mem::take(&mut self.variant_builder);
256+
let (mut metadata_buffer, mut value_buffer) = variant_builder.finish();
257+
assert!(
258+
metadata_buffer.len() >= self.metadata_offset,
259+
"metadata got smaller"
260+
);
261+
assert!(value_buffer.len() >= self.value_offset, "value got smaller");
262+
metadata_buffer.truncate(self.metadata_offset);
263+
value_buffer.truncate(self.value_offset);
264+
// put the buffers back
265+
self.array_builder.metadata_buffer = metadata_buffer;
266+
self.array_builder.value_buffer = value_buffer;
267+
}
172268
}
173269

174270
fn binary_view_array_from_buffers(

0 commit comments

Comments
 (0)