Split output batches of joins that do not respect batch size #12969

Merged (11 commits) on Oct 18, 2024
44 changes: 37 additions & 7 deletions datafusion/physical-plan/src/joins/utils.rs
@@ -43,7 +43,7 @@ use datafusion_common::cast::as_boolean_array;
 use datafusion_common::stats::Precision;
 use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
 use datafusion_common::{
-    internal_err, plan_err, DataFusionError, JoinSide, JoinType, Result, SharedResult,
+    plan_err, DataFusionError, JoinSide, JoinType, Result, SharedResult,
 };
 use datafusion_expr::interval_arithmetic::Interval;
 use datafusion_physical_expr::equivalence::add_offset_to_expr;
@@ -1361,17 +1361,47 @@ pub(crate) fn append_right_indices(
     if right_unmatched_indices.is_empty() {
         Ok((left_indices, right_indices))
     } else {
+        // `into_builder()` can fail here when there is nothing to be filtered and
+        // left_indices or right_indices has the same reference to the cached indices.
+        // In that case, we use a slower alternative.
+
         // the new left indices: left_indices + null array
-        let Ok(mut new_left_indices_builder) = left_indices.into_builder() else {
-            return internal_err!("Failed to convert left indices to builder");
-        };
+        let mut new_left_indices_builder =
+            left_indices.into_builder().unwrap_or_else(|left_indices| {
+                let mut builder = UInt64Builder::with_capacity(
+                    left_indices.len() + right_unmatched_indices.len(),
+                );
+                debug_assert_eq!(
+                    left_indices.null_count(),
+                    0,
+                    "expected left indices to have no nulls"
+                );
+                builder.append_slice(left_indices.values());

Review comment (Contributor) on the line above:
I wonder if at this point the left array Arc can be dropped such that the right side can be converted to builder without issue.

+                builder
+            });
         new_left_indices_builder.append_nulls(right_unmatched_indices.len());
         let new_left_indices = UInt64Array::from(new_left_indices_builder.finish());
 
         // the new right indices: right_indices + right_unmatched_indices
-        let Ok(mut new_right_indices_builder) = right_indices.into_builder() else {
-            return internal_err!("Failed to convert right indices to builder");
-        };
+        let mut new_right_indices_builder = right_indices
+            .into_builder()
+            .unwrap_or_else(|right_indices| {
+                let mut builder = UInt32Builder::with_capacity(
+                    right_indices.len() + right_unmatched_indices.len(),
+                );
+                debug_assert_eq!(
+                    right_indices.null_count(),
+                    0,
+                    "expected right indices to have no nulls"
+                );
+                builder.append_slice(right_indices.values());
+                builder
+            });
+        debug_assert_eq!(
+            right_unmatched_indices.null_count(),
+            0,
+            "expected right unmatched indices to have no nulls"
+        );
         new_right_indices_builder.append_slice(right_unmatched_indices.values());
         let new_right_indices = UInt32Array::from(new_right_indices_builder.finish());
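
The change above replaces a hard error with a fallback: when `into_builder()` cannot take ownership because the index buffer is still referenced elsewhere, the values are copied into a fresh builder instead. A rough std-only analogy of that pattern is sketched below. It is not arrow or DataFusion code: `into_vec_or_copy` is a hypothetical helper, and `Arc<Vec<u64>>` merely stands in for an index array whose buffer may be shared with a cache.

```rust
use std::sync::Arc;

/// Hypothetical helper (an analogy, not part of arrow or DataFusion):
/// reuse the buffer when this is the only handle to it, otherwise copy
/// the values into a new Vec that already has room for `extra` elements.
fn into_vec_or_copy(indices: Arc<Vec<u64>>, extra: usize) -> Vec<u64> {
    // `Arc::try_unwrap` returns `Err(arc)` when other handles still exist,
    // so the fallback closure receives the shared data back for copying.
    Arc::try_unwrap(indices).unwrap_or_else(|shared| {
        let mut copy = Vec::with_capacity(shared.len() + extra);
        copy.extend_from_slice(&shared);
        copy
    })
}

fn main() {
    // Unique handle: ownership is taken and the allocation is reused.
    let unique = Arc::new(vec![0u64, 1, 2]);
    let original_ptr = unique.as_ptr();
    let owned = into_vec_or_copy(unique, 2);
    assert_eq!(owned.as_ptr(), original_ptr);

    // Shared handle (e.g. a cached copy is still alive): the slower copy path runs.
    let cached = Arc::new(vec![3u64, 4]);
    let copied = into_vec_or_copy(Arc::clone(&cached), 2);
    assert_ne!(copied.as_ptr(), cached.as_ptr());
    assert_eq!(copied, *cached);
}
```

In this sketch the copy path is taken only while another handle is alive, which is the same condition the code comment in the diff describes for the cached indices.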
