WIP Improve planning Examples #10953

Closed · wants to merge 5 commits
12 changes: 7 additions & 5 deletions datafusion-examples/README.md
@@ -45,6 +45,7 @@ cargo run --example csv_sql
- [`advanced_udaf.rs`](examples/advanced_udaf.rs): Define and invoke a more complicated User Defined Aggregate Function (UDAF)
- [`advanced_udf.rs`](examples/advanced_udf.rs): Define and invoke a more complicated User Defined Scalar Function (UDF)
- [`advanced_udwf.rs`](examples/advanced_udwf.rs): Define and invoke a more complicated User Defined Window Function (UDWF)
- [`analyzer_rule.rs`](examples/analyzer_rule.rs): Use a custom AnalyzerRule to change a query's semantics
- [`avro_sql.rs`](examples/avro_sql.rs): Build and run a query plan from a SQL statement against a local AVRO file
- [`catalog.rs`](examples/catalog.rs): Register the table into a custom catalog
- [`csv_sql.rs`](examples/csv_sql.rs): Build and run a query plan from a SQL statement against a local CSV file
@@ -61,20 +62,21 @@ cargo run --example csv_sql
- [`function_factory.rs`](examples/function_factory.rs): Register `CREATE FUNCTION` handler to implement SQL macros
- [`make_date.rs`](examples/make_date.rs): Examples of using the make_date function
- [`memtable.rs`](examples/memtable.rs): Create and query data in memory using SQL and `RecordBatch`es
- ['parquet_index.rs'](examples/parquet_index.rs): Create an secondary index over several parquet files and use it to speed up queries
- [`optimizer_rule.rs`](examples/optimizer_rule.rs): Use a custom OptimizerRule to use a special operator
- [`parquet_index.rs`](examples/parquet_index.rs): Create a secondary index over several parquet files and use it to speed up queries
- [`parquet_sql.rs`](examples/parquet_sql.rs): Build and run a query plan from a SQL statement against a local Parquet file
- [`parquet_sql_multiple_files.rs`](examples/parquet_sql_multiple_files.rs): Build and run a query plan from a SQL statement against multiple local Parquet files
- ['parquet_exec_visitor.rs'](examples/parquet_exec_visitor.rs): Extract statistics by visiting an ExecutionPlan after execution
- [`parquet_exec_visitor.rs`](examples/parquet_exec_visitor.rs): Extract statistics by visiting an ExecutionPlan after execution
- [`plan_to_sql.rs`](examples/plan_to_sql.rs): Generate SQL from DataFusion `Expr` and `LogicalPlan`
- [`pruning.rs`](examples/parquet_sql.rs): Use pruning to rule out files based on statistics
- [`pruning.rs`](examples/pruning.rs): Use a custom catalog and a PruningPredicate to prune files with a predicate and statistics
- [`query-aws-s3.rs`](examples/external_dependency/query-aws-s3.rs): Configure `object_store` and run a query against files stored in AWS S3
- [`query-http-csv.rs`](examples/query-http-csv.rs): Configure `object_store` and run a query against files via HTTP
- [`regexp.rs`](examples/regexp.rs): Examples of using regular expression functions
- [`rewrite_expr.rs`](examples/rewrite_expr.rs): Define and invoke a custom Query Optimizer pass
- [`simple_udaf.rs`](examples/simple_udaf.rs): Define and invoke a User Defined Aggregate Function (UDAF)
- [`simple_udf.rs`](examples/simple_udf.rs): Define and invoke a User Defined Scalar Function (UDF)
- [`simple_udwf.rs`](examples/simple_udwf.rs): Define and invoke a User Defined Window Function (UDWF)
- [`sql_dialect.rs`](examples/sql_dialect.rs): Example of implementing a custom SQL dialect on top of `DFParser`
- [`sql_frontend.rs`](examples/sql_frontend.rs): Create LogicalPlans (only) from SQL strings
- [`sql_dialect.rs`](examples/sql_dialect.rs): Implementing a custom SQL dialect on top of `DFParser`
- [`to_char.rs`](examples/to_char.rs): Examples of using the to_char function
- [`to_timestamp.rs`](examples/to_timestamp.rs): Examples of using to_timestamp functions

128 changes: 128 additions & 0 deletions datafusion-examples/examples/analyzer_rule.rs
@@ -0,0 +1,128 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use arrow::array::{ArrayRef, Int32Array, RecordBatch, StringArray};
use datafusion::prelude::SessionContext;
use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_common::{Result, ScalarValue};
use datafusion_expr::{lit, Expr, LogicalPlan};
use datafusion_optimizer::analyzer::AnalyzerRule;
use std::sync::Arc;

/// This example demonstrates how to add your own [`AnalyzerRule`]
/// to DataFusion.
///
/// [`AnalyzerRule`]s transform [`LogicalPlan`]s prior to the rest of the
/// DataFusion optimization process, and are allowed to change the plan's
/// semantics (e.g. output types).
///
/// See [optimizer_rule.rs] for an example of an optimizer rule
#[tokio::main]
pub async fn main() -> Result<()> {
// DataFusion includes several built-in AnalyzerRules for tasks such as type
// coercion. To modify the list of rules, we must use the lower level
// SessionState API
let state = SessionContext::new().state();
let state = state.add_analyzer_rule(Arc::new(MyAnalyzerRule {}));

// To plan and run queries with the new rule, create a SessionContext with
// the modified SessionState
let ctx = SessionContext::from(state);
ctx.register_batch("person", person_batch())?;

// Plan a SQL statement as normal
let sql = "SELECT * FROM person WHERE age BETWEEN 21 AND 32";
let plan = ctx.sql(sql).await?.into_optimized_plan()?;

println!("Logical Plan:\n\n{}\n", plan.display_indent());

// We can see the effect of our rewrite on the output plan. Even though the
// input query filtered between 21 and 32, the rewritten plan filters between 31 and 42

// Filter: person.age >= Int32(31) AND person.age <= Int32(42)
// TableScan: person projection=[name, age]

ctx.sql(sql).await?.show().await?;

// And the output verifies the predicates have been changed

// +-------+-----+
// | name | age |
// +-------+-----+
// | Oleks | 33 |
// +-------+-----+

Ok(())
}

/// An example analyzer rule that adds 10 to all Int64 literals in the plan
struct MyAnalyzerRule {}

impl AnalyzerRule for MyAnalyzerRule {
fn analyze(&self, plan: LogicalPlan, _config: &ConfigOptions) -> Result<LogicalPlan> {
// use the TreeNode API to recursively walk the LogicalPlan tree
// and all of its children (inputs)
plan.transform(|plan| {
// This closure is called for each LogicalPlan node
plan.map_expressions(|expr| {
// This closure is called for all expressions in the current plan
//
// For example, given a plan like `SELECT a + b, 5 + 10`
//
// The closure would be called twice, once for `a + b` and once for `5 + 10`
self.rewrite_expr(expr)
})
})
// the result of calling transform is a `Transformed` structure that
// contains a flag signalling whether any rewrite took place, as well as
// whether the recursion stopped early.
//
// This example does not need either piece of information, so it simply
// extracts the LogicalPlan "data"
.data()
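// (If a rule did need them, the `Transformed` value could be destructured
// instead; it exposes public `data`, `transformed`, and `tnr` fields, where
// `transformed` records whether any rewrite happened and `tnr` reports how
// the recursion ended.)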
}

fn name(&self) -> &str {
"my_analyzer_rule"
}
}

impl MyAnalyzerRule {
/// rewrites an individual expression
fn rewrite_expr(&self, expr: Expr) -> Result<Transformed<Expr>> {
expr.transform(|expr| {
// closure is invoked for all sub expressions

// Transformed is used to transfer the "was this rewritten"
// information back up the stack.
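// Note: DataFusion parses unsuffixed SQL integer literals as Int64, which
// is why this matches the literals `21` and `32` from the example query
// even though the `age` column is Int32.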
if let Expr::Literal(ScalarValue::Int64(Some(i))) = expr {
Ok(Transformed::yes(lit(i + 10)))
} else {
Ok(Transformed::no(expr))
}
})
}
}

/// Return a RecordBatch with made up data
fn person_batch() -> RecordBatch {
let name: ArrayRef =
Arc::new(StringArray::from_iter_values(["Andy", "Andrew", "Oleks"]));
let age: ArrayRef = Arc::new(Int32Array::from(vec![11, 22, 33]));
RecordBatch::try_from_iter(vec![("name", name), ("age", age)]).unwrap()
}
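The same `TreeNode` rewrite can also be exercised on a standalone `Expr`, outside of any plan. A minimal sketch (not part of this diff; it assumes the same `datafusion_expr` and `datafusion_common` APIs the example above already uses):

use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::{Result, ScalarValue};
use datafusion_expr::{col, lit, Expr};

/// Add 10 to every Int64 literal in `expr`, mirroring `MyAnalyzerRule::rewrite_expr`
fn add_ten(expr: Expr) -> Result<Expr> {
    let transformed = expr.transform(|e| {
        if let Expr::Literal(ScalarValue::Int64(Some(i))) = e {
            Ok(Transformed::yes(lit(i + 10)))
        } else {
            Ok(Transformed::no(e))
        }
    })?;
    // discard the rewrite flags and keep only the rewritten expression
    Ok(transformed.data)
}

fn main() -> Result<()> {
    // `age > 21` (an Int64 literal) becomes `age > 31`
    let rewritten = add_ten(col("age").gt(lit(21_i64)))?;
    assert_eq!(rewritten, col("age").gt(lit(31_i64)));
    Ok(())
}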
221 changes: 221 additions & 0 deletions datafusion-examples/examples/optimizer_rule.rs
@@ -0,0 +1,221 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use arrow::array::{ArrayRef, Int32Array, RecordBatch, StringArray};
use arrow_schema::DataType;
use datafusion::prelude::SessionContext;
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::{internal_err, Result, ScalarValue};
use datafusion_expr::{
BinaryExpr, ColumnarValue, Expr, LogicalPlan, Operator, ScalarUDF, ScalarUDFImpl,
Signature, Volatility,
};
use datafusion_optimizer::optimizer::ApplyOrder;
use datafusion_optimizer::{OptimizerConfig, OptimizerRule};
use std::any::Any;
use std::sync::Arc;

/// This example demonstrates how to add your own [`OptimizerRule`]
/// to DataFusion.
///
/// [`OptimizerRule`]s transform [`LogicalPlan`]s into an equivalent (but
/// hopefully faster) form.
///
/// See [analyzer_rule.rs] for an example of an AnalyzerRule, which is
/// allowed to change plan semantics.
#[tokio::main]
pub async fn main() -> Result<()> {
// DataFusion includes many built-in OptimizerRules for tasks such as outer
// to inner join conversion and constant folding. To modify the list of
// optimizer rules, we must use the lower level SessionState API
let state = SessionContext::new().state();
let state = state.add_optimizer_rule(Arc::new(MyOptimizerRule {}));

// To plan and run queries with the new rule, create a SessionContext with
// the modified SessionState
let ctx = SessionContext::from(state);
ctx.register_batch("person", person_batch())?;

// Plan a SQL statement as normal
let sql = "SELECT * FROM person WHERE age = 22";
let plan = ctx.sql(sql).await?.into_optimized_plan()?;

println!("Logical Plan:\n\n{}\n", plan.display_indent());

// We can see the effect of our rewrite in the output plan: the filter
// has been rewritten to use my_eq

// Filter: my_eq(person.age, Int32(22))
// TableScan: person projection=[name, age]

ctx.sql(sql).await?.show().await?;

// And the output verifies the predicate has been changed (my_eq
// always returns true)

// +--------+-----+
// | name | age |
// +--------+-----+
// | Andy | 11 |
// | Andrew | 22 |
// | Oleks | 33 |
// +--------+-----+

// However, we can see the rule doesn't trigger for queries with not-equal
// predicates
ctx.sql("SELECT * FROM person WHERE age <> 22")
.await?
.show()
.await?;

// +-------+-----+
// | name | age |
// +-------+-----+
// | Andy | 11 |
// | Oleks | 33 |
// +-------+-----+

Ok(())
}

/// An example optimizer rule that looks for col = <const> and replaces it with
/// a user defined function
struct MyOptimizerRule {}

impl OptimizerRule for MyOptimizerRule {
fn name(&self) -> &str {
"my_optimizer_rule"
}

// New OptimizerRules should use the "rewrite" API as it is more efficient
fn supports_rewrite(&self) -> bool {
true
}

/// Ask the optimizer to handle the plan recursion. `rewrite` will be called
/// on each plan node.
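/// `ApplyOrder::BottomUp` visits each node's inputs (children) before the
/// node itself.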
fn apply_order(&self) -> Option<ApplyOrder> {
Some(ApplyOrder::BottomUp)
}

fn rewrite(
&self,
plan: LogicalPlan,
_config: &dyn OptimizerConfig,
) -> Result<Transformed<LogicalPlan>> {
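// Note there is no explicit recursion here: because `apply_order`
// returns `BottomUp`, the optimizer walks the plan and calls `rewrite`
// once per node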
plan.map_expressions(|expr| {
// This closure is called for all expressions in the current plan
//
// For example, given a plan like `SELECT a + b, 5 + 10`
//
// The closure would be called twice, once for `a + b` and once for `5 + 10`
self.rewrite_expr(expr)
})
}

fn try_optimize(
&self,
_plan: &LogicalPlan,
_config: &dyn OptimizerConfig,
) -> Result<Option<LogicalPlan>> {
// since this rule uses the rewrite API, return an error if the old API is called
internal_err!("Should have called rewrite")
}
}

impl MyOptimizerRule {
/// Rewrites an Expr replacing all `<col> = <const>` expressions with
/// a call to my_eq udf
fn rewrite_expr(&self, expr: Expr) -> Result<Transformed<Expr>> {
// do a bottom up rewrite of the expression tree
expr.transform_up(|expr| {
// Closure called for each sub tree
match expr {
Expr::BinaryExpr(binary_expr) if is_binary_eq(&binary_expr) => {
// destructure the expression
let BinaryExpr { left, op: _, right } = binary_expr;
// rewrite to `my_eq(left, right)`
let udf = ScalarUDF::new_from_impl(MyEq::new());
let call = udf.call(vec![*left, *right]);
Ok(Transformed::yes(call))
}
_ => Ok(Transformed::no(expr)),
}
})
// Note that the TreeNode API handles propagating the transformed flag
// and errors up the call chain
}
}

/// Return true if the expression is an equality comparison between
/// literals or column references
fn is_binary_eq(binary_expr: &BinaryExpr) -> bool {
binary_expr.op == Operator::Eq
&& is_lit_or_col(binary_expr.left.as_ref())
&& is_lit_or_col(binary_expr.right.as_ref())
}

/// Return true if the expression is a literal or column reference
fn is_lit_or_col(expr: &Expr) -> bool {
matches!(expr, Expr::Column(_) | Expr::Literal(_))
}

/// A simple user defined filter function
#[derive(Debug, Clone)]
struct MyEq {
signature: Signature,
}

impl MyEq {
fn new() -> Self {
Self {
signature: Signature::any(2, Volatility::Stable),
}
}
}

impl ScalarUDFImpl for MyEq {
fn as_any(&self) -> &dyn Any {
self
}

fn name(&self) -> &str {
"my_eq"
}

fn signature(&self) -> &Signature {
&self.signature
}

fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
Ok(DataType::Boolean)
}

fn invoke(&self, _args: &[ColumnarValue]) -> Result<ColumnarValue> {
// this example simply returns "true", which is not what a real
// implementation would do
Ok(ColumnarValue::Scalar(ScalarValue::from(true)))
}
}

/// Return a RecordBatch with made up data
fn person_batch() -> RecordBatch {
let name: ArrayRef =
Arc::new(StringArray::from_iter_values(["Andy", "Andrew", "Oleks"]));
let age: ArrayRef = Arc::new(Int32Array::from(vec![11, 22, 33]));
RecordBatch::try_from_iter(vec![("name", name), ("age", age)]).unwrap()
}
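For reference, the expression this rule produces can also be built directly with the `Expr` API. A minimal sketch (not part of this diff; it assumes `MyEq` from the example above):

use datafusion_expr::{col, lit, Expr, ScalarUDF};

/// Build `my_eq(age, 22)`, the same shape `MyOptimizerRule` produces when it
/// rewrites `age = 22`
fn my_eq_call() -> Expr {
    let udf = ScalarUDF::new_from_impl(MyEq::new());
    udf.call(vec![col("age"), lit(22)])
}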
3 changes: 3 additions & 0 deletions datafusion-examples/examples/pruning.rs
@@ -33,6 +33,9 @@ use std::sync::Arc;
/// quickly eliminate entire files / partitions / row groups of data from
/// consideration using statistical information from a catalog or other
/// metadata.
///
/// This example uses a user defined catalog to supply information. See `parquet_index.rs` for
/// an example that extracts the necessary information from Parquet metadata.
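/// In both cases the statistics reach the `PruningPredicate` through
/// DataFusion's `PruningStatistics` trait, which reports per-container
/// min/max values and counts.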
#[tokio::main]
async fn main() {
// In this example, we'll use the PruningPredicate to determine if