From a916b4c34931d5f8f8781ec6243ac3360e4dca57 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 6 Jun 2025 17:38:07 -0400 Subject: [PATCH 1/4] Encapsulate FieldMetadata --- .../user_defined_scalar_functions.rs | 3 +- datafusion/expr/src/expr.rs | 116 +++++++++++++++++- datafusion/expr/src/expr_rewriter/mod.rs | 6 +- datafusion/expr/src/expr_schema.rs | 7 +- datafusion/expr/src/literal.rs | 11 +- .../simplify_expressions/expr_simplifier.rs | 11 +- .../physical-expr/src/expressions/literal.rs | 7 +- datafusion/physical-expr/src/planner.rs | 41 ++++--- 8 files changed, 150 insertions(+), 52 deletions(-) diff --git a/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs b/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs index dcaa1ef95235..d7dd65deab5f 100644 --- a/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs +++ b/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs @@ -40,6 +40,7 @@ use datafusion_common::{ assert_batches_eq, assert_batches_sorted_eq, assert_contains, exec_err, not_impl_err, plan_err, DFSchema, DataFusionError, Result, ScalarValue, }; +use datafusion_expr::expr::FieldMetadata; use datafusion_expr::simplify::{ExprSimplifyResult, SimplifyInfo}; use datafusion_expr::{ lit_with_metadata, Accumulator, ColumnarValue, CreateFunction, CreateFunctionBody, @@ -1535,7 +1536,7 @@ async fn test_metadata_based_udf_with_literal() -> Result<()> { let df = ctx.sql("select 0;").await?.select(vec![ lit(5u64).alias_with_metadata("lit_with_doubling", Some(input_metadata.clone())), lit(5u64).alias("lit_no_doubling"), - lit_with_metadata(5u64, Some(input_metadata)) + lit_with_metadata(5u64, Some(FieldMetadata::from(input_metadata))) .alias("lit_with_double_no_alias_metadata"), ])?; diff --git a/datafusion/expr/src/expr.rs b/datafusion/expr/src/expr.rs index f379edf10584..e4ac1ad83b6f 100644 --- a/datafusion/expr/src/expr.rs +++ b/datafusion/expr/src/expr.rs @@ -30,7 +30,7 @@ use crate::logical_plan::Subquery; use crate::Volatility; use crate::{udaf, ExprSchemable, Operator, Signature, WindowFrame, WindowUDF}; -use arrow::datatypes::{DataType, FieldRef}; +use arrow::datatypes::{DataType, Field, FieldRef}; use datafusion_common::cse::{HashNode, NormalizeEq, Normalizeable}; use datafusion_common::tree_node::{ Transformed, TransformedResult, TreeNode, TreeNodeContainer, TreeNodeRecursion, @@ -284,8 +284,8 @@ pub enum Expr { Column(Column), /// A named reference to a variable in a registry. ScalarVariable(DataType, Vec), - /// A constant value along with associated metadata - Literal(ScalarValue, Option>), + /// A constant value along with associated [`FieldMetadata`]. + Literal(ScalarValue, Option), /// A binary expression such as "age > 21" BinaryExpr(BinaryExpr), /// LIKE expression @@ -413,6 +413,116 @@ impl<'a> TreeNodeContainer<'a, Self> for Expr { } } +/// Literal metadata +/// +/// This structure is used to store metadata associated with a literal expressions +/// and is designed to be cheap to `clone`. +/// +/// This structure is used to store metadata associated with a literal expression, and it +/// corresponds to the `metadata` field on [`FieldRef`]. +#[derive(Clone, PartialEq, Eq, PartialOrd, Hash, Debug)] +pub struct FieldMetadata { + /// The inner metadata of a literal expression, which is a map of string + /// keys to string values. + /// + /// Note this is not a `HashMap because `HashMap` does not provide + /// implementations for traits like `Debug` and `Hash`. + inner: Arc>, +} + +impl FieldMetadata { + /// Create a new empty metadata instance. + pub fn new_empty() -> Self { + Self { + inner: Arc::new(BTreeMap::new()), + } + } + + /// Create a new metadata instance from a `Field`'s metadata. + pub fn new_from_field(field: &Field) -> Self { + let inner = field + .metadata() + .iter() + .map(|(k, v)| (k.to_string(), v.to_string())) + .collect(); + Self { + inner: Arc::new(inner), + } + } + + /// Create a new metadata instance from a map of string keys to string values. + pub fn new(inner: BTreeMap) -> Self { + Self { + inner: Arc::new(inner), + } + } + + /// Get the inner metadata as a reference to a `BTreeMap`. + pub fn inner(&self) -> &BTreeMap { + &self.inner + } + + /// Return the inner metadata + pub fn into_inner(self) -> Arc> { + self.inner + } + + /// Adds metadata from `other` into `self`, overwriting any existing keys. + pub fn extend(&mut self, other: Self) { + let other = Arc::unwrap_or_clone(other.into_inner()); + Arc::make_mut(&mut self.inner).extend(other); + } + + /// Returns true if the metadata is empty. + pub fn is_empty(&self) -> bool { + self.inner.is_empty() + } + + /// Returns the number of key-value pairs in the metadata. + pub fn len(&self) -> usize { + self.inner.len() + } + + /// Updates the metadata on the Field with this metadata + pub fn add_to_field(&self, field: Field) -> Field { + field.with_metadata( + self.inner + .iter() + .map(|(k, v)| (k.clone(), v.clone())) + .collect(), + ) + } +} + +impl From<&Field> for FieldMetadata { + fn from(field: &Field) -> Self { + Self::new_from_field(field) + } +} + +impl From> for FieldMetadata { + fn from(inner: BTreeMap) -> Self { + Self::new(inner) + } +} + +impl From> for FieldMetadata { + fn from(map: std::collections::HashMap) -> Self { + Self::new(map.into_iter().collect()) + } +} + +/// From reference +impl From<&std::collections::HashMap> for FieldMetadata { + fn from(map: &std::collections::HashMap) -> Self { + let inner = map + .iter() + .map(|(k, v)| (k.to_string(), v.to_string())) + .collect(); + Self::new(inner) + } +} + /// UNNEST expression. #[derive(Clone, PartialEq, Eq, PartialOrd, Hash, Debug)] pub struct Unnest { diff --git a/datafusion/expr/src/expr_rewriter/mod.rs b/datafusion/expr/src/expr_rewriter/mod.rs index f80b8e5a7759..05a9425452a1 100644 --- a/datafusion/expr/src/expr_rewriter/mod.rs +++ b/datafusion/expr/src/expr_rewriter/mod.rs @@ -390,11 +390,7 @@ mod test { } else { utf8_val }; - Ok(Transformed::yes(lit_with_metadata( - utf8_val, - metadata - .map(|m| m.into_iter().collect::>()), - ))) + Ok(Transformed::yes(lit_with_metadata(utf8_val, metadata))) } // otherwise, return None _ => Ok(Transformed::no(expr)), diff --git a/datafusion/expr/src/expr_schema.rs b/datafusion/expr/src/expr_schema.rs index 1973a00a67df..5ff487303308 100644 --- a/datafusion/expr/src/expr_schema.rs +++ b/datafusion/expr/src/expr_schema.rs @@ -423,12 +423,7 @@ impl ExprSchemable for Expr { Expr::Literal(l, metadata) => { let mut field = Field::new(&schema_name, l.data_type(), l.is_null()); if let Some(metadata) = metadata { - field = field.with_metadata( - metadata - .iter() - .map(|(k, v)| (k.clone(), v.clone())) - .collect(), - ); + field = metadata.add_to_field(field); } Ok(Arc::new(field)) } diff --git a/datafusion/expr/src/literal.rs b/datafusion/expr/src/literal.rs index 48e058b8b7b1..c4bd43bc0a62 100644 --- a/datafusion/expr/src/literal.rs +++ b/datafusion/expr/src/literal.rs @@ -17,20 +17,16 @@ //! Literal module contains foundational types that are used to represent literals in DataFusion. +use crate::expr::FieldMetadata; use crate::Expr; use datafusion_common::ScalarValue; -use std::collections::HashMap; /// Create a literal expression pub fn lit(n: T) -> Expr { n.lit() } -pub fn lit_with_metadata( - n: T, - metadata: impl Into>>, -) -> Expr { - let metadata = metadata.into(); +pub fn lit_with_metadata(n: T, metadata: Option) -> Expr { let Some(metadata) = metadata else { return n.lit(); }; @@ -38,13 +34,12 @@ pub fn lit_with_metadata( let Expr::Literal(sv, prior_metadata) = n.lit() else { unreachable!(); }; - let new_metadata = match prior_metadata { Some(mut prior) => { prior.extend(metadata); prior } - None => metadata.into_iter().collect(), + None => metadata, }; Expr::Literal(sv, Some(new_metadata)) diff --git a/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs b/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs index e91aea3305be..2be7a2b0bd6e 100644 --- a/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs +++ b/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs @@ -18,7 +18,7 @@ //! Expression simplification API use std::borrow::Cow; -use std::collections::{BTreeMap, HashSet}; +use std::collections::HashSet; use std::ops::Not; use arrow::{ @@ -58,6 +58,7 @@ use crate::{ analyzer::type_coercion::TypeCoercionRewriter, simplify_expressions::unwrap_cast::try_cast_literal_to_type, }; +use datafusion_expr::expr::FieldMetadata; use indexmap::IndexSet; use regex::Regex; @@ -523,9 +524,9 @@ struct ConstEvaluator<'a> { #[allow(clippy::large_enum_variant)] enum ConstSimplifyResult { // Expr was simplified and contains the new expression - Simplified(ScalarValue, Option>), + Simplified(ScalarValue, Option), // Expr was not simplified and original value is returned - NotSimplified(ScalarValue, Option>), + NotSimplified(ScalarValue, Option), // Evaluation encountered an error, contains the original expression SimplifyRuntimeError(DataFusionError, Expr), } @@ -682,9 +683,7 @@ impl<'a> ConstEvaluator<'a> { let m = f.metadata(); match m.is_empty() { true => None, - false => { - Some(m.iter().map(|(k, v)| (k.clone(), v.clone())).collect()) - } + false => Some(FieldMetadata::from(m)), } }); let col_val = match phys_expr.evaluate(&self.input_batch) { diff --git a/datafusion/physical-expr/src/expressions/literal.rs b/datafusion/physical-expr/src/expressions/literal.rs index 0d4d62ef4719..7739b7cf1753 100644 --- a/datafusion/physical-expr/src/expressions/literal.rs +++ b/datafusion/physical-expr/src/expressions/literal.rs @@ -18,7 +18,6 @@ //! Literal expressions for physical operations use std::any::Any; -use std::collections::HashMap; use std::hash::Hash; use std::sync::Arc; @@ -30,6 +29,7 @@ use arrow::{ record_batch::RecordBatch, }; use datafusion_common::{Result, ScalarValue}; +use datafusion_expr::expr::FieldMetadata; use datafusion_expr::Expr; use datafusion_expr_common::columnar_value::ColumnarValue; use datafusion_expr_common::interval_arithmetic::Interval; @@ -64,14 +64,13 @@ impl Literal { /// Create a literal value expression pub fn new_with_metadata( value: ScalarValue, - metadata: impl Into>>, + metadata: Option, ) -> Self { - let metadata = metadata.into(); let mut field = Field::new(format!("{value}"), value.data_type(), value.is_null()); if let Some(metadata) = metadata { - field = field.with_metadata(metadata); + field = metadata.add_to_field(field); } Self { diff --git a/datafusion/physical-expr/src/planner.rs b/datafusion/physical-expr/src/planner.rs index c08d6d2741a0..4b12eda936dc 100644 --- a/datafusion/physical-expr/src/planner.rs +++ b/datafusion/physical-expr/src/planner.rs @@ -15,7 +15,6 @@ // specific language governing permissions and limitations // under the License. -use std::collections::HashMap; use std::sync::Arc; use crate::ScalarFunctionExpr; @@ -29,7 +28,9 @@ use datafusion_common::{ exec_err, not_impl_err, plan_err, DFSchema, Result, ScalarValue, ToDFSchema, }; use datafusion_expr::execution_props::ExecutionProps; -use datafusion_expr::expr::{Alias, Cast, InList, Placeholder, ScalarFunction}; +use datafusion_expr::expr::{ + Alias, Cast, FieldMetadata, InList, Placeholder, ScalarFunction, +}; use datafusion_expr::var_provider::is_system_variables; use datafusion_expr::var_provider::VarType; use datafusion_expr::{ @@ -114,22 +115,26 @@ pub fn create_physical_expr( match e { Expr::Alias(Alias { expr, metadata, .. }) => { if let Expr::Literal(v, prior_metadata) = expr.as_ref() { - let mut new_metadata = prior_metadata - .as_ref() - .map(|m| { - m.iter() - .map(|(k, v)| (k.clone(), v.clone())) - .collect::>() - }) - .unwrap_or_default(); - if let Some(metadata) = metadata { - new_metadata.extend(metadata.clone()); - } - let new_metadata = match new_metadata.is_empty() { - true => None, - false => Some(new_metadata), + let metadata = metadata.as_ref().map(|m| FieldMetadata::from(m.clone())); + let new_metadata = match (prior_metadata.as_ref(), metadata) { + (Some(m), Some(n)) => { + let mut m = m.clone(); + m.extend(n); + Some(m) + } + (Some(m), None) => Some(m.clone()), + (None, Some(n)) => Some(n), + (None, None) => None, }; + let new_metadata = new_metadata.and_then(|new_metadata| { + if new_metadata.is_empty() { + None + } else { + Some(new_metadata) + } + }); + Ok(Arc::new(Literal::new_with_metadata( v.clone(), new_metadata, @@ -144,9 +149,7 @@ pub fn create_physical_expr( } Expr::Literal(value, metadata) => Ok(Arc::new(Literal::new_with_metadata( value.clone(), - metadata - .as_ref() - .map(|m| m.iter().map(|(k, v)| (k.clone(), v.clone())).collect()), + metadata.clone(), ))), Expr::ScalarVariable(_, variable_names) => { if is_system_variables(variable_names) { From 0ab21472c38940595c4f812af2ea9240ce4d5a83 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Sat, 7 Jun 2025 07:09:50 -0400 Subject: [PATCH 2/4] Add examples --- datafusion/expr/src/expr.rs | 34 +++++++++++++++++++++++++++++++--- 1 file changed, 31 insertions(+), 3 deletions(-) diff --git a/datafusion/expr/src/expr.rs b/datafusion/expr/src/expr.rs index e4ac1ad83b6f..5f27db4ae301 100644 --- a/datafusion/expr/src/expr.rs +++ b/datafusion/expr/src/expr.rs @@ -415,11 +415,35 @@ impl<'a> TreeNodeContainer<'a, Self> for Expr { /// Literal metadata /// -/// This structure is used to store metadata associated with a literal expressions -/// and is designed to be cheap to `clone`. +/// Stores metadata associated with a literal expressions +/// and is designed to be fast to `clone`. /// /// This structure is used to store metadata associated with a literal expression, and it -/// corresponds to the `metadata` field on [`FieldRef`]. +/// corresponds to the `metadata` field on [`Field`]. +/// +/// # Example: Create [`FieldMetadata`] from a [`Field`] +/// ``` +/// # use std::collections::HashMap; +/// # use datafusion_expr::expr::FieldMetadata; +/// # use arrow::datatypes::{Field, DataType}; +/// # let field = Field::new("c1", DataType::Int32, true) +/// # .with_metadata(HashMap::from([("foo".to_string(), "bar".to_string())])); +/// // Create a new `FieldMetadata` instance from a `Field` +/// let metadata = FieldMetadata::new_from_field(&field); +/// // There is also a `From` impl: +/// let metadata = FieldMetadata::from(&field); +/// ``` +/// +/// # Example: Update a [`Field`] with [`FieldMetadata`] +/// ``` +/// # use datafusion_expr::expr::FieldMetadata; +/// # use arrow::datatypes::{Field, DataType}; +/// # let field = Field::new("c1", DataType::Int32, true); +/// # let metadata = FieldMetadata::new_from_field(&field); +/// // Add any metadata from `FieldMetadata` to `Field` +/// let updated_field = metadata.add_to_field(field); +/// ``` +/// #[derive(Clone, PartialEq, Eq, PartialOrd, Hash, Debug)] pub struct FieldMetadata { /// The inner metadata of a literal expression, which is a map of string @@ -485,6 +509,10 @@ impl FieldMetadata { /// Updates the metadata on the Field with this metadata pub fn add_to_field(&self, field: Field) -> Field { + if self.inner.is_empty() { + return field; + } + field.with_metadata( self.inner .iter() From d3bdd6c1f3cb15c9101ce791767acade6635fd29 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Sat, 7 Jun 2025 07:16:05 -0400 Subject: [PATCH 3/4] refactor --- datafusion/expr/src/expr.rs | 20 +++++++++++++++++++- datafusion/physical-expr/src/planner.rs | 23 ++++------------------- 2 files changed, 23 insertions(+), 20 deletions(-) diff --git a/datafusion/expr/src/expr.rs b/datafusion/expr/src/expr.rs index 5f27db4ae301..5e19b733667f 100644 --- a/datafusion/expr/src/expr.rs +++ b/datafusion/expr/src/expr.rs @@ -462,6 +462,24 @@ impl FieldMetadata { } } + /// Merges two optional `FieldMetadata` instances, overwriting any existing + /// keys in `m` with keys from `n` if present + pub fn merge_options( + m: Option<&FieldMetadata>, + n: Option<&FieldMetadata>, + ) -> Option { + match (m, n) { + (Some(m), Some(n)) => { + let mut merged = m.clone(); + merged.extend(n.clone()); + Some(merged) + } + (Some(m), None) => Some(m.clone()), + (None, Some(n)) => Some(n.clone()), + (None, None) => None, + } + } + /// Create a new metadata instance from a `Field`'s metadata. pub fn new_from_field(field: &Field) -> Self { let inner = field @@ -507,7 +525,7 @@ impl FieldMetadata { self.inner.len() } - /// Updates the metadata on the Field with this metadata + /// Updates the metadata on the Field with this metadata, if it is not empty. pub fn add_to_field(&self, field: Field) -> Field { if self.inner.is_empty() { return field; diff --git a/datafusion/physical-expr/src/planner.rs b/datafusion/physical-expr/src/planner.rs index 4b12eda936dc..885901328988 100644 --- a/datafusion/physical-expr/src/planner.rs +++ b/datafusion/physical-expr/src/planner.rs @@ -116,25 +116,10 @@ pub fn create_physical_expr( Expr::Alias(Alias { expr, metadata, .. }) => { if let Expr::Literal(v, prior_metadata) = expr.as_ref() { let metadata = metadata.as_ref().map(|m| FieldMetadata::from(m.clone())); - let new_metadata = match (prior_metadata.as_ref(), metadata) { - (Some(m), Some(n)) => { - let mut m = m.clone(); - m.extend(n); - Some(m) - } - (Some(m), None) => Some(m.clone()), - (None, Some(n)) => Some(n), - (None, None) => None, - }; - - let new_metadata = new_metadata.and_then(|new_metadata| { - if new_metadata.is_empty() { - None - } else { - Some(new_metadata) - } - }); - + let new_metadata = FieldMetadata::merge_options( + prior_metadata.as_ref(), + metadata.as_ref(), + ); Ok(Arc::new(Literal::new_with_metadata( v.clone(), new_metadata, From c2759e9b06f352d40e6f13ded3f1e9094095ee62 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Sun, 8 Jun 2025 06:48:41 -0400 Subject: [PATCH 4/4] impl `Default` for `FieldMetadata` --- datafusion/expr/src/expr.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/datafusion/expr/src/expr.rs b/datafusion/expr/src/expr.rs index 5e19b733667f..f41f09fc0afa 100644 --- a/datafusion/expr/src/expr.rs +++ b/datafusion/expr/src/expr.rs @@ -454,6 +454,12 @@ pub struct FieldMetadata { inner: Arc>, } +impl Default for FieldMetadata { + fn default() -> Self { + Self::new_empty() + } +} + impl FieldMetadata { /// Create a new empty metadata instance. pub fn new_empty() -> Self {