Skip to content

Commit 0def286

Browse files
committed
fix arrays
1 parent 23e3425 commit 0def286

File tree

1 file changed

+39
-11
lines changed

1 file changed

+39
-11
lines changed

datafusion/common/src/pruning.rs

Lines changed: 39 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
use arrow::array::{Array, UInt64Array};
18+
use arrow::array::{Array, NullArray, UInt64Array};
1919
use arrow::array::{ArrayRef, BooleanArray};
2020
use arrow::datatypes::{FieldRef, Schema, SchemaRef};
2121
use std::collections::HashSet;
@@ -173,7 +173,10 @@ impl PartitionPruningStatistics {
173173
let num_containers = partition_values.len();
174174
let partition_schema = Arc::new(Schema::new(partition_fields));
175175
let mut partition_values_by_column =
176-
vec![vec![]; partition_schema.fields().len()];
176+
vec![
177+
Vec::with_capacity(partition_values.len());
178+
partition_schema.fields().len()
179+
];
177180
for partition_value in partition_values {
178181
for (i, value) in partition_value.into_iter().enumerate() {
179182
partition_values_by_column[i].push(value);
@@ -182,7 +185,13 @@ impl PartitionPruningStatistics {
182185
Ok(Self {
183186
partition_values: partition_values_by_column
184187
.into_iter()
185-
.map(|v| ScalarValue::iter_to_array(v))
188+
.map(|v| {
189+
if v.is_empty() {
190+
Ok(Arc::new(NullArray::new(0)) as ArrayRef)
191+
} else {
192+
ScalarValue::iter_to_array(v)
193+
}
194+
})
186195
.collect::<Result<Vec<_>, _>>()?,
187196
num_containers,
188197
partition_schema,
@@ -193,7 +202,18 @@ impl PartitionPruningStatistics {
193202
impl PruningStatistics for PartitionPruningStatistics {
194203
fn min_values(&self, column: &Column) -> Option<ArrayRef> {
195204
let index = self.partition_schema.index_of(column.name()).ok()?;
196-
self.partition_values.get(index).map(|v| Arc::clone(v))
205+
self.partition_values
206+
.get(index)
207+
.map(|v| {
208+
if v.is_empty() || v.null_count() == v.len() {
209+
// If the array is empty or all nulls, return None
210+
None
211+
} else {
212+
// Otherwise, return the array as is
213+
Some(Arc::clone(v))
214+
}
215+
})
216+
.flatten()
197217
}
198218

199219
fn max_values(&self, column: &Column) -> Option<ArrayRef> {
@@ -219,10 +239,20 @@ impl PruningStatistics for PartitionPruningStatistics {
219239
) -> Option<BooleanArray> {
220240
let index = self.partition_schema.index_of(column.name()).ok()?;
221241
let array = self.partition_values.get(index)?;
222-
let values_array = ScalarValue::iter_to_array(values.iter().cloned()).ok()?;
223-
let boolean_array =
224-
arrow::compute::kernels::cmp::eq(array, &values_array).ok()?;
225-
if boolean_array.null_count() == boolean_array.len() {
242+
let boolean_arrays = values
243+
.iter()
244+
.map(|v| {
245+
let arrow_value = v.to_scalar()?;
246+
arrow::compute::kernels::cmp::eq(array, &arrow_value)
247+
})
248+
.collect::<Result<Vec<_>, _>>()
249+
.ok()?;
250+
let boolean_array = boolean_arrays.into_iter().reduce(|acc, arr| {
251+
arrow::compute::kernels::boolean::and(&acc, &arr)
252+
.expect("arrays are known to have equal lengths")
253+
})?;
254+
// If the boolean array is empty or all null values, return None
255+
if boolean_array.is_empty() || boolean_array.null_count() == boolean_array.len() {
226256
None
227257
} else {
228258
Some(boolean_array)
@@ -549,9 +579,7 @@ mod tests {
549579

550580
// Contained values are all empty
551581
let values = HashSet::from([ScalarValue::from(1i32)]);
552-
let contained_a = partition_stats.contained(&column_a, &values);
553-
let expected_contained_a = BooleanArray::from(Vec::<Option<bool>>::new());
554-
assert_eq!(contained_a, Some(expected_contained_a));
582+
assert!(partition_stats.contained(&column_a, &values).is_none());
555583
}
556584

557585
#[test]

0 commit comments

Comments
 (0)