Perf: Optimize in memory sort #15380

Open · wants to merge 26 commits into base: main · changes from all commits
2 changes: 1 addition & 1 deletion datafusion/common/src/config.rs
@@ -411,7 +411,7 @@ config_namespace! {
        /// When sorting, below what size should data be concatenated
        /// and sorted in a single RecordBatch rather than sorted in
        /// batches and merged.
-       pub sort_in_place_threshold_bytes: usize, default = 1024 * 1024
+       pub sort_in_place_threshold_bytes: usize, default = 3 * 1024 * 1024

        /// Number of files to read in parallel when inferring schema and statistics
        pub meta_fetch_concurrency: usize, default = 32
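For context, the threshold is an ordinary `SessionConfig` option. A minimal sketch of overriding the new default programmatically, assuming the standard `SessionConfig`/`SessionContext` API; the 8 MiB value here is illustrative, not part of the PR:

```rust
use datafusion::prelude::{SessionConfig, SessionContext};

fn make_context() -> SessionContext {
    let mut config = SessionConfig::new();
    // Batches whose reserved memory totals less than this many bytes are
    // sorted in one in-memory pass instead of being merged stream-wise.
    // 8 MiB is an illustrative override of the 3 MiB default.
    config.options_mut().execution.sort_in_place_threshold_bytes = 8 * 1024 * 1024;
    SessionContext::new_with_config(config)
}
```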
98 changes: 89 additions & 9 deletions datafusion/physical-plan/src/sorts/sort.rs
@@ -45,11 +45,14 @@ use crate::{
Statistics,
};

-use arrow::array::{Array, RecordBatch, RecordBatchOptions, StringViewArray};
-use arrow::compute::{concat_batches, lexsort_to_indices, take_arrays};
+use arrow::array::{Array, ArrayRef, RecordBatch, RecordBatchOptions, StringViewArray};
+use arrow::compute::{
+    concat, interleave_record_batch, lexsort_to_indices, take_arrays, SortColumn,
+};
use arrow::datatypes::SchemaRef;
use datafusion_common::config::SpillCompression;
use datafusion_common::{internal_datafusion_err, internal_err, DataFusionError, Result};

use datafusion_execution::disk_manager::RefCountedTempFile;
use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
use datafusion_execution::runtime_env::RuntimeEnv;
@@ -654,16 +657,25 @@ impl ExternalSorter {
return self.sort_batch_stream(batch, metrics, reservation);
}

-        // If less than sort_in_place_threshold_bytes, concatenate and sort in place
+        // If below sort_in_place_threshold_bytes, sort all batches in a single in-memory pass.
if self.reservation.size() < self.sort_in_place_threshold_bytes {
-            // Concatenate memory batches together and sort
-            let batch = concat_batches(&self.schema, &self.in_mem_batches)?;
+            let interleave_indices =
+                self.build_sorted_indices(self.in_mem_batches.as_slice(), &self.expr)?;
+
+            let batches: Vec<&RecordBatch> = self.in_mem_batches.iter().collect();
+            let sorted_batch = interleave_record_batch(&batches, &interleave_indices)?;
+
self.in_mem_batches.clear();
self.reservation
-                .try_resize(get_reserved_byte_for_record_batch(&batch))
+                .try_resize(get_reserved_byte_for_record_batch(&sorted_batch))
.map_err(Self::err_with_oom_context)?;
-            let reservation = self.reservation.take();
-            return self.sort_batch_stream(batch, metrics, reservation);
+
+            metrics.record_output(sorted_batch.num_rows());
+
+            return Ok(Box::pin(RecordBatchStreamAdapter::new(
+                Arc::clone(&self.schema),
+                futures::stream::once(async { Ok(sorted_batch) }),
+            )) as SendableRecordBatchStream);
}

let streams = std::mem::take(&mut self.in_mem_batches)
@@ -689,6 +701,74 @@ impl ExternalSorter {
.build()
}

+    fn build_sorted_indices(
+        &self,
+        current_batches: &[RecordBatch],
+        expr: &LexOrdering,
+    ) -> Result<Vec<(usize, usize)>> {
+        // ===== Phase 1: Build global sort columns for each sort expression =====
+        // For each sort expression, evaluate and collect the corresponding sort
+        // column from each in-memory batch. `expr` is a list of sort expressions,
+        // each providing `evaluate_to_sort_column()`, which returns an ArrayRef
+        // (in `.values`) and sort options (`options`).
+
+        // ```text
+        // columns_by_expr, for example:
+        // ├── expr_0 ──┬── ArrayRef_0_0 (from batch_0)
+        // │            ├── ArrayRef_0_1 (from batch_1)
+        // │            └── ArrayRef_0_2 (from batch_2)
+        // ├── expr_1 ──┬── ArrayRef_1_0 (from batch_0)
+        // │            ├── ArrayRef_1_1 (from batch_1)
+        // │            └── ArrayRef_1_2 (from batch_2)
+        // ```
+        let mut columns_by_expr: Vec<Vec<ArrayRef>> = expr
+            .iter()
+            .map(|_| Vec::with_capacity(current_batches.len()))
+            .collect();
+
+        for batch in current_batches {
+            for (i, e) in expr.iter().enumerate() {
+                let col = e.evaluate_to_sort_column(batch)?.values;
+                columns_by_expr[i].push(col);
+            }
+        }
+
+        // For each sort expression, concatenate arrays from all batches into one global array
+        let mut sort_columns = Vec::with_capacity(expr.len());
+        for (arrays, e) in columns_by_expr.into_iter().zip(expr.iter()) {
+            let array = concat(
+                &arrays
+                    .iter()
+                    .map(|a| a.as_ref())
+                    .collect::<Vec<&dyn Array>>(),
+            )?;
+            sort_columns.push(SortColumn {
+                values: array,
+                options: e.options.into(),
+            });
+        }
+
+        // ===== Phase 2: Compute global sorted indices =====
+        // Use `lexsort_to_indices` to get global row indices in sorted order
+        // (as if all batches were concatenated)
+        let indices = lexsort_to_indices(&sort_columns, None)?;
+
+        // ===== Phase 3: Prepare indices for interleaving =====
+        // Map each global row index back to a (batch_id, row_id) pair
+        let batch_indices: Vec<(usize, usize)> = current_batches
+            .iter()
+            .enumerate()
+            .flat_map(|(batch_id, batch)| {
+                (0..batch.num_rows()).map(move |i| (batch_id, i))
+            })
+            .collect();
+
+        let interleave_indices: Vec<(usize, usize)> = indices
+            .values()
+            .iter()
+            .map(|x| batch_indices[*x as usize])
+            .collect();
+
+        Ok(interleave_indices)
+    }

/// Sorts a single `RecordBatch` into a single stream.
///
/// `reservation` accounts for the memory used by this batch and
@@ -1301,7 +1381,7 @@ mod tests {
use crate::test::TestMemoryExec;

use arrow::array::*;
-    use arrow::compute::SortOptions;
+    use arrow::compute::{concat_batches, SortOptions};
use arrow::datatypes::*;
use datafusion_common::cast::as_primitive_array;
use datafusion_common::test_util::batches_to_string;
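The core of the change is easier to see outside the sorter. The sketch below reproduces the same three phases against plain arrow-rs (`concat`, `lexsort_to_indices`, `interleave_record_batch`, the functions the PR imports) for a single sort key; the function name `sort_by_first_column` and the first-column ordering are illustrative, not part of the PR:

```rust
use arrow::array::{Array, ArrayRef, RecordBatch};
use arrow::compute::{concat, interleave_record_batch, lexsort_to_indices, SortColumn};
use arrow::error::Result;

/// Globally sort `batches` by their first column without concatenating
/// every column up front. (Illustrative helper, not DataFusion code.)
fn sort_by_first_column(batches: &[RecordBatch]) -> Result<RecordBatch> {
    // Phase 1: concatenate only the sort key from each batch.
    let keys: Vec<&dyn Array> = batches.iter().map(|b| b.column(0).as_ref()).collect();
    let global_key: ArrayRef = concat(&keys)?;

    // Phase 2: compute the global sorted order over the concatenated key.
    let sort_column = SortColumn { values: global_key, options: None };
    let indices = lexsort_to_indices(&[sort_column], None)?;

    // Phase 3: map each global row index back to a (batch, row) pair ...
    let flat: Vec<(usize, usize)> = batches
        .iter()
        .enumerate()
        .flat_map(|(batch_id, b)| (0..b.num_rows()).map(move |row| (batch_id, row)))
        .collect();
    let interleave_indices: Vec<(usize, usize)> = indices
        .values()
        .iter()
        .map(|&i| flat[i as usize])
        .collect();

    // ... and gather whole rows from the original batches in that order.
    let batch_refs: Vec<&RecordBatch> = batches.iter().collect();
    interleave_record_batch(&batch_refs, &interleave_indices)
}
```

Compared with the previous `concat_batches` + sort path, only the key columns are copied before sorting; the payload columns are gathered once, directly into the output batch.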
4 changes: 2 additions & 2 deletions datafusion/sqllogictest/test_files/information_schema.slt
@@ -261,7 +261,7 @@ datafusion.execution.skip_partial_aggregation_probe_ratio_threshold 0.8
datafusion.execution.skip_partial_aggregation_probe_rows_threshold 100000
datafusion.execution.skip_physical_aggregate_schema_check false
datafusion.execution.soft_max_rows_per_output_file 50000000
-datafusion.execution.sort_in_place_threshold_bytes 1048576
+datafusion.execution.sort_in_place_threshold_bytes 3145728
datafusion.execution.sort_spill_reservation_bytes 10485760
datafusion.execution.spill_compression uncompressed
datafusion.execution.split_file_groups_by_statistics false
@@ -374,7 +374,7 @@ datafusion.execution.skip_partial_aggregation_probe_ratio_threshold 0.8 Aggregat
datafusion.execution.skip_partial_aggregation_probe_rows_threshold 100000 Number of input rows partial aggregation partition should process, before aggregation ratio check and trying to switch to skipping aggregation mode
datafusion.execution.skip_physical_aggregate_schema_check false When set to true, skips verifying that the schema produced by planning the input of `LogicalPlan::Aggregate` exactly matches the schema of the input plan. When set to false, if the schema does not match exactly (including nullability and metadata), a planning error will be raised. This is used to workaround bugs in the planner that are now caught by the new schema verification step.
datafusion.execution.soft_max_rows_per_output_file 50000000 Target number of rows in output files when writing multiple. This is a soft max, so it can be exceeded slightly. There also will be one file smaller than the limit if the total number of rows written is not roughly divisible by the soft max
-datafusion.execution.sort_in_place_threshold_bytes 1048576 When sorting, below what size should data be concatenated and sorted in a single RecordBatch rather than sorted in batches and merged.
+datafusion.execution.sort_in_place_threshold_bytes 3145728 When sorting, below what size should data be concatenated and sorted in a single RecordBatch rather than sorted in batches and merged.
datafusion.execution.sort_spill_reservation_bytes 10485760 Specifies the reserved memory for each spillable sort operation to facilitate an in-memory merge. When a sort operation spills to disk, the in-memory data must be sorted and merged before being written to a file. This setting reserves a specific amount of memory for that in-memory sort/merge process. Note: This setting is irrelevant if the sort operation cannot spill (i.e., if there's no `DiskManager` configured).
datafusion.execution.spill_compression uncompressed Sets the compression codec used when spilling data to disk. Since datafusion writes spill files using the Arrow IPC Stream format, only codecs supported by the Arrow IPC Stream Writer are allowed. Valid values are: uncompressed, lz4_frame, zstd. Note: lz4_frame offers faster (de)compression, but typically results in larger spill files. In contrast, zstd achieves higher compression ratios at the cost of slower (de)compression speed.
datafusion.execution.split_file_groups_by_statistics false Attempt to eliminate sorts by packing & sorting files with non-overlapping statistics into the same file groups. Currently experimental
2 changes: 1 addition & 1 deletion docs/source/user-guide/configs.md
@@ -85,7 +85,7 @@ Environment variables are read during `SessionConfig` initialisation so they mus
| datafusion.execution.skip_physical_aggregate_schema_check | false | When set to true, skips verifying that the schema produced by planning the input of `LogicalPlan::Aggregate` exactly matches the schema of the input plan. When set to false, if the schema does not match exactly (including nullability and metadata), a planning error will be raised. This is used to workaround bugs in the planner that are now caught by the new schema verification step. |
| datafusion.execution.spill_compression | uncompressed | Sets the compression codec used when spilling data to disk. Since datafusion writes spill files using the Arrow IPC Stream format, only codecs supported by the Arrow IPC Stream Writer are allowed. Valid values are: uncompressed, lz4_frame, zstd. Note: lz4_frame offers faster (de)compression, but typically results in larger spill files. In contrast, zstd achieves higher compression ratios at the cost of slower (de)compression speed. |
| datafusion.execution.sort_spill_reservation_bytes | 10485760 | Specifies the reserved memory for each spillable sort operation to facilitate an in-memory merge. When a sort operation spills to disk, the in-memory data must be sorted and merged before being written to a file. This setting reserves a specific amount of memory for that in-memory sort/merge process. Note: This setting is irrelevant if the sort operation cannot spill (i.e., if there's no `DiskManager` configured). |
-| datafusion.execution.sort_in_place_threshold_bytes | 1048576 | When sorting, below what size should data be concatenated and sorted in a single RecordBatch rather than sorted in batches and merged. |
+| datafusion.execution.sort_in_place_threshold_bytes | 3145728 | When sorting, below what size should data be concatenated and sorted in a single RecordBatch rather than sorted in batches and merged. |
| datafusion.execution.meta_fetch_concurrency | 32 | Number of files to read in parallel when inferring schema and statistics |
| datafusion.execution.minimum_parallel_output_files | 4 | Guarantees a minimum level of output files running in parallel. RecordBatches will be distributed in round robin fashion to each parallel writer. Each writer is closed and a new file opened once soft_max_rows_per_output_file is reached. |
| datafusion.execution.soft_max_rows_per_output_file | 50000000 | Target number of rows in output files when writing multiple. This is a soft max, so it can be exceeded slightly. There also will be one file smaller than the limit if the total number of rows written is not roughly divisible by the soft max |
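Since this is a documented runtime option, it can also be changed per session through SQL, as DataFusion accepts `SET` statements. A small sketch, assuming the usual `SessionContext::sql` entry point; the explicit value shown simply restates the new default:

```rust
use datafusion::error::Result;
use datafusion::prelude::SessionContext;

async fn raise_threshold(ctx: &SessionContext) -> Result<()> {
    // 3145728 bytes = 3 * 1024 * 1024, the new default from this PR.
    ctx.sql("SET datafusion.execution.sort_in_place_threshold_bytes = 3145728")
        .await?;
    Ok(())
}
```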