Skip to content

Commit dc5d138

Browse files
committed
Add variant_get compute kernel
In parquet-variant: - Add a new function `Variant::get_path`: this traverses the path to create a new Variant (does not cast any of it). - Add a new module `parquet_variant::path`: adds structs/enums to define a path to access a variant value deeply. In parquet-variant-compute: - Add a new compute kernel `variant_get`: does the path traversal over a `VariantArray`. In the future, this would also cast the values to a specified type. - Includes some basic unit tests. - Includes a simple micro-benchmark for reference. Current limitations: - It can only return another VariantArray. Casts are not implemented yet. - Only top-level object/list access is supported. It panics on finding a nested object/list. Needs apache#7914 to fix this. - Perf is a TODO.
1 parent daf31be commit dc5d138

File tree

7 files changed

+362
-0
lines changed

7 files changed

+362
-0
lines changed

parquet-variant-compute/Cargo.toml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,3 +42,9 @@ name = "parquet_variant_compute"
4242
bench = false
4343

4444
[dev-dependencies]
45+
criterion = { version = "0.6", default-features = false }
46+
rand = { version = "0.9.1" }
47+
48+
[[bench]]
49+
name = "variant_get"
50+
harness = false
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
use std::sync::Arc;
18+
19+
use arrow::array::ArrayRef;
20+
use criterion::{criterion_group, criterion_main, Criterion};
21+
use parquet_variant::{Variant, VariantBuilder};
22+
use parquet_variant_compute::{
23+
variant_get::{variant_get, GetOptions},
24+
VariantArray, VariantArrayBuilder,
25+
};
26+
use rand::{rngs::StdRng, Rng, SeedableRng};
27+
28+
fn create_primitive_variant(size: usize) -> VariantArray {
29+
let mut rng = StdRng::seed_from_u64(42);
30+
31+
let mut variant_builder = VariantArrayBuilder::new(1);
32+
33+
for _ in 0..size {
34+
let mut builder = VariantBuilder::new();
35+
builder.append_value(rng.random::<i64>());
36+
let (metadata, value) = builder.finish();
37+
variant_builder.append_variant(Variant::try_new(&metadata, &value).unwrap());
38+
}
39+
40+
variant_builder.build()
41+
}
42+
43+
pub fn variant_get_bench(c: &mut Criterion) {
44+
let variant_array = create_primitive_variant(8192);
45+
let input: ArrayRef = Arc::new(variant_array);
46+
47+
let options = GetOptions {
48+
path: vec![].into(),
49+
as_type: None,
50+
cast_options: Default::default(),
51+
};
52+
53+
c.bench_function("variant_get_primitive", |b| {
54+
b.iter(|| variant_get(&input.clone(), options.clone()))
55+
});
56+
}
57+
58+
criterion_group!(benches, variant_get_bench);
59+
criterion_main!(benches);

parquet-variant-compute/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ mod from_json;
1919
mod to_json;
2020
mod variant_array;
2121
mod variant_array_builder;
22+
pub mod variant_get;
2223

2324
pub use variant_array::VariantArray;
2425
pub use variant_array_builder::VariantArrayBuilder;
Lines changed: 202 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,202 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
use std::sync::Arc;
18+
19+
use arrow::{
20+
array::{Array, ArrayRef},
21+
compute::CastOptions,
22+
error::Result,
23+
};
24+
use arrow_schema::{ArrowError, Field};
25+
use parquet_variant::path::VariantPath;
26+
27+
use crate::{VariantArray, VariantArrayBuilder};
28+
29+
/// Returns an array with the specified path extracted from the variant values.
30+
///
31+
/// The return array type depends on the `as_type` field of the options parameter
32+
/// 1. `as_type: None`: a VariantArray is returned. The values in this new VariantArray will point
33+
/// to the specified path.
34+
/// 2. `as_type: Some(<specific field>)`: an array of the specified type is returned.
35+
pub fn variant_get(input: &ArrayRef, options: GetOptions) -> Result<ArrayRef> {
36+
let variant_array: &VariantArray = input.as_any().downcast_ref().ok_or_else(|| {
37+
ArrowError::InvalidArgumentError(
38+
"expected a VariantArray as the input for variant_get".to_owned(),
39+
)
40+
})?;
41+
42+
if let Some(as_type) = options.as_type {
43+
return Err(ArrowError::NotYetImplemented(format!(
44+
"getting a {} from a VariantArray is not implemented yet",
45+
as_type
46+
)));
47+
}
48+
49+
let mut builder = VariantArrayBuilder::new(variant_array.len());
50+
for i in 0..variant_array.len() {
51+
let new_variant = variant_array.value(i);
52+
// TODO: perf?
53+
let new_variant = new_variant.get_path(&options.path);
54+
if let Some(new_variant) = new_variant {
55+
// TODO: we're decoding the value and doing a copy into a variant value again. This
56+
// copy can be much smarter.
57+
builder.append_variant(new_variant);
58+
} else {
59+
builder.append_null();
60+
}
61+
}
62+
63+
Ok(Arc::new(builder.build()))
64+
}
65+
66+
/// Controls the action of the variant_get kernel.
67+
#[derive(Debug, Clone)]
68+
pub struct GetOptions<'a> {
69+
/// What path to extract
70+
pub path: VariantPath,
71+
/// if `as_type` is None, the returned array will itself be a VariantArray.
72+
///
73+
/// if `as_type` is `Some(type)` the field is returned as the specified type if possible. To specify returning
74+
/// a Variant, pass a Field with variant type in the metadata.
75+
pub as_type: Option<Field>,
76+
/// Controls the casting behavior (e.g. error vs substituting null on cast error).
77+
pub cast_options: CastOptions<'a>,
78+
}
79+
80+
impl<'a> GetOptions<'a> {
81+
/// Construct options to get the specified path as a variant.
82+
pub fn new_with_path(path: VariantPath) -> Self {
83+
Self {
84+
path,
85+
as_type: None,
86+
cast_options: Default::default(),
87+
}
88+
}
89+
}
90+
91+
#[cfg(test)]
mod test {
    use std::sync::Arc;

    use arrow::array::{Array, ArrayRef, StringArray};
    use parquet_variant::path::{VariantPath, VariantPathElement};

    use crate::batch_json_string_to_variant;
    use crate::VariantArray;

    use super::{variant_get, GetOptions};

    /// Wraps one JSON document in a single-row string array.
    fn json_rows(json: &str) -> ArrayRef {
        Arc::new(StringArray::from(vec![Some(json)]))
    }

    /// Shorthand for a field-access path element.
    fn field(name: &str) -> VariantPathElement {
        VariantPathElement::field(name.to_owned())
    }

    /// Shorthand for a list-index path element.
    fn index(position: usize) -> VariantPathElement {
        VariantPathElement::index(position)
    }

    /// Runs `variant_get` with `path` over the variant parsed from `input_json` and
    /// checks that the single resulting row equals the variant parsed from
    /// `expected_json`.
    fn single_variant_get_test(input_json: &str, path: VariantPath, expected_json: &str) {
        // Input: JSON text -> variant array.
        let input: ArrayRef =
            Arc::new(batch_json_string_to_variant(&json_rows(input_json)).unwrap());

        let output = variant_get(&input, GetOptions::new_with_path(path)).unwrap();

        // Expectation: JSON text -> variant array.
        let expected = batch_json_string_to_variant(&json_rows(expected_json)).unwrap();

        let actual: &VariantArray = output.as_any().downcast_ref().unwrap();
        assert_eq!(actual.len(), 1, "Expected result array to have length 1");
        assert!(
            actual.nulls().is_none(),
            "Expected no nulls in result array"
        );
        assert_eq!(
            actual.value(0),
            expected.value(0),
            "Result variant does not match expected variant"
        );
    }

    #[test]
    fn get_primitive_variant_field() {
        let path = vec![field("some_field")];
        single_variant_get_test(r#"{"some_field": 1234}"#, path.into(), "1234");
    }

    #[test]
    fn get_primitive_variant_list_index() {
        let path = vec![index(0)];
        single_variant_get_test("[1234, 5678]", path.into(), "1234");
    }

    #[test]
    fn get_primitive_variant_inside_object_of_object() {
        let path = vec![field("top_level_field"), field("inner_field")];
        single_variant_get_test(
            r#"{"top_level_field": {"inner_field": 1234}}"#,
            path.into(),
            "1234",
        );
    }

    #[test]
    fn get_primitive_variant_inside_list_of_object() {
        let path = vec![index(0), field("some_field")];
        single_variant_get_test(r#"[{"some_field": 1234}]"#, path.into(), "1234");
    }

    #[test]
    fn get_primitive_variant_inside_object_of_list() {
        let path = vec![field("some_field"), index(0)];
        single_variant_get_test(r#"{"some_field": [1234]}"#, path.into(), "1234");
    }

    // Extracting a nested object is expected to panic for now; this test pins that
    // known limitation.
    #[test]
    #[should_panic(
        expected = "Nested values are handled specially by ObjectBuilder and ListBuilder"
    )]
    fn get_complex_variant() {
        let path = vec![field("top_level_field")];
        single_variant_get_test(
            r#"{"top_level_field": {"inner_field": 1234}}"#,
            path.into(),
            r#"{"inner_field": 1234}"#,
        );
    }
}

parquet-variant/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
3030
mod builder;
3131
mod decoder;
32+
pub mod path;
3233
mod utils;
3334
mod variant;
3435

parquet-variant/src/path.rs

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
use std::ops::Deref;
18+
19+
/// Represents a qualified path to a potential subfield or index of a variant value.
20+
#[derive(Debug, Clone)]
21+
pub struct VariantPath(Vec<VariantPathElement>);
22+
23+
impl VariantPath {
24+
pub fn new(path: Vec<VariantPathElement>) -> Self {
25+
Self(path)
26+
}
27+
28+
pub fn path(&self) -> &Vec<VariantPathElement> {
29+
&self.0
30+
}
31+
}
32+
33+
impl From<Vec<VariantPathElement>> for VariantPath {
34+
fn from(value: Vec<VariantPathElement>) -> Self {
35+
Self::new(value)
36+
}
37+
}
38+
39+
impl Deref for VariantPath {
40+
type Target = Vec<VariantPathElement>;
41+
42+
fn deref(&self) -> &Self::Target {
43+
&self.0
44+
}
45+
}
46+
47+
/// One step of a variant path.
#[derive(Clone, Debug)]
pub enum VariantPathElement {
    /// Step into the object field called `name`.
    Field { name: String },
    /// Step into the list element at position `index`.
    Index { index: usize },
}

impl VariantPathElement {
    /// Creates a [`VariantPathElement::Field`] step for `name`.
    pub fn field(name: String) -> VariantPathElement {
        Self::Field { name }
    }

    /// Creates a [`VariantPathElement::Index`] step for `index`.
    pub fn index(index: usize) -> VariantPathElement {
        Self::Index { index }
    }
}

parquet-variant/src/variant.rs

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ pub use self::object::VariantObject;
2222
use crate::decoder::{
2323
self, get_basic_type, get_primitive_type, VariantBasicType, VariantPrimitiveType,
2424
};
25+
use crate::path::{VariantPath, VariantPathElement};
2526
use crate::utils::{first_byte_from_slice, slice_from_slice};
2627
use std::ops::Deref;
2728

@@ -1063,6 +1064,34 @@ impl<'m, 'v> Variant<'m, 'v> {
10631064
_ => None,
10641065
}
10651066
}
1067+
1068+
/// Return a new Variant with the path followed.
1069+
///
1070+
/// If the path is not found, `None` is returned.
1071+
pub fn get_path(&self, path: &VariantPath) -> Option<Variant> {
1072+
let mut output = self.clone();
1073+
for element in path.deref() {
1074+
match element {
1075+
VariantPathElement::Field { name } => {
1076+
let Variant::Object(variant_object) = output else {
1077+
return None;
1078+
};
1079+
let field = variant_object.get(name);
1080+
let field = field?;
1081+
output = field;
1082+
}
1083+
VariantPathElement::Index { index } => {
1084+
let Variant::List(variant_list) = output else {
1085+
return None;
1086+
};
1087+
let index = variant_list.get(*index);
1088+
let index = index?;
1089+
output = index;
1090+
}
1091+
}
1092+
}
1093+
Some(output)
1094+
}
10661095
}
10671096

10681097
impl From<()> for Variant<'_, '_> {

0 commit comments

Comments
 (0)