From 51bc3ba158874dd2fc3061be916b56f4ed7f9907 Mon Sep 17 00:00:00 2001 From: Arttu Voutilainen Date: Wed, 21 Aug 2024 16:01:07 +0200 Subject: [PATCH 1/4] fix: UDF, UDAF, UDWF with_alias(..) should wrap the inner function fully --- datafusion/expr/src/udaf.rs | 60 ++++++++++++++++++- datafusion/expr/src/udf.rs | 57 +++++++++++++++++- datafusion/expr/src/udwf.rs | 16 +++++ .../functions-aggregate/src/first_last.rs | 4 +- 4 files changed, 132 insertions(+), 5 deletions(-) diff --git a/datafusion/expr/src/udaf.rs b/datafusion/expr/src/udaf.rs index d136aeaf0908..76699d399f6d 100644 --- a/datafusion/expr/src/udaf.rs +++ b/datafusion/expr/src/udaf.rs @@ -442,7 +442,7 @@ pub trait AggregateUDFImpl: Debug + Send + Sync { /// not implement the method, returns an error. Order insensitive and hard /// requirement aggregators return `Ok(None)`. fn with_beneficial_ordering( - self: Arc, + &self, _beneficial_ordering: bool, ) -> Result>> { if self.order_sensitivity().is_beneficial() { @@ -608,6 +608,60 @@ impl AggregateUDFImpl for AliasedAggregateUDFImpl { &self.aliases } + fn state_fields(&self, args: StateFieldsArgs) -> Result> { + self.inner.state_fields(args) + } + + fn groups_accumulator_supported(&self, args: AccumulatorArgs) -> bool { + self.inner.groups_accumulator_supported(args) + } + + fn create_groups_accumulator( + &self, + args: AccumulatorArgs, + ) -> Result> { + self.inner.create_groups_accumulator(args) + } + + fn create_sliding_accumulator( + &self, + args: AccumulatorArgs, + ) -> Result> { + self.inner.accumulator(args) + } + + fn with_beneficial_ordering( + &self, + beneficial_ordering: bool, + ) -> Result>> { + self.inner + .with_beneficial_ordering(beneficial_ordering) + .map(|udf| { + udf.map(|udf| { + Arc::new(AliasedAggregateUDFImpl { + inner: udf, + aliases: self.aliases.clone(), + }) as Arc + }) + }) + } + + fn order_sensitivity(&self) -> AggregateOrderSensitivity { + self.inner.order_sensitivity() + } + + fn simplify(&self) -> Option { + self.inner.simplify() + } + + fn reverse_expr(&self) -> ReversedUDAF { + self.inner.reverse_expr() + } + + fn coerce_types(&self, arg_types: &[DataType]) -> Result> { + self.inner.coerce_types(arg_types) + } + fn equals(&self, other: &dyn AggregateUDFImpl) -> bool { if let Some(other) = other.as_any().downcast_ref::() { self.inner.equals(other.inner.as_ref()) && self.aliases == other.aliases @@ -622,6 +676,10 @@ impl AggregateUDFImpl for AliasedAggregateUDFImpl { self.aliases.hash(hasher); hasher.finish() } + + fn is_descending(&self) -> Option { + self.inner.is_descending() + } } /// Implementation of [`AggregateUDFImpl`] that wraps the function style pointers diff --git a/datafusion/expr/src/udf.rs b/datafusion/expr/src/udf.rs index f5434726e23d..40f2b1e25832 100644 --- a/datafusion/expr/src/udf.rs +++ b/datafusion/expr/src/udf.rs @@ -624,6 +624,14 @@ impl ScalarUDFImpl for AliasedScalarUDFImpl { self.inner.name() } + fn display_name(&self, args: &[Expr]) -> Result { + self.inner.display_name(args) + } + + fn schema_name(&self, args: &[Expr]) -> Result { + self.inner.schema_name(args) + } + fn signature(&self) -> &Signature { self.inner.signature() } @@ -632,12 +640,57 @@ impl ScalarUDFImpl for AliasedScalarUDFImpl { self.inner.return_type(arg_types) } + fn aliases(&self) -> &[String] { + &self.aliases + } + + fn return_type_from_exprs( + &self, + args: &[Expr], + schema: &dyn ExprSchema, + arg_types: &[DataType], + ) -> Result { + self.inner.return_type_from_exprs(args, schema, arg_types) + } + fn invoke(&self, args: &[ColumnarValue]) -> Result { self.inner.invoke(args) } - fn aliases(&self) -> &[String] { - &self.aliases + fn invoke_no_args(&self, number_rows: usize) -> Result { + self.inner.invoke_no_args(number_rows) + } + + fn simplify( + &self, + args: Vec, + info: &dyn SimplifyInfo, + ) -> Result { + self.inner.simplify(args, info) + } + + fn short_circuits(&self) -> bool { + self.inner.short_circuits() + } + + fn evaluate_bounds(&self, input: &[&Interval]) -> Result { + self.inner.evaluate_bounds(input) + } + + fn propagate_constraints( + &self, + interval: &Interval, + inputs: &[&Interval], + ) -> Result>> { + self.inner.propagate_constraints(interval, inputs) + } + + fn output_ordering(&self, inputs: &[ExprProperties]) -> Result { + self.inner.output_ordering(inputs) + } + + fn coerce_types(&self, arg_types: &[DataType]) -> Result> { + self.inner.coerce_types(arg_types) } fn equals(&self, other: &dyn ScalarUDFImpl) -> bool { diff --git a/datafusion/expr/src/udwf.rs b/datafusion/expr/src/udwf.rs index 88b3d613cb43..2df7ab04c678 100644 --- a/datafusion/expr/src/udwf.rs +++ b/datafusion/expr/src/udwf.rs @@ -428,6 +428,10 @@ impl WindowUDFImpl for AliasedWindowUDFImpl { &self.aliases } + fn simplify(&self) -> Option { + self.inner.simplify() + } + fn equals(&self, other: &dyn WindowUDFImpl) -> bool { if let Some(other) = other.as_any().downcast_ref::() { self.inner.equals(other.inner.as_ref()) && self.aliases == other.aliases @@ -442,6 +446,18 @@ impl WindowUDFImpl for AliasedWindowUDFImpl { self.aliases.hash(hasher); hasher.finish() } + + fn nullable(&self) -> bool { + self.inner.nullable() + } + + fn sort_options(&self) -> Option { + self.inner.sort_options() + } + + fn coerce_types(&self, arg_types: &[DataType]) -> Result> { + self.inner.coerce_types(arg_types) + } } /// Implementation of [`WindowUDFImpl`] that wraps the function style pointers diff --git a/datafusion/functions-aggregate/src/first_last.rs b/datafusion/functions-aggregate/src/first_last.rs index 2162442f054e..193b15a7f91e 100644 --- a/datafusion/functions-aggregate/src/first_last.rs +++ b/datafusion/functions-aggregate/src/first_last.rs @@ -150,7 +150,7 @@ impl AggregateUDFImpl for FirstValue { } fn with_beneficial_ordering( - self: Arc, + &self, beneficial_ordering: bool, ) -> Result>> { Ok(Some(Arc::new( @@ -451,7 +451,7 @@ impl AggregateUDFImpl for LastValue { } fn with_beneficial_ordering( - self: Arc, + &self, beneficial_ordering: bool, ) -> Result>> { Ok(Some(Arc::new( From 33b89ceb3eb69e9b206121b883011bd85ea56b7e Mon Sep 17 00:00:00 2001 From: Arttu Voutilainen Date: Thu, 22 Aug 2024 11:52:58 +0200 Subject: [PATCH 2/4] revert back to having Arc --- datafusion/expr/src/udaf.rs | 5 +++-- datafusion/functions-aggregate/src/first_last.rs | 4 ++-- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/datafusion/expr/src/udaf.rs b/datafusion/expr/src/udaf.rs index 76699d399f6d..57dea429f33d 100644 --- a/datafusion/expr/src/udaf.rs +++ b/datafusion/expr/src/udaf.rs @@ -442,7 +442,7 @@ pub trait AggregateUDFImpl: Debug + Send + Sync { /// not implement the method, returns an error. Order insensitive and hard /// requirement aggregators return `Ok(None)`. fn with_beneficial_ordering( - &self, + self: Arc, _beneficial_ordering: bool, ) -> Result>> { if self.order_sensitivity().is_beneficial() { @@ -631,10 +631,11 @@ impl AggregateUDFImpl for AliasedAggregateUDFImpl { } fn with_beneficial_ordering( - &self, + self: Arc, beneficial_ordering: bool, ) -> Result>> { self.inner + .clone() .with_beneficial_ordering(beneficial_ordering) .map(|udf| { udf.map(|udf| { diff --git a/datafusion/functions-aggregate/src/first_last.rs b/datafusion/functions-aggregate/src/first_last.rs index 193b15a7f91e..2162442f054e 100644 --- a/datafusion/functions-aggregate/src/first_last.rs +++ b/datafusion/functions-aggregate/src/first_last.rs @@ -150,7 +150,7 @@ impl AggregateUDFImpl for FirstValue { } fn with_beneficial_ordering( - &self, + self: Arc, beneficial_ordering: bool, ) -> Result>> { Ok(Some(Arc::new( @@ -451,7 +451,7 @@ impl AggregateUDFImpl for LastValue { } fn with_beneficial_ordering( - &self, + self: Arc, beneficial_ordering: bool, ) -> Result>> { Ok(Some(Arc::new( From 175b8d2b680af7e1e2c0563784946f55d6761a24 Mon Sep 17 00:00:00 2001 From: Arttu Voutilainen Date: Thu, 22 Aug 2024 11:55:22 +0200 Subject: [PATCH 3/4] add notes about adding stuff into Aliased impls --- datafusion/expr/src/udaf.rs | 3 +++ datafusion/expr/src/udf.rs | 3 +++ datafusion/expr/src/udwf.rs | 3 +++ 3 files changed, 9 insertions(+) diff --git a/datafusion/expr/src/udaf.rs b/datafusion/expr/src/udaf.rs index 57dea429f33d..14691d845d5c 100644 --- a/datafusion/expr/src/udaf.rs +++ b/datafusion/expr/src/udaf.rs @@ -328,6 +328,9 @@ where /// let expr = geometric_mean.call(vec![col("a")]); /// ``` pub trait AggregateUDFImpl: Debug + Send + Sync { + // Note: When adding any methods (with default implementations), remember to add them also + // into the AliasedAggregateUDFImpl below! + /// Returns this object as an [`Any`] trait object fn as_any(&self) -> &dyn Any; diff --git a/datafusion/expr/src/udf.rs b/datafusion/expr/src/udf.rs index 40f2b1e25832..1b7511260e1d 100644 --- a/datafusion/expr/src/udf.rs +++ b/datafusion/expr/src/udf.rs @@ -342,6 +342,9 @@ where /// let expr = add_one.call(vec![col("a")]); /// ``` pub trait ScalarUDFImpl: Debug + Send + Sync { + // Note: When adding any methods (with default implementations), remember to add them also + // into the AliasedScalarUDFImpl below! + /// Returns this object as an [`Any`] trait object fn as_any(&self) -> &dyn Any; diff --git a/datafusion/expr/src/udwf.rs b/datafusion/expr/src/udwf.rs index 2df7ab04c678..e5fdaaceb439 100644 --- a/datafusion/expr/src/udwf.rs +++ b/datafusion/expr/src/udwf.rs @@ -266,6 +266,9 @@ where /// .unwrap(); /// ``` pub trait WindowUDFImpl: Debug + Send + Sync { + // Note: When adding any methods (with default implementations), remember to add them also + // into the AliasedWindowUDFImpl below! + /// Returns this object as an [`Any`] trait object fn as_any(&self) -> &dyn Any; From 3fb25af5e8feae3dfcf349a4047512c525bece63 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 22 Aug 2024 06:16:13 -0400 Subject: [PATCH 4/4] fix clippy --- datafusion/expr/src/udaf.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/datafusion/expr/src/udaf.rs b/datafusion/expr/src/udaf.rs index 14691d845d5c..d373765cd2d7 100644 --- a/datafusion/expr/src/udaf.rs +++ b/datafusion/expr/src/udaf.rs @@ -637,8 +637,7 @@ impl AggregateUDFImpl for AliasedAggregateUDFImpl { self: Arc, beneficial_ordering: bool, ) -> Result>> { - self.inner - .clone() + Arc::clone(&self.inner) .with_beneficial_ordering(beneficial_ordering) .map(|udf| { udf.map(|udf| {