diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 726015d17149..4efb67a37c99 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -722,6 +722,15 @@ config_namespace! { /// then the output will be coerced to a non-view. /// Coerces `Utf8View` to `LargeUtf8`, and `BinaryView` to `LargeBinary`. pub expand_views_at_output: bool, default = false + + /// When DataFusion detects that a plan might not be promptly cancellable + /// due to the presence of tight-looping operators, it will attempt to + /// mitigate this by inserting explicit yielding (in as few places as + /// possible to avoid performance degradation). This value represents the + /// yielding period (in batches) at such explicit yielding points. The + /// default value is 64. If set to 0, DataFusion will not perform + /// any explicit yielding. + pub yield_period: usize, default = 64 } } diff --git a/datafusion/core/tests/execution/infinite_cancel.rs b/datafusion/core/tests/execution/infinite_cancel.rs new file mode 100644 index 000000000000..00c1f6b4486f --- /dev/null +++ b/datafusion/core/tests/execution/infinite_cancel.rs @@ -0,0 +1,820 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use std::error::Error; +use std::fmt::Formatter; +use std::sync::Arc; + +use arrow::array::{Array, Int64Array, RecordBatch}; +use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaRef}; +use arrow_schema::SortOptions; +use datafusion::functions_aggregate::sum; +use datafusion::physical_expr::aggregate::AggregateExprBuilder; +use datafusion::physical_expr::Partitioning; +use datafusion::physical_plan; +use datafusion::physical_plan::aggregates::{ + AggregateExec, AggregateMode, PhysicalGroupBy, +}; +use datafusion::physical_plan::execution_plan::Boundedness; +use datafusion::physical_plan::ExecutionPlan; +use datafusion::prelude::SessionContext; +use datafusion_common::config::ConfigOptions; +use datafusion_common::{JoinType, ScalarValue}; +use datafusion_expr_common::operator::Operator::Gt; +use datafusion_physical_expr::expressions::{col, BinaryExpr, Column, Literal}; +use datafusion_physical_expr_common::physical_expr::PhysicalExpr; +use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr; +use datafusion_physical_optimizer::insert_yield_exec::InsertYieldExec; +use datafusion_physical_optimizer::PhysicalOptimizerRule; +use datafusion_physical_plan::coalesce_batches::CoalesceBatchesExec; +use datafusion_physical_plan::filter::FilterExec; +use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode, SortMergeJoinExec}; +use datafusion_physical_plan::projection::ProjectionExec; +use datafusion_physical_plan::repartition::RepartitionExec; +use datafusion_physical_plan::sorts::sort::SortExec; +use datafusion_physical_plan::union::InterleaveExec; + +use datafusion_physical_plan::memory::{LazyBatchGenerator, LazyMemoryExec}; +use futures::StreamExt; +use parking_lot::RwLock; +use rstest::rstest; +use tokio::select; + +#[derive(Debug)] +/// A batch generator that can produce either bounded or boundless infinite stream of the same RecordBatch. +struct InfiniteGenerator { + /// The RecordBatch to return on each call. 
+ batch: RecordBatch, + /// How many batches have already been generated. + counter: usize, +} + +impl std::fmt::Display for InfiniteGenerator { + fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { + // Display current counter + write!(f, "InfiniteGenerator(counter={})", self.counter) + } +} + +impl LazyBatchGenerator for InfiniteGenerator { + /// Generate the next RecordBatch. + fn generate_next_batch(&mut self) -> datafusion_common::Result> { + // Increment the counter and return a clone of the batch + self.counter += 1; + Ok(Some(self.batch.clone())) + } +} + +/// Build a LazyMemoryExec that yields either a finite or infinite stream depending on `pretend_finite`. +fn make_lazy_exec( + batch: RecordBatch, + schema: SchemaRef, + pretend_finite: bool, +) -> Arc { + let boundedness = if pretend_finite { + Boundedness::Bounded + } else { + Boundedness::Unbounded { + requires_infinite_memory: false, + } + }; + + // Instantiate the generator with the batch and limit + let gen = InfiniteGenerator { batch, counter: 0 }; + + // Wrap the generator in a trait object behind Arc> + let generator: Arc> = Arc::new(RwLock::new(gen)); + + // Create a LazyMemoryExec with one partition using our generator + let mut exec = LazyMemoryExec::try_new(schema, vec![generator]).unwrap(); + exec.set_boundedness(boundedness); + + // Erase concrete type into a generic ExecutionPlan handle + Arc::new(exec) as Arc +} + +#[rstest] +#[tokio::test] +async fn test_infinite_agg_cancel( + #[values(false, true)] pretend_finite: bool, +) -> Result<(), Box> { + // 1) build session & schema & sample batch + let session_ctx = SessionContext::new(); + let schema = Arc::new(Schema::new(Fields::from(vec![Field::new( + "value", + DataType::Int64, + false, + )]))); + let mut builder = Int64Array::builder(8192); + for v in 0..8192 { + builder.append_value(v); + } + let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(builder.finish())])?; + + // 2) set up the infinite source + aggregation + let 
inf = make_lazy_exec(batch.clone(), schema.clone(), pretend_finite); + let aggr = Arc::new(AggregateExec::try_new( + AggregateMode::Single, + PhysicalGroupBy::new(vec![], vec![], vec![]), + vec![Arc::new( + AggregateExprBuilder::new( + sum::sum_udaf(), + vec![Arc::new(Column::new_with_schema("value", &schema)?)], + ) + .schema(inf.schema()) + .alias("sum") + .build()?, + )], + vec![None], + inf, + schema, + )?); + + // 3) optimize the plan with InsertYieldExec to auto-insert Yield + let config = ConfigOptions::new(); + let optimized = InsertYieldExec::new().optimize(aggr, &config)?; + + // 4) get the stream + let mut stream = physical_plan::execute_stream(optimized, session_ctx.task_ctx())?; + const TIMEOUT: u64 = 1; + + // 5) drive the stream inline in select! + let result = select! { + batch_opt = stream.next() => batch_opt, + _ = tokio::time::sleep(tokio::time::Duration::from_secs(TIMEOUT)) => { + None + } + }; + + assert!(result.is_none(), "Expected timeout, but got a result"); + Ok(()) +} + +#[rstest] +#[tokio::test] +async fn test_infinite_sort_cancel( + #[values(false, true)] pretend_finite: bool, +) -> Result<(), Box> { + // 1) build session & schema & sample batch + let session_ctx = SessionContext::new(); + let schema = Arc::new(Schema::new(vec![Field::new( + "value", + DataType::Int64, + false, + )])); + let mut builder = Int64Array::builder(8192); + for v in 0..8192 { + builder.append_value(v); + } + let array = builder.finish(); + let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(array)])?; + + // 2) set up the infinite source + let inf = make_lazy_exec(batch.clone(), schema.clone(), pretend_finite); + + // 3) set up a SortExec that will never finish because input is infinite + let sort_options = SortOptions { + descending: false, + nulls_first: true, + }; + let sort_expr = PhysicalSortExpr::new( + Arc::new(Column::new_with_schema("value", &schema)?), + sort_options, + ); + let sort_exec = Arc::new(SortExec::new([sort_expr].into(), inf)); 
+ + // 4) optimize the plan with InsertYieldExec to auto-insert Yield + let config = ConfigOptions::new(); + let optimized = InsertYieldExec::new().optimize(sort_exec, &config)?; + + // 5) get the stream + let mut stream = physical_plan::execute_stream(optimized, session_ctx.task_ctx())?; + const TIMEOUT: u64 = 1; + + // 6) drive the stream inline in select! + let result = select! { + batch_opt = stream.next() => batch_opt, + _ = tokio::time::sleep(tokio::time::Duration::from_secs(TIMEOUT)) => { + None + } + }; + + assert!( + result.is_none(), + "Expected timeout for sort, but got a result" + ); + Ok(()) +} + +#[rstest] +#[tokio::test] +async fn test_infinite_interleave_cancel( + #[values(false, true)] pretend_finite: bool, +) -> Result<(), Box> { + // 1) Build a session and a schema with one i64 column. + let session_ctx = SessionContext::new(); + let schema = Arc::new(Schema::new(Fields::from(vec![Field::new( + "value", + DataType::Int64, + false, + )]))); + let mut builder = Int64Array::builder(8192); + for v in 0..8192 { + builder.append_value(v); + } + let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(builder.finish())])?; + + // 2) Create multiple infinite sources, each filtered by a different threshold. + // This ensures InterleaveExec has many children. + let mut infinite_children = vec![]; + // Use 32 distinct thresholds (each > 0 and < 8192) for 32 infinite inputs. + let thresholds = (0..32).map(|i| 8191 - (i * 256) as i64); + + for thr in thresholds { + // 2a) Set up the infinite source + let inf = make_lazy_exec(batch.clone(), schema.clone(), pretend_finite); + + // 2b) Apply a FilterExec with predicate "value > thr". 
+ let filter_expr = Arc::new(BinaryExpr::new( + Arc::new(Column::new_with_schema("value", &schema)?), + Gt, + Arc::new(Literal::new(ScalarValue::Int64(Some(thr)))), + )); + let filtered = Arc::new(FilterExec::try_new(filter_expr, inf)?); + + // 2c) Wrap the filtered stream in CoalesceBatchesExec so it emits + // one 8192-row batch at a time. + let coalesced = Arc::new(CoalesceBatchesExec::new(filtered, 8192)); + + // 2d) Repartition each coalesced stream by hashing on "value" into 1 partition. + // Required for InterleaveExec::try_new to succeed. + let exprs = vec![Arc::new(Column::new_with_schema("value", &schema)?) as _]; + let partitioning = Partitioning::Hash(exprs, 1); + let hashed = Arc::new(RepartitionExec::try_new(coalesced, partitioning)?); + + infinite_children.push(hashed as Arc); + } + + // 3) Build an InterleaveExec over all infinite children. + let interleave = Arc::new(InterleaveExec::try_new(infinite_children)?); + + // 4) Wrap the InterleaveExec in a FilterExec that always returns false, + // ensuring that no rows are ever emitted. + let always_false = Arc::new(Literal::new(ScalarValue::Boolean(Some(false)))); + let filtered_interleave = Arc::new(FilterExec::try_new(always_false, interleave)?); + + // 5) Coalesce the filtered interleave into 8192-row batches. + // This lets InsertYieldExec insert YieldStreamExec at each batch boundary. + let coalesced_top = Arc::new(CoalesceBatchesExec::new(filtered_interleave, 8192)); + + // 6) Apply InsertYieldExec to insert YieldStreamExec under every leaf. + // Each InfiniteExec → FilterExec → CoalesceBatchesExec chain will yield periodically. + let config = ConfigOptions::new(); + let optimized = InsertYieldExec::new().optimize(coalesced_top, &config)?; + + // 7) Execute the optimized plan with a 1-second timeout. + // Because the top-level FilterExec always discards rows and the inputs are infinite, + // no batch will be returned within 1 second, causing result to be None. 
+ let mut stream = physical_plan::execute_stream(optimized, session_ctx.task_ctx())?; + const TIMEOUT: u64 = 1; + let result = select! { + batch_opt = stream.next() => batch_opt, + _ = tokio::time::sleep(tokio::time::Duration::from_secs(TIMEOUT)) => None, + }; + + assert!( + result.is_none(), + "Expected no output for infinite interleave aggregate, but got a batch" + ); + Ok(()) +} + +#[rstest] +#[tokio::test] +async fn test_infinite_interleave_agg_cancel( + #[values(false, true)] pretend_finite: bool, +) -> Result<(), Box> { + // 1) Build session, schema, and a sample batch. + let session_ctx = SessionContext::new(); + let schema = Arc::new(Schema::new(Fields::from(vec![Field::new( + "value", + DataType::Int64, + false, + )]))); + let mut builder = Int64Array::builder(8192); + for v in 0..8192 { + builder.append_value(v); + } + let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(builder.finish())])?; + + // 2) Create N infinite sources, each filtered by a different predicate. + // That way, the InterleaveExec will have multiple children. + let mut infinite_children = vec![]; + // Use 32 distinct thresholds (each >0 and <8 192) to force 32 infinite inputs + let thresholds = (0..32).map(|i| 8_192 - 1 - (i * 256) as i64); + + for thr in thresholds { + // 2a) One infinite exec: + let inf = make_lazy_exec(batch.clone(), schema.clone(), pretend_finite); + + // 2b) Apply a FilterExec: “value > thr”. + let filter_expr = Arc::new(BinaryExpr::new( + Arc::new(Column::new_with_schema("value", &schema)?), + Gt, + Arc::new(Literal::new(ScalarValue::Int64(Some(thr)))), + )); + let filtered = Arc::new(FilterExec::try_new(filter_expr, inf)?); + + // 2c) Wrap in CoalesceBatchesExec so the upstream yields are batched. + let coalesced = Arc::new(CoalesceBatchesExec::new(filtered, 8192)); + + // 2d) Now repartition so that all children share identical Hash partitioning + // on “value” into 1 bucket. This is required for InterleaveExec::try_new. 
+ let exprs = vec![Arc::new(Column::new_with_schema("value", &schema)?) as _]; + let partitioning = Partitioning::Hash(exprs, 1); + let hashed = Arc::new(RepartitionExec::try_new(coalesced, partitioning)?); + + infinite_children.push(hashed as _); + } + + // 3) Build an InterleaveExec over all N children. + // Since each child now has Partitioning::Hash([col "value"], 1), InterleaveExec::try_new succeeds. + let interleave = Arc::new(InterleaveExec::try_new(infinite_children)?); + let interleave_schema = interleave.schema(); + + // 4) Build a global AggregateExec that sums “value” over all rows. + // Because we use `AggregateMode::Single` with no GROUP BY columns, this plan will + // only produce one “final” row once all inputs finish. But our inputs never finish, + // so we should never get any output. + let aggregate_expr = AggregateExprBuilder::new( + sum::sum_udaf(), + vec![Arc::new(Column::new_with_schema("value", &schema)?)], + ) + .schema(interleave_schema.clone()) + .alias("total") + .build()?; + + let aggr = Arc::new(AggregateExec::try_new( + AggregateMode::Single, + PhysicalGroupBy::new( + vec![], // no GROUP BY columns + vec![], // no GROUP BY expressions + vec![], // no GROUP BY physical expressions + ), + vec![Arc::new(aggregate_expr)], + vec![None], // no “distinct” flags + interleave, + interleave_schema, + )?); + + // 5) InsertYieldExec will automatically insert YieldStreams beneath each “infinite” leaf. + // That way, each InfiniteExec (through the FilterExec/CoalesceBatchesExec/RepartitionExec chain) + // yields to the runtime periodically instead of spinning CPU. + let config = ConfigOptions::new(); + let optimized = InsertYieldExec::new().optimize(aggr, &config)?; + + // 6) Execute the stream. Because AggregateExec(mode=Single) only emits a final batch + // after all inputs finish—and those inputs are infinite—we expect no output + // within 1 second (timeout → None). 
+ let mut stream = physical_plan::execute_stream(optimized, session_ctx.task_ctx())?; + const TIMEOUT: u64 = 1; + let result = select! { + batch_opt = stream.next() => batch_opt, + _ = tokio::time::sleep(tokio::time::Duration::from_secs(TIMEOUT)) => { + None + } + }; + + assert!( + result.is_none(), + "Expected no output for aggregate over infinite interleave, but got some batch" + ); + + Ok(()) +} + +#[rstest] +#[tokio::test] +async fn test_infinite_join_cancel( + #[values(false, true)] pretend_finite: bool, +) -> Result<(), Box> { + // 1) Session, schema, and a single 8 K‐row batch for each side + let session_ctx = SessionContext::new(); + let schema: SchemaRef = Arc::new(Schema::new(vec![Field::new( + "value", + DataType::Int64, + false, + )])); + + let mut builder_left = Int64Array::builder(8_192); + let mut builder_right = Int64Array::builder(8_192); + for v in 0..8_192 { + builder_left.append_value(v); + // on the right side, we’ll shift each value by +1 so that not everything joins, + // but plenty of matching keys exist (e.g. 0 on left matches 1 on right, etc.) 
+ builder_right.append_value(v + 1); + } + let batch_left = + RecordBatch::try_new(schema.clone(), vec![Arc::new(builder_left.finish())])?; + let batch_right = + RecordBatch::try_new(schema.clone(), vec![Arc::new(builder_right.finish())])?; + + // 2a) Build two InfiniteExecs (left and right) + let infinite_left = + make_lazy_exec(batch_left.clone(), schema.clone(), pretend_finite); + let infinite_right = + make_lazy_exec(batch_right.clone(), schema.clone(), pretend_finite); + + // 2b) Create Join keys → join on “value” = “value” + let left_keys: Vec> = + vec![Arc::new(Column::new_with_schema("value", &schema)?)]; + let right_keys: Vec> = + vec![Arc::new(Column::new_with_schema("value", &schema)?)]; + + // 2c) Wrap each side in CoalesceBatches + Repartition so they are both hashed into 1 partition + let coalesced_left = Arc::new(CoalesceBatchesExec::new(infinite_left, 8_192)); + let coalesced_right = Arc::new(CoalesceBatchesExec::new(infinite_right, 8_192)); + + let part_left = Partitioning::Hash(left_keys, 1); + let part_right = Partitioning::Hash(right_keys, 1); + + let hashed_left = Arc::new(RepartitionExec::try_new(coalesced_left, part_left)?); + let hashed_right = Arc::new(RepartitionExec::try_new(coalesced_right, part_right)?); + + // 2d) Build an Inner HashJoinExec → left.value = right.value + let join = Arc::new(HashJoinExec::try_new( + hashed_left, + hashed_right, + vec![( + Arc::new(Column::new_with_schema("value", &schema)?), + Arc::new(Column::new_with_schema("value", &schema)?), + )], + None, + &JoinType::Inner, + None, + PartitionMode::CollectLeft, + true, + )?); + + // 3) Wrap yields under each infinite leaf + let config = ConfigOptions::new(); + let optimized = InsertYieldExec::new().optimize(join, &config)?; + + // 4) Execute + 1 sec timeout + let mut stream = physical_plan::execute_stream(optimized, session_ctx.task_ctx())?; + const TIMEOUT: u64 = 1; + let result = select! 
{ + batch_opt = stream.next() => batch_opt, + _ = tokio::time::sleep(tokio::time::Duration::from_secs(TIMEOUT)) => { + None + } + }; + assert!( + result.is_none(), + "Expected no output for aggregate over infinite + join, but got a batch" + ); + Ok(()) +} + +#[rstest] +#[tokio::test] +async fn test_infinite_join_agg_cancel( + #[values(false, true)] pretend_finite: bool, +) -> Result<(), Box> { + // 1) Session, schema, and a single 8 K‐row batch for each side + let session_ctx = SessionContext::new(); + let schema = Arc::new(Schema::new(vec![Field::new( + "value", + DataType::Int64, + false, + )])); + + let mut builder_left = Int64Array::builder(8_192); + let mut builder_right = Int64Array::builder(8_192); + for v in 0..8_192 { + builder_left.append_value(v); + // on the right side, we’ll shift each value by +1 so that not everything joins, + // but plenty of matching keys exist (e.g. 0 on left matches 1 on right, etc.) + builder_right.append_value(v + 1); + } + let batch_left = + RecordBatch::try_new(schema.clone(), vec![Arc::new(builder_left.finish())])?; + let batch_right = + RecordBatch::try_new(schema.clone(), vec![Arc::new(builder_right.finish())])?; + + // 2a) Build two InfiniteExecs (left and right) + let infinite_left = + make_lazy_exec(batch_left.clone(), schema.clone(), pretend_finite); + let infinite_right = + make_lazy_exec(batch_right.clone(), schema.clone(), pretend_finite); + + // 2b) Create Join keys → join on “value” = “value” + let left_keys: Vec> = + vec![Arc::new(Column::new_with_schema("value", &schema)?)]; + let right_keys: Vec> = + vec![Arc::new(Column::new_with_schema("value", &schema)?)]; + + // 2c) Wrap each side in CoalesceBatches + Repartition so they are both hashed into 1 partition + let coalesced_left = Arc::new(CoalesceBatchesExec::new(infinite_left, 8_192)); + let coalesced_right = Arc::new(CoalesceBatchesExec::new(infinite_right, 8_192)); + + let part_left = Partitioning::Hash(left_keys, 1); + let part_right = 
Partitioning::Hash(right_keys, 1); + + let hashed_left = Arc::new(RepartitionExec::try_new(coalesced_left, part_left)?); + let hashed_right = Arc::new(RepartitionExec::try_new(coalesced_right, part_right)?); + + // 2d) Build an Inner HashJoinExec → left.value = right.value + let join = Arc::new(HashJoinExec::try_new( + hashed_left, + hashed_right, + vec![( + Arc::new(Column::new_with_schema("value", &schema)?), + Arc::new(Column::new_with_schema("value", &schema)?), + )], + None, + &JoinType::Inner, + None, + PartitionMode::CollectLeft, + true, + )?); + + // 3) Project only one column (“value” from the left side) because we just want to sum that + let input_schema = join.schema(); + + let proj_expr = vec![( + Arc::new(Column::new_with_schema("value", &input_schema)?) as _, + "value".to_string(), + )]; + + let projection = Arc::new(ProjectionExec::try_new(proj_expr, join)?); + let projection_schema = projection.schema(); + + let output_fields = vec![Field::new("total", DataType::Int64, true)]; + let output_schema = Arc::new(Schema::new(output_fields)); + + // 4) Global aggregate (Single) over “value” + let aggregate_expr = AggregateExprBuilder::new( + sum::sum_udaf(), + vec![Arc::new(Column::new_with_schema( + "value", + &projection.schema(), + )?)], + ) + .schema(output_schema) + .alias("total") + .build()?; + + let aggr = Arc::new(AggregateExec::try_new( + AggregateMode::Single, + PhysicalGroupBy::new(vec![], vec![], vec![]), + vec![Arc::new(aggregate_expr)], + vec![None], + projection, + projection_schema, + )?); + + // 5) Wrap yields under each infinite leaf + let config = ConfigOptions::new(); + let optimized = InsertYieldExec::new().optimize(aggr, &config)?; + + // 6) Execute + 1 sec timeout + let mut stream = physical_plan::execute_stream(optimized, session_ctx.task_ctx())?; + const TIMEOUT: u64 = 1; + let result = select! 
{ + batch_opt = stream.next() => batch_opt, + _ = tokio::time::sleep(tokio::time::Duration::from_secs(TIMEOUT)) => { + None + } + }; + assert!( + result.is_none(), + "Expected no output for aggregate over infinite + join, but got a batch" + ); + Ok(()) +} + +#[rstest] +#[tokio::test] +async fn test_filter_reject_all_batches_cancel( + #[values(false, true)] pretend_finite: bool, +) -> Result<(), Box> { + // 1) Create a Session, Schema, and an 8K-row RecordBatch + let session_ctx = SessionContext::new(); + let schema = Arc::new(Schema::new(vec![Field::new( + "value", + DataType::Int64, + false, + )])); + + // Build a batch with values 0..8191 + let mut builder = Int64Array::builder(8_192); + for v in 0..8_192 { + builder.append_value(v); + } + let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(builder.finish())])?; + + // 2a) Wrap this batch in an InfiniteExec + let infinite = make_lazy_exec(batch.clone(), schema.clone(), pretend_finite); + + // 2b) Construct a FilterExec that is always false: “value > 10000” (no rows pass) + let false_predicate = Arc::new(BinaryExpr::new( + Arc::new(Column::new_with_schema("value", &schema)?), + Gt, + Arc::new(Literal::new(ScalarValue::Int64(Some(10_000)))), + )); + let filtered = Arc::new(FilterExec::try_new(false_predicate, infinite)?); + + // 2c) Use CoalesceBatchesExec to guarantee each Filter pull always yields an 8192-row batch + let coalesced = Arc::new(CoalesceBatchesExec::new(filtered, 8_192)); + + // 3) InsertYieldExec to insert YieldExec—so that the InfiniteExec yields control between batches + let config = ConfigOptions::new(); + let optimized = InsertYieldExec::new().optimize(coalesced, &config)?; + + // 4) Execute with a 1-second timeout. Because Filter discards all 8192 rows each time + // without ever producing output, no batch will arrive within 1 second. And since + // emission type is not Final, we never see an end‐of‐stream marker. 
+ let mut stream = physical_plan::execute_stream(optimized, session_ctx.task_ctx())?; + const TIMEOUT: u64 = 1; + let result = select! { + batch_opt = stream.next() => batch_opt, + _ = tokio::time::sleep(tokio::time::Duration::from_secs(TIMEOUT)) => { + None + } + }; + assert!( + result.is_none(), + "Expected no output for infinite + filter(all-false) + aggregate, but got a batch" + ); + Ok(()) +} + +#[rstest] +#[tokio::test] +async fn test_infinite_hash_join_without_repartition_and_no_agg( + #[values(false, true)] pretend_finite: bool, +) -> Result<(), Box> { + // 1) Create Session, schema, and an 8K-row RecordBatch for each side + let session_ctx = SessionContext::new(); + let schema: SchemaRef = Arc::new(Schema::new(vec![Field::new( + "value", + DataType::Int64, + false, + )])); + + let mut builder_left = Int64Array::builder(8_192); + let mut builder_right = Int64Array::builder(8_192); + for v in 0..8_192 { + builder_left.append_value(v); + builder_right.append_value(v + 1); + } + let batch_left = + RecordBatch::try_new(schema.clone(), vec![Arc::new(builder_left.finish())])?; + let batch_right = + RecordBatch::try_new(schema.clone(), vec![Arc::new(builder_right.finish())])?; + + // 2a) Unlike the test with aggregation, keep this as a pure join— + // use InfiniteExec to simulate an infinite stream + let infinite_left = + make_lazy_exec(batch_left.clone(), schema.clone(), pretend_finite); + let infinite_right = + make_lazy_exec(batch_right.clone(), schema.clone(), pretend_finite); + + // 2b) To feed a single batch into the Join, we can still use CoalesceBatchesExec, + // but do NOT wrap it in a RepartitionExec + let coalesced_left = Arc::new(CoalesceBatchesExec::new(infinite_left, 8_192)); + let coalesced_right = Arc::new(CoalesceBatchesExec::new(infinite_right, 8_192)); + + // 2c) Directly feed `coalesced_left` and `coalesced_right` into HashJoinExec. + // Do not use aggregation or repartition. 
+ let join = Arc::new(HashJoinExec::try_new( + coalesced_left, + coalesced_right, + vec![( + Arc::new(Column::new_with_schema("value", &schema)?), + Arc::new(Column::new_with_schema("value", &schema)?), + )], + /* filter */ None, + &JoinType::Inner, + /* output64 */ None, + // Using CollectLeft is fine—just avoid RepartitionExec’s partitioned channels. + PartitionMode::CollectLeft, + /* build_left */ true, + )?); + + // 3) Apply InsertYieldExec even though this plan has neither aggregation nor + // repartition. The join reads both infinite inputs directly, so nothing else in + // the plan introduces a yield point; the rule is what is expected to make the + // leaves yield cooperatively. Without it, the timeout branch of the `select!` + // below might never get a chance to run and the test could hang. + let config = ConfigOptions::new(); + let optimized = InsertYieldExec::new().optimize(join, &config)?; + + // 4) Execute with a 1 second timeout + let mut stream = physical_plan::execute_stream(optimized, session_ctx.task_ctx())?; + const TIMEOUT_SEC: u64 = 1; + let result = select! 
{ + batch_opt = stream.next() => batch_opt, + _ = tokio::time::sleep(tokio::time::Duration::from_secs(TIMEOUT_SEC)) => { + None + } + }; + + assert!( + result.is_none(), + "Expected no output for infinite + filter(all-false) + aggregate, but got a batch" + ); + Ok(()) +} + +#[rstest] +#[tokio::test] +async fn test_infinite_sort_merge_join_without_repartition_and_no_agg( + #[values(false, true)] pretend_finite: bool, +) -> Result<(), Box> { + // 1) Create Session, schema, and two small RecordBatches that never overlap: + // Left = [-3, -2, -1], Right = [0, 1, 2] + let session_ctx = SessionContext::new(); + let schema: SchemaRef = Arc::new(Schema::new(vec![Field::new( + "value", + DataType::Int64, + false, + )])); + + let left_array = { + let mut b = Int64Array::builder(3); + b.append_value(-3); + b.append_value(-2); + b.append_value(-1); + Arc::new(b.finish()) as Arc + }; + let right_array = { + let mut b = Int64Array::builder(3); + b.append_value(0); + b.append_value(1); + b.append_value(2); + Arc::new(b.finish()) as Arc + }; + let batch_left = RecordBatch::try_new(schema.clone(), vec![left_array])?; + let batch_right = RecordBatch::try_new(schema.clone(), vec![right_array])?; + + // 2a) Wrap each small batch in an InfiniteExec (pretend_finite toggles finite vs infinite) + let infinite_left = + make_lazy_exec(batch_left.clone(), schema.clone(), pretend_finite); + let infinite_right = + make_lazy_exec(batch_right.clone(), schema.clone(), pretend_finite); + + // 2b) Coalesce each InfiniteExec into a single 3-row batch at a time. + // (Do NOT wrap in RepartitionExec.) + let coalesced_left = Arc::new(CoalesceBatchesExec::new(infinite_left, 3)); + let coalesced_right = Arc::new(CoalesceBatchesExec::new(infinite_right, 3)); + + // 2c) Build a SortMergeJoinExec on “value”. Since left values < 0 and + // right values ≥ 0, they never match. No aggregation or repartition. + // + // We need a Vec for the join key. 
Any consistent SortOptions works, + // because data is already in ascending order on “value.” + let join = Arc::new(SortMergeJoinExec::try_new( + coalesced_left, + coalesced_right, + vec![(col("value", &schema)?, col("value", &schema)?)], + /* filter */ None, + JoinType::Inner, + vec![SortOptions::new(true, false)], // descending, nulls last + /* null_equal */ true, + )?); + + // 3) Apply InsertYieldExec (no aggregation, no repartition → no built-in yields otherwise). + let config = ConfigOptions::new(); + let optimized = InsertYieldExec::new().optimize(join, &config)?; + + // 4) Execute with a 1-second timeout. Because both sides are infinite and never match, + // the SortMergeJoin will never produce output within 1s. + let mut stream = physical_plan::execute_stream(optimized, session_ctx.task_ctx())?; + const TIMEOUT_SEC: u64 = 1; + let result = select! { + batch_opt = stream.next() => batch_opt, + _ = tokio::time::sleep(tokio::time::Duration::from_secs(TIMEOUT_SEC)) => None, + }; + + assert!( + result.is_none(), + "Expected no output for infinite SortMergeJoin (no repartition & no aggregation), but got a batch", + ); + Ok(()) +} diff --git a/datafusion/core/tests/execution/mod.rs b/datafusion/core/tests/execution/mod.rs index 8169db1a4611..333a695dca8e 100644 --- a/datafusion/core/tests/execution/mod.rs +++ b/datafusion/core/tests/execution/mod.rs @@ -15,4 +15,5 @@ // specific language governing permissions and limitations // under the License. 
+mod infinite_cancel; mod logical_plan; diff --git a/datafusion/core/tests/user_defined/insert_operation.rs b/datafusion/core/tests/user_defined/insert_operation.rs index 12f700ce572b..2c90abeb8047 100644 --- a/datafusion/core/tests/user_defined/insert_operation.rs +++ b/datafusion/core/tests/user_defined/insert_operation.rs @@ -179,6 +179,10 @@ impl ExecutionPlan for TestInsertExec { ) -> Result { unimplemented!("TestInsertExec is a stub for testing.") } + + fn with_cooperative_yields(self: Arc) -> Option> { + Some(self) + } } fn make_count_schema() -> SchemaRef { diff --git a/datafusion/datasource/src/sink.rs b/datafusion/datasource/src/sink.rs index d92d168f8e5f..d89e2ea5fc38 100644 --- a/datafusion/datasource/src/sink.rs +++ b/datafusion/datasource/src/sink.rs @@ -246,6 +246,10 @@ impl ExecutionPlan for DataSinkExec { fn metrics(&self) -> Option { self.sink.metrics() } + + fn with_cooperative_yields(self: Arc) -> Option> { + Some(self) + } } /// Create a output record batch with a count diff --git a/datafusion/datasource/src/source.rs b/datafusion/datasource/src/source.rs index 30ecc38709f4..29c3c9c3d7ff 100644 --- a/datafusion/datasource/src/source.rs +++ b/datafusion/datasource/src/source.rs @@ -38,6 +38,7 @@ use datafusion_physical_expr_common::sort_expr::LexOrdering; use datafusion_physical_plan::filter_pushdown::{ ChildPushdownResult, FilterPushdownPropagation, }; +use datafusion_physical_plan::yield_stream::wrap_yield_stream; /// A source of data, typically a list of files or memory /// @@ -185,6 +186,8 @@ pub struct DataSourceExec { data_source: Arc, /// Cached plan properties such as sort order cache: PlanProperties, + /// Indicates whether to enable cooperative yielding mode. 
+ cooperative: bool, } impl DisplayAs for DataSourceExec { @@ -256,7 +259,13 @@ impl ExecutionPlan for DataSourceExec { partition: usize, context: Arc, ) -> Result { - self.data_source.open(partition, context) + self.data_source + .open(partition, Arc::clone(&context)) + .map(|stream| wrap_yield_stream(stream, &context, self.cooperative)) + } + + fn with_cooperative_yields(self: Arc) -> Option> { + self.cooperative.then_some(self) } fn metrics(&self) -> Option { @@ -289,7 +298,11 @@ impl ExecutionPlan for DataSourceExec { let data_source = self.data_source.with_fetch(limit)?; let cache = self.cache.clone(); - Some(Arc::new(Self { data_source, cache })) + Some(Arc::new(Self { + data_source, + cache, + cooperative: self.cooperative, + })) } fn fetch(&self) -> Option { @@ -337,9 +350,14 @@ impl DataSourceExec { Arc::new(Self::new(Arc::new(data_source))) } + // Default constructor for `DataSourceExec`, setting the `cooperative` flag to `true`. pub fn new(data_source: Arc) -> Self { let cache = Self::compute_properties(Arc::clone(&data_source)); - Self { data_source, cache } + Self { + data_source, + cache, + cooperative: true, + } } /// Return the source object @@ -365,6 +383,12 @@ impl DataSourceExec { self } + /// Assign yielding mode + pub fn with_cooperative(mut self, cooperative: bool) -> Self { + self.cooperative = cooperative; + self + } + fn compute_properties(data_source: Arc) -> PlanProperties { PlanProperties::new( data_source.eq_properties(), diff --git a/datafusion/physical-optimizer/src/insert_yield_exec.rs b/datafusion/physical-optimizer/src/insert_yield_exec.rs new file mode 100644 index 000000000000..30a01a67cc68 --- /dev/null +++ b/datafusion/physical-optimizer/src/insert_yield_exec.rs @@ -0,0 +1,94 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! The `InsertYieldExec` optimizer rule inspects the physical plan to look for +//! tight-looping operators and inserts explicit yielding mechanisms (whether +//! as a separate operator, or via a yielding variant) at leaf nodes to make +//! the plan cancellation friendly. + +use std::fmt::{Debug, Formatter}; +use std::sync::Arc; + +use crate::PhysicalOptimizerRule; + +use datafusion_common::config::ConfigOptions; +use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion}; +use datafusion_common::Result; +use datafusion_physical_plan::yield_stream::YieldStreamExec; +use datafusion_physical_plan::ExecutionPlan; + +/// `InsertYieldExec` is a [`PhysicalOptimizerRule`] that finds every leaf node in +/// the plan, and replaces it with a variant that cooperatively yields +/// either using its yielding variant given by `with_cooperative_yields`, +/// or, if none exists, by inserting a [`YieldStreamExec`] operator as a parent. 
+pub struct InsertYieldExec {} + +impl InsertYieldExec { + pub fn new() -> Self { + Self {} + } +} + +impl Default for InsertYieldExec { + fn default() -> Self { + Self::new() + } +} + +impl Debug for InsertYieldExec { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("InsertYieldExec").finish() + } +} + +impl PhysicalOptimizerRule for InsertYieldExec { + fn name(&self) -> &str { + "insert_yield_exec" + } + + fn optimize( + &self, + plan: Arc, + config: &ConfigOptions, + ) -> Result> { + // Only activate if user has configured a non-zero yield frequency. + let yield_period = config.optimizer.yield_period; + if yield_period != 0 { + plan.transform_down(|plan| { + if !plan.children().is_empty() { + // Not a leaf, keep recursing down. + return Ok(Transformed::no(plan)); + } + let new_plan = Arc::clone(&plan) + .with_cooperative_yields() + .unwrap_or_else(|| { + // Otherwise, insert a `YieldStreamExec` to enforce periodic yielding. + Arc::new(YieldStreamExec::new(plan, yield_period)) + }); + Ok(Transformed::new(new_plan, true, TreeNodeRecursion::Jump)) + }) + .map(|t| t.data) + } else { + Ok(plan) + } + } + + fn schema_check(&self) -> bool { + // Wrapping a leaf in YieldStreamExec preserves the schema, so it is safe. 
+ true + } +} diff --git a/datafusion/physical-optimizer/src/lib.rs b/datafusion/physical-optimizer/src/lib.rs index 5a43d7118d63..f7b5bd584351 100644 --- a/datafusion/physical-optimizer/src/lib.rs +++ b/datafusion/physical-optimizer/src/lib.rs @@ -30,6 +30,7 @@ pub mod combine_partial_final_agg; pub mod enforce_distribution; pub mod enforce_sorting; pub mod filter_pushdown; +pub mod insert_yield_exec; pub mod join_selection; pub mod limit_pushdown; pub mod limited_distinct_aggregation; diff --git a/datafusion/physical-optimizer/src/optimizer.rs b/datafusion/physical-optimizer/src/optimizer.rs index 432ac35ebc23..d5129cea9d4e 100644 --- a/datafusion/physical-optimizer/src/optimizer.rs +++ b/datafusion/physical-optimizer/src/optimizer.rs @@ -26,6 +26,7 @@ use crate::combine_partial_final_agg::CombinePartialFinalAggregate; use crate::enforce_distribution::EnforceDistribution; use crate::enforce_sorting::EnforceSorting; use crate::filter_pushdown::FilterPushdown; +use crate::insert_yield_exec::InsertYieldExec; use crate::join_selection::JoinSelection; use crate::limit_pushdown::LimitPushdown; use crate::limited_distinct_aggregation::LimitedDistinctAggregation; @@ -137,6 +138,7 @@ impl PhysicalOptimizer { // are not present, the load of executors such as join or union will be // reduced by narrowing their input tables. Arc::new(ProjectionPushdown::new()), + Arc::new(InsertYieldExec::new()), // The SanityCheckPlan rule checks whether the order and // distribution requirements of each node in the plan // is satisfied. 
It will also reject non-runnable query diff --git a/datafusion/physical-plan/src/empty.rs b/datafusion/physical-plan/src/empty.rs index 36634fbe6d7e..68f9eb22e330 100644 --- a/datafusion/physical-plan/src/empty.rs +++ b/datafusion/physical-plan/src/empty.rs @@ -173,6 +173,10 @@ impl ExecutionPlan for EmptyExec { None, )) } + + fn with_cooperative_yields(self: Arc) -> Option> { + Some(self) + } } #[cfg(test)] diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs index 73b405b38471..1beaa49ecf66 100644 --- a/datafusion/physical-plan/src/execution_plan.rs +++ b/datafusion/physical-plan/src/execution_plan.rs @@ -546,6 +546,16 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { child_pushdown_result, )) } + + /// Returns a version of this plan that cooperates with the runtime via + /// built-in yielding. If such a version doesn't exist, returns `None`. + /// You do not need to provide such a version of a custom operator, + /// but DataFusion will utilize it while optimizing the plan if it exists. + fn with_cooperative_yields(self: Arc) -> Option> { + // Conservative default implementation assumes that a leaf does not + // cooperate with yielding. 
+ None + } } /// [`ExecutionPlan`] Invariant Level diff --git a/datafusion/physical-plan/src/lib.rs b/datafusion/physical-plan/src/lib.rs index ba423f958c78..5d63ccdc13a8 100644 --- a/datafusion/physical-plan/src/lib.rs +++ b/datafusion/physical-plan/src/lib.rs @@ -94,3 +94,4 @@ pub mod udaf { pub mod coalesce; #[cfg(any(test, feature = "bench"))] pub mod test; +pub mod yield_stream; diff --git a/datafusion/physical-plan/src/memory.rs b/datafusion/physical-plan/src/memory.rs index c232970b2188..91af03bf46df 100644 --- a/datafusion/physical-plan/src/memory.rs +++ b/datafusion/physical-plan/src/memory.rs @@ -24,6 +24,7 @@ use std::task::{Context, Poll}; use crate::execution_plan::{Boundedness, EmissionType}; use crate::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}; +use crate::yield_stream::wrap_yield_stream; use crate::{ DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties, RecordBatchStream, SendableRecordBatchStream, Statistics, @@ -147,6 +148,8 @@ pub struct LazyMemoryExec { batch_generators: Vec>>, /// Plan properties cache storing equivalence properties, partitioning, and execution mode cache: PlanProperties, + /// Indicates whether to enable cooperative yielding mode (defaults to `true`). + cooperative: bool, /// Execution metrics metrics: ExecutionPlanMetricsSet, } @@ -167,9 +170,21 @@ impl LazyMemoryExec { schema, batch_generators: generators, cache, + cooperative: true, // Cooperative yielding mode defaults to true metrics: ExecutionPlanMetricsSet::new(), }) } + + /// Set the Yielding mode for the execution plan + /// It defaults to `true`, meaning it will yield back to the runtime for cooperative scheduling. 
+ pub fn with_cooperative_yielding(mut self, cooperative: bool) -> Self { + self.cooperative = cooperative; + self + } + + pub fn set_boundedness(&mut self, boundedness: Boundedness) { + self.cache.boundedness = boundedness; + } } impl fmt::Debug for LazyMemoryExec { @@ -248,7 +263,7 @@ impl ExecutionPlan for LazyMemoryExec { fn execute( &self, partition: usize, - _context: Arc, + context: Arc, ) -> Result { if partition >= self.batch_generators.len() { return internal_err!( @@ -259,11 +274,17 @@ impl ExecutionPlan for LazyMemoryExec { } let baseline_metrics = BaselineMetrics::new(&self.metrics, partition); - Ok(Box::pin(LazyMemoryStream { + + let stream = Box::pin(LazyMemoryStream { schema: Arc::clone(&self.schema), generator: Arc::clone(&self.batch_generators[partition]), baseline_metrics, - })) + }); + Ok(wrap_yield_stream(stream, &context, self.cooperative)) + } + + fn with_cooperative_yields(self: Arc) -> Option> { + self.cooperative.then_some(self) } fn metrics(&self) -> Option { diff --git a/datafusion/physical-plan/src/placeholder_row.rs b/datafusion/physical-plan/src/placeholder_row.rs index 46847b2413c0..a5c80438e774 100644 --- a/datafusion/physical-plan/src/placeholder_row.rs +++ b/datafusion/physical-plan/src/placeholder_row.rs @@ -22,10 +22,13 @@ use std::sync::Arc; use crate::execution_plan::{Boundedness, EmissionType}; use crate::memory::MemoryStream; -use crate::{common, DisplayAs, PlanProperties, SendableRecordBatchStream, Statistics}; -use crate::{DisplayFormatType, ExecutionPlan, Partitioning}; -use arrow::array::{ArrayRef, NullArray}; -use arrow::array::{RecordBatch, RecordBatchOptions}; +use crate::yield_stream::wrap_yield_stream; +use crate::{ + common, DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties, + SendableRecordBatchStream, Statistics, +}; + +use arrow::array::{ArrayRef, NullArray, RecordBatch, RecordBatchOptions}; use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaRef}; use 
datafusion_common::{internal_err, Result}; use datafusion_execution::TaskContext; @@ -41,6 +44,8 @@ pub struct PlaceholderRowExec { /// Number of partitions partitions: usize, cache: PlanProperties, + /// Indicates whether to enable cooperative yielding mode. + cooperative: bool, } impl PlaceholderRowExec { @@ -52,6 +57,7 @@ impl PlaceholderRowExec { schema, partitions, cache, + cooperative: true, } } @@ -158,11 +164,8 @@ impl ExecutionPlan for PlaceholderRowExec { ); } - Ok(Box::pin(MemoryStream::try_new( - self.data()?, - Arc::clone(&self.schema), - None, - )?)) + MemoryStream::try_new(self.data()?, Arc::clone(&self.schema), None) + .map(|ms| wrap_yield_stream(Box::pin(ms), &context, self.cooperative)) } fn statistics(&self) -> Result { @@ -182,6 +185,10 @@ impl ExecutionPlan for PlaceholderRowExec { None, )) } + + fn with_cooperative_yields(self: Arc) -> Option> { + self.cooperative.then_some(self) + } } #[cfg(test)] diff --git a/datafusion/physical-plan/src/streaming.rs b/datafusion/physical-plan/src/streaming.rs index 3f2e99b1d003..a3f593a06d72 100644 --- a/datafusion/physical-plan/src/streaming.rs +++ b/datafusion/physical-plan/src/streaming.rs @@ -30,6 +30,7 @@ use crate::projection::{ all_alias_free_columns, new_projections_for_columns, update_ordering, ProjectionExec, }; use crate::stream::RecordBatchStreamAdapter; +use crate::yield_stream::wrap_yield_stream; use crate::{ExecutionPlan, Partitioning, SendableRecordBatchStream}; use arrow::datatypes::{Schema, SchemaRef}; @@ -68,6 +69,8 @@ pub struct StreamingTableExec { limit: Option, cache: PlanProperties, metrics: ExecutionPlanMetricsSet, + /// Indicates whether to enable cooperative yielding mode. 
+ cooperative: bool, } impl StreamingTableExec { @@ -112,6 +115,7 @@ impl StreamingTableExec { limit, cache, metrics: ExecutionPlanMetricsSet::new(), + cooperative: true, }) } @@ -262,7 +266,7 @@ impl ExecutionPlan for StreamingTableExec { partition: usize, ctx: Arc, ) -> Result { - let stream = self.partitions[partition].execute(ctx); + let stream = self.partitions[partition].execute(Arc::clone(&ctx)); let projected_stream = match self.projection.clone() { Some(projection) => Box::pin(RecordBatchStreamAdapter::new( Arc::clone(&self.projected_schema), @@ -272,16 +276,13 @@ impl ExecutionPlan for StreamingTableExec { )), None => stream, }; + let stream = wrap_yield_stream(projected_stream, &ctx, self.cooperative); + Ok(match self.limit { - None => projected_stream, + None => stream, Some(fetch) => { let baseline_metrics = BaselineMetrics::new(&self.metrics, partition); - Box::pin(LimitStream::new( - projected_stream, - 0, - Some(fetch), - baseline_metrics, - )) + Box::pin(LimitStream::new(stream, 0, Some(fetch), baseline_metrics)) } }) } @@ -338,8 +339,13 @@ impl ExecutionPlan for StreamingTableExec { limit, cache: self.cache.clone(), metrics: self.metrics.clone(), + cooperative: self.cooperative, })) } + + fn with_cooperative_yields(self: Arc) -> Option> { + self.cooperative.then_some(self) + } } #[cfg(test)] diff --git a/datafusion/physical-plan/src/work_table.rs b/datafusion/physical-plan/src/work_table.rs index eea1b9958633..a5f094ffaf04 100644 --- a/datafusion/physical-plan/src/work_table.rs +++ b/datafusion/physical-plan/src/work_table.rs @@ -22,11 +22,12 @@ use std::sync::{Arc, Mutex}; use crate::execution_plan::{Boundedness, EmissionType}; use crate::memory::MemoryStream; +use crate::metrics::{ExecutionPlanMetricsSet, MetricsSet}; +use crate::yield_stream::wrap_yield_stream; use crate::{ - metrics::{ExecutionPlanMetricsSet, MetricsSet}, + DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, SendableRecordBatchStream, Statistics, }; -use 
crate::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties}; use arrow::datatypes::SchemaRef; use arrow::record_batch::RecordBatch; @@ -106,6 +107,8 @@ pub struct WorkTableExec { metrics: ExecutionPlanMetricsSet, /// Cache holding plan properties like equivalences, output partitioning etc. cache: PlanProperties, + /// Indicates whether to enable cooperative yielding mode. + cooperative: bool, } impl WorkTableExec { @@ -118,6 +121,7 @@ impl WorkTableExec { metrics: ExecutionPlanMetricsSet::new(), work_table: Arc::new(WorkTable::new()), cache, + cooperative: true, } } @@ -138,6 +142,7 @@ impl WorkTableExec { metrics: ExecutionPlanMetricsSet::new(), work_table, cache: self.cache.clone(), + cooperative: self.cooperative, } } @@ -205,7 +210,7 @@ impl ExecutionPlan for WorkTableExec { fn execute( &self, partition: usize, - _context: Arc, + context: Arc, ) -> Result { // WorkTable streams must be the plan base. if partition != 0 { @@ -214,10 +219,13 @@ impl ExecutionPlan for WorkTableExec { ); } let batch = self.work_table.take()?; - Ok(Box::pin( + + let stream = Box::pin( MemoryStream::try_new(batch.batches, Arc::clone(&self.schema), None)? .with_reservation(batch.reservation), - )) + ); + // Cooperatively yield if asked to do so: + Ok(wrap_yield_stream(stream, &context, self.cooperative)) } fn metrics(&self) -> Option { @@ -231,6 +239,10 @@ impl ExecutionPlan for WorkTableExec { fn partition_statistics(&self, _partition: Option) -> Result { Ok(Statistics::new_unknown(&self.schema())) } + + fn with_cooperative_yields(self: Arc) -> Option> { + self.cooperative.then_some(self) + } } #[cfg(test)] diff --git a/datafusion/physical-plan/src/yield_stream.rs b/datafusion/physical-plan/src/yield_stream.rs new file mode 100644 index 000000000000..0069b4b64d38 --- /dev/null +++ b/datafusion/physical-plan/src/yield_stream.rs @@ -0,0 +1,276 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::any::Any; +use std::pin::Pin; +use std::sync::Arc; +use std::task::{Context, Poll}; + +use crate::execution_plan::CardinalityEffect::{self, Equal}; +use crate::{ + DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, RecordBatchStream, + SendableRecordBatchStream, +}; +use arrow::record_batch::RecordBatch; +use arrow_schema::Schema; +use datafusion_common::{internal_err, Result, Statistics}; +use datafusion_execution::TaskContext; + +use futures::{Stream, StreamExt}; + +/// An identity stream that passes batches through as is, but yields control +/// back to the runtime every `period` batches. This stream is useful to +/// construct a mechanism that allows operators that do not directly cooperate +/// with the runtime to check/support cancellation. 
+pub struct YieldStream { + inner: SendableRecordBatchStream, + batches_processed: usize, + period: usize, +} + +impl YieldStream { + pub fn new(inner: SendableRecordBatchStream, mut period: usize) -> Self { + if period == 0 { + period = usize::MAX; + } + Self { + inner, + batches_processed: 0, + period, + } + } +} + +impl Stream for YieldStream { + type Item = Result; + + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + if self.batches_processed >= self.period { + self.batches_processed = 0; + cx.waker().wake_by_ref(); + return Poll::Pending; + } + + let value = self.inner.poll_next_unpin(cx); + match value { + Poll::Ready(Some(Ok(_))) => { + self.batches_processed += 1; + } + Poll::Pending => { + self.batches_processed = 0; + } + _ => {} + } + value + } +} + +impl RecordBatchStream for YieldStream { + fn schema(&self) -> Arc { + self.inner.schema() + } +} + +/// This operator wraps any other execution plan to "adapt" it to cooperate +/// with the runtime by yielding control back to the runtime every `frequency` +/// batches. This is useful for operators that do not natively support yielding +/// control, allowing them to be used in a runtime that requires yielding for +/// cancellation or other purposes. +/// +/// # Note +/// If your ExecutionPlan periodically yields control back to the scheduler, +/// implement [`ExecutionPlan::with_cooperative_yields`] to avoid the need for this +/// node. +#[derive(Debug)] +pub struct YieldStreamExec { + /// The child execution plan that this operator "wraps" to make it + /// cooperate with the runtime. + child: Arc, + /// The period (in batches) at which the operator yields control back to the runtime. + frequency: usize, +} + +impl YieldStreamExec { + /// Create a new `YieldStreamExec` operator that wraps the given child + /// execution plan and yields control back to the runtime every `frequency` + /// batches. 
+ pub fn new(child: Arc, frequency: usize) -> Self { + Self { frequency, child } + } + + /// Returns the child execution plan this operator "wraps" to make it + /// cooperate with the runtime. + pub fn input(&self) -> &Arc { + &self.child + } + + /// Returns the period at which the operator yields control back to the + /// runtime. + pub fn yield_period(&self) -> usize { + self.frequency + } +} + +impl DisplayAs for YieldStreamExec { + fn fmt_as( + &self, + _t: DisplayFormatType, + f: &mut std::fmt::Formatter<'_>, + ) -> std::fmt::Result { + write!(f, "YieldStreamExec frequency={}", self.frequency) + } +} + +impl ExecutionPlan for YieldStreamExec { + fn name(&self) -> &str { + "YieldStreamExec" + } + + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> Arc { + self.child.schema() + } + + fn properties(&self) -> &PlanProperties { + self.child.properties() + } + + fn children(&self) -> Vec<&Arc> { + vec![&self.child] + } + + fn with_new_children( + self: Arc, + mut children: Vec>, + ) -> Result> { + if children.len() != 1 { + return internal_err!("YieldStreamExec requires exactly one child"); + } + Ok(Arc::new(YieldStreamExec::new( + children.swap_remove(0), + self.frequency, + ))) + } + + fn execute( + &self, + partition: usize, + task_ctx: Arc, + ) -> Result { + let child_stream = self.child.execute(partition, task_ctx)?; + let yield_stream = YieldStream::new(child_stream, self.frequency); + Ok(Box::pin(yield_stream)) + } + + fn partition_statistics(&self, partition: Option) -> Result { + self.child.partition_statistics(partition) + } + + fn maintains_input_order(&self) -> Vec { + self.child.maintains_input_order() + } + + fn supports_limit_pushdown(&self) -> bool { + true + } + + fn cardinality_effect(&self) -> CardinalityEffect { + Equal + } +} + +/// Wraps `stream` inside a `YieldStream` depending on the `cooperative` flag. +/// Yielding period is extracted from `context`. 
+pub fn wrap_yield_stream( + mut stream: SendableRecordBatchStream, + context: &TaskContext, + cooperative: bool, +) -> SendableRecordBatchStream { + if cooperative { + let period = context.session_config().options().optimizer.yield_period; + if period > 0 { + stream = Box::pin(YieldStream::new(stream, period)); + } + } + stream +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::stream::RecordBatchStreamAdapter; + + use arrow_schema::SchemaRef; + + use futures::{stream, StreamExt}; + + // Frequency testing: + // Number of batches to yield before yielding control back to the executor + const YIELD_BATCHES: usize = 64; + + /// Helper: construct a SendableRecordBatchStream containing `n` empty batches + fn make_empty_batches(n: usize) -> SendableRecordBatchStream { + let schema: SchemaRef = Arc::new(Schema::empty()); + let schema_for_stream = Arc::clone(&schema); + + let s = + stream::iter((0..n).map(move |_| { + Ok(RecordBatch::new_empty(Arc::clone(&schema_for_stream))) + })); + + Box::pin(RecordBatchStreamAdapter::new(schema, s)) + } + + #[tokio::test] + async fn yield_less_than_threshold() -> Result<()> { + let count = YIELD_BATCHES - 10; + let inner = make_empty_batches(count); + let out = YieldStream::new(inner, YIELD_BATCHES) + .collect::>() + .await; + assert_eq!(out.len(), count); + Ok(()) + } + + #[tokio::test] + async fn yield_equal_to_threshold() -> Result<()> { + let count = YIELD_BATCHES; + let inner = make_empty_batches(count); + let out = YieldStream::new(inner, YIELD_BATCHES) + .collect::>() + .await; + assert_eq!(out.len(), count); + Ok(()) + } + + #[tokio::test] + async fn yield_more_than_threshold() -> Result<()> { + let count = YIELD_BATCHES + 20; + let inner = make_empty_batches(count); + let out = YieldStream::new(inner, YIELD_BATCHES) + .collect::>() + .await; + assert_eq!(out.len(), count); + Ok(()) + } +} diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index 4c8b6c588d94..76cd8c9118a2 
100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -726,6 +726,7 @@ message PhysicalPlanNode { ParquetSinkExecNode parquet_sink = 29; UnnestExecNode unnest = 30; JsonScanExecNode json_scan = 31; + YieldStreamExecNode yield_stream = 32; } } @@ -1033,6 +1034,11 @@ message AvroScanExecNode { FileScanExecConf base_conf = 1; } +message YieldStreamExecNode { + PhysicalPlanNode input = 1; + uint32 frequency = 2; +} + enum PartitionMode { COLLECT_LEFT = 0; PARTITIONED = 1; diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs index 932422944508..8a62be84ec8f 100644 --- a/datafusion/proto/src/generated/pbjson.rs +++ b/datafusion/proto/src/generated/pbjson.rs @@ -15796,6 +15796,9 @@ impl serde::Serialize for PhysicalPlanNode { physical_plan_node::PhysicalPlanType::JsonScan(v) => { struct_ser.serialize_field("jsonScan", v)?; } + physical_plan_node::PhysicalPlanType::YieldStream(v) => { + struct_ser.serialize_field("yieldStream", v)?; + } } } struct_ser.end() @@ -15854,6 +15857,8 @@ impl<'de> serde::Deserialize<'de> for PhysicalPlanNode { "unnest", "json_scan", "jsonScan", + "yield_stream", + "yieldStream", ]; #[allow(clippy::enum_variant_names)] @@ -15888,6 +15893,7 @@ impl<'de> serde::Deserialize<'de> for PhysicalPlanNode { ParquetSink, Unnest, JsonScan, + YieldStream, } impl<'de> serde::Deserialize<'de> for GeneratedField { fn deserialize(deserializer: D) -> std::result::Result @@ -15939,6 +15945,7 @@ impl<'de> serde::Deserialize<'de> for PhysicalPlanNode { "parquetSink" | "parquet_sink" => Ok(GeneratedField::ParquetSink), "unnest" => Ok(GeneratedField::Unnest), "jsonScan" | "json_scan" => Ok(GeneratedField::JsonScan), + "yieldStream" | "yield_stream" => Ok(GeneratedField::YieldStream), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } } @@ -16169,6 +16176,13 @@ impl<'de> serde::Deserialize<'de> for PhysicalPlanNode { return 
Err(serde::de::Error::duplicate_field("jsonScan")); } physical_plan_type__ = map_.next_value::<::std::option::Option<_>>()?.map(physical_plan_node::PhysicalPlanType::JsonScan) +; + } + GeneratedField::YieldStream => { + if physical_plan_type__.is_some() { + return Err(serde::de::Error::duplicate_field("yieldStream")); + } + physical_plan_type__ = map_.next_value::<::std::option::Option<_>>()?.map(physical_plan_node::PhysicalPlanType::YieldStream) ; } } @@ -22665,3 +22679,113 @@ impl<'de> serde::Deserialize<'de> for WindowNode { deserializer.deserialize_struct("datafusion.WindowNode", FIELDS, GeneratedVisitor) } } +impl serde::Serialize for YieldStreamExecNode { + #[allow(deprecated)] + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::Serializer, + { + use serde::ser::SerializeStruct; + let mut len = 0; + if self.input.is_some() { + len += 1; + } + if self.frequency != 0 { + len += 1; + } + let mut struct_ser = serializer.serialize_struct("datafusion.YieldStreamExecNode", len)?; + if let Some(v) = self.input.as_ref() { + struct_ser.serialize_field("input", v)?; + } + if self.frequency != 0 { + struct_ser.serialize_field("frequency", &self.frequency)?; + } + struct_ser.end() + } +} +impl<'de> serde::Deserialize<'de> for YieldStreamExecNode { + #[allow(deprecated)] + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + const FIELDS: &[&str] = &[ + "input", + "frequency", + ]; + + #[allow(clippy::enum_variant_names)] + enum GeneratedField { + Input, + Frequency, + } + impl<'de> serde::Deserialize<'de> for GeneratedField { + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + struct GeneratedVisitor; + + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = GeneratedField; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(formatter, "expected one of: {:?}", &FIELDS) + } + + 
#[allow(unused_variables)] + fn visit_str(self, value: &str) -> std::result::Result + where + E: serde::de::Error, + { + match value { + "input" => Ok(GeneratedField::Input), + "frequency" => Ok(GeneratedField::Frequency), + _ => Err(serde::de::Error::unknown_field(value, FIELDS)), + } + } + } + deserializer.deserialize_identifier(GeneratedVisitor) + } + } + struct GeneratedVisitor; + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = YieldStreamExecNode; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + formatter.write_str("struct datafusion.YieldStreamExecNode") + } + + fn visit_map(self, mut map_: V) -> std::result::Result + where + V: serde::de::MapAccess<'de>, + { + let mut input__ = None; + let mut frequency__ = None; + while let Some(k) = map_.next_key()? { + match k { + GeneratedField::Input => { + if input__.is_some() { + return Err(serde::de::Error::duplicate_field("input")); + } + input__ = map_.next_value()?; + } + GeneratedField::Frequency => { + if frequency__.is_some() { + return Err(serde::de::Error::duplicate_field("frequency")); + } + frequency__ = + Some(map_.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0) + ; + } + } + } + Ok(YieldStreamExecNode { + input: input__, + frequency: frequency__.unwrap_or_default(), + }) + } + } + deserializer.deserialize_struct("datafusion.YieldStreamExecNode", FIELDS, GeneratedVisitor) + } +} diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index c2f4e93cef6a..3e3a04051f5d 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -1048,7 +1048,7 @@ pub mod table_reference { pub struct PhysicalPlanNode { #[prost( oneof = "physical_plan_node::PhysicalPlanType", - tags = "1, 2, 3, 4, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31" + tags = "1, 2, 3, 4, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 
21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32" )] pub physical_plan_type: ::core::option::Option, } @@ -1118,6 +1118,8 @@ pub mod physical_plan_node { Unnest(::prost::alloc::boxed::Box), #[prost(message, tag = "31")] JsonScan(super::JsonScanExecNode), + #[prost(message, tag = "32")] + YieldStream(::prost::alloc::boxed::Box), } } #[derive(Clone, PartialEq, ::prost::Message)] @@ -1572,6 +1574,13 @@ pub struct AvroScanExecNode { pub base_conf: ::core::option::Option, } #[derive(Clone, PartialEq, ::prost::Message)] +pub struct YieldStreamExecNode { + #[prost(message, optional, boxed, tag = "1")] + pub input: ::core::option::Option<::prost::alloc::boxed::Box>, + #[prost(uint32, tag = "2")] + pub frequency: u32, +} +#[derive(Clone, PartialEq, ::prost::Message)] pub struct HashJoinExecNode { #[prost(message, optional, boxed, tag = "1")] pub left: ::core::option::Option<::prost::alloc::boxed::Box>, diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index 672f5d2bd43a..e2c391d0445f 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -82,6 +82,7 @@ use datafusion::physical_plan::sorts::sort_preserving_merge::SortPreservingMerge use datafusion::physical_plan::union::{InterleaveExec, UnionExec}; use datafusion::physical_plan::unnest::{ListUnnest, UnnestExec}; use datafusion::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec}; +use datafusion::physical_plan::yield_stream::YieldStreamExec; use datafusion::physical_plan::{ ExecutionPlan, InputOrderMode, PhysicalExpr, WindowExpr, }; @@ -309,7 +310,6 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { runtime, extension_codec, ), - #[cfg_attr(not(feature = "parquet"), allow(unused_variables))] PhysicalPlanType::ParquetSink(sink) => self .try_into_parquet_sink_physical_plan( @@ -324,6 +324,13 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { runtime, extension_codec, ), + 
PhysicalPlanType::YieldStream(yield_stream) => self + .try_into_yield_stream_physical_plan( + yield_stream, + registry, + runtime, + extension_codec, + ), } } @@ -513,6 +520,13 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { ); } + if let Some(exec) = plan.downcast_ref::() { + return protobuf::PhysicalPlanNode::try_from_yield_stream_exec( + exec, + extension_codec, + ); + } + let mut buf: Vec = vec![]; match extension_codec.try_encode(Arc::clone(&plan_clone), &mut buf) { Ok(_) => { @@ -1766,6 +1780,21 @@ impl protobuf::PhysicalPlanNode { ))) } + fn try_into_yield_stream_physical_plan( + &self, + field_stream: &protobuf::YieldStreamExecNode, + registry: &dyn FunctionRegistry, + runtime: &RuntimeEnv, + extension_codec: &dyn PhysicalExtensionCodec, + ) -> Result> { + let input = + into_physical_plan(&field_stream.input, registry, runtime, extension_codec)?; + Ok(Arc::new(YieldStreamExec::new( + input, + field_stream.frequency as _, + ))) + } + fn try_from_explain_exec( exec: &ExplainExec, _extension_codec: &dyn PhysicalExtensionCodec, @@ -2745,6 +2774,25 @@ impl protobuf::PhysicalPlanNode { ))), }) } + + fn try_from_yield_stream_exec( + exec: &YieldStreamExec, + extension_codec: &dyn PhysicalExtensionCodec, + ) -> Result { + let input = protobuf::PhysicalPlanNode::try_from_physical_plan( + exec.input().to_owned(), + extension_codec, + )?; + + Ok(protobuf::PhysicalPlanNode { + physical_plan_type: Some(PhysicalPlanType::YieldStream(Box::new( + protobuf::YieldStreamExecNode { + input: Some(Box::new(input)), + frequency: exec.yield_period() as _, + }, + ))), + }) + } } pub trait AsExecutionPlan: Debug + Send + Sync + Clone { diff --git a/datafusion/sqllogictest/test_files/explain.slt b/datafusion/sqllogictest/test_files/explain.slt index 2df8a9dfbae4..560648271070 100644 --- a/datafusion/sqllogictest/test_files/explain.slt +++ b/datafusion/sqllogictest/test_files/explain.slt @@ -242,6 +242,7 @@ physical_plan after OutputRequirements DataSourceExec: file_groups={1 
group: [[W physical_plan after LimitAggregation SAME TEXT AS ABOVE physical_plan after LimitPushdown SAME TEXT AS ABOVE physical_plan after ProjectionPushdown SAME TEXT AS ABOVE +physical_plan after insert_yield_exec SAME TEXT AS ABOVE physical_plan after SanityCheckPlan SAME TEXT AS ABOVE physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, c], file_type=csv, has_header=true physical_plan_with_stats DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, c], file_type=csv, has_header=true, statistics=[Rows=Absent, Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:)]] @@ -319,6 +320,7 @@ physical_plan after OutputRequirements physical_plan after LimitAggregation SAME TEXT AS ABOVE physical_plan after LimitPushdown DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, statistics=[Rows=Exact(8), Bytes=Exact(671), [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]] physical_plan after ProjectionPushdown SAME TEXT AS ABOVE +physical_plan after insert_yield_exec SAME TEXT AS ABOVE physical_plan after SanityCheckPlan SAME TEXT AS ABOVE physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, statistics=[Rows=Exact(8), Bytes=Exact(671), [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]] physical_plan_with_schema DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, schema=[id:Int32;N, bool_col:Boolean;N, tinyint_col:Int32;N, smallint_col:Int32;N, int_col:Int32;N, bigint_col:Int64;N, float_col:Float32;N, double_col:Float64;N, date_string_col:BinaryView;N, string_col:BinaryView;N, timestamp_col:Timestamp(Nanosecond, None);N] @@ -360,6 +362,7 @@ physical_plan after OutputRequirements physical_plan after LimitAggregation SAME TEXT AS ABOVE physical_plan after LimitPushdown DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet physical_plan after ProjectionPushdown SAME TEXT AS ABOVE +physical_plan after insert_yield_exec SAME TEXT AS ABOVE physical_plan after SanityCheckPlan SAME TEXT AS ABOVE physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet physical_plan_with_stats DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, statistics=[Rows=Exact(8), Bytes=Exact(671), [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]] diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index 
108c844f20b4..dc8b7680d83e 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -303,6 +303,7 @@ datafusion.optimizer.repartition_sorts true datafusion.optimizer.repartition_windows true datafusion.optimizer.skip_failed_rules false datafusion.optimizer.top_down_join_key_reordering true +datafusion.optimizer.yield_period 64 datafusion.sql_parser.collect_spans false datafusion.sql_parser.dialect generic datafusion.sql_parser.enable_ident_normalization true @@ -413,6 +414,7 @@ datafusion.optimizer.repartition_sorts true Should DataFusion execute sorts in a datafusion.optimizer.repartition_windows true Should DataFusion repartition data using the partitions keys to execute window functions in parallel using the provided `target_partitions` level datafusion.optimizer.skip_failed_rules false When set to true, the logical plan optimizer will produce warning messages if any optimization rules produce errors and then proceed to the next rule. When set to false, any rules that produce errors will cause the query to fail datafusion.optimizer.top_down_join_key_reordering true When set to true, the physical plan optimizer will run a top down process to reorder the join keys +datafusion.optimizer.yield_period 64 When DataFusion detects that a plan might not be promply cancellable due to the presence of tight-looping operators, it will attempt to mitigate this by inserting explicit yielding (in as few places as possible to avoid performance degradation). This value represents the yielding period (in batches) at such explicit yielding points. The default value is 64. If set to 0, no DataFusion will not perform any explicit yielding. datafusion.sql_parser.collect_spans false When set to true, the source locations relative to the original SQL query (i.e. [`Span`](https://docs.rs/sqlparser/latest/sqlparser/tokenizer/struct.Span.html)) will be collected and recorded in the logical plan nodes. 
datafusion.sql_parser.dialect generic Configure the SQL dialect used by DataFusion's parser; supported values include: Generic, MySQL, PostgreSQL, Hive, SQLite, Snowflake, Redshift, MsSQL, ClickHouse, BigQuery, Ansi, DuckDB and Databricks. datafusion.sql_parser.enable_ident_normalization true When set to true, SQL parser will normalize ident (convert ident to lowercase when not quoted) diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 6d65f54e228d..42282e39e41f 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -119,6 +119,7 @@ Environment variables are read during `SessionConfig` initialisation so they mus | datafusion.optimizer.default_filter_selectivity | 20 | The default filter selectivity used by Filter Statistics when an exact selectivity cannot be determined. Valid values are between 0 (no selectivity) and 100 (all rows are selected). | | datafusion.optimizer.prefer_existing_union | false | When set to true, the optimizer will not attempt to convert Union to Interleave | | datafusion.optimizer.expand_views_at_output | false | When set to true, if the returned type is a view type then the output will be coerced to a non-view. Coerces `Utf8View` to `LargeUtf8`, and `BinaryView` to `LargeBinary`. | +| datafusion.optimizer.yield_period | 64 | When DataFusion detects that a plan might not be promptly cancellable due to the presence of tight-looping operators, it will attempt to mitigate this by inserting explicit yielding (in as few places as possible to avoid performance degradation). This value represents the yielding period (in batches) at such explicit yielding points. The default value is 64. If set to 0, DataFusion will not perform any explicit yielding. 
| | datafusion.explain.logical_plan_only | false | When set to true, the explain statement will only print logical plans | | datafusion.explain.physical_plan_only | false | When set to true, the explain statement will only print physical plans | | datafusion.explain.show_statistics | false | When set to true, the explain statement will print operator statistics for physical plans |