Skip to content

Encapsulate metadata for literals on to a FieldMetadata structure #16317

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jun 11, 2025
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ use datafusion_common::{
assert_batches_eq, assert_batches_sorted_eq, assert_contains, exec_err, not_impl_err,
plan_err, DFSchema, DataFusionError, Result, ScalarValue,
};
use datafusion_expr::expr::FieldMetadata;
use datafusion_expr::simplify::{ExprSimplifyResult, SimplifyInfo};
use datafusion_expr::{
lit_with_metadata, Accumulator, ColumnarValue, CreateFunction, CreateFunctionBody,
Expand Down Expand Up @@ -1535,7 +1536,7 @@ async fn test_metadata_based_udf_with_literal() -> Result<()> {
let df = ctx.sql("select 0;").await?.select(vec![
lit(5u64).alias_with_metadata("lit_with_doubling", Some(input_metadata.clone())),
lit(5u64).alias("lit_no_doubling"),
lit_with_metadata(5u64, Some(input_metadata))
lit_with_metadata(5u64, Some(FieldMetadata::from(input_metadata)))
.alias("lit_with_double_no_alias_metadata"),
])?;

Expand Down
162 changes: 159 additions & 3 deletions datafusion/expr/src/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use crate::logical_plan::Subquery;
use crate::Volatility;
use crate::{udaf, ExprSchemable, Operator, Signature, WindowFrame, WindowUDF};

use arrow::datatypes::{DataType, FieldRef};
use arrow::datatypes::{DataType, Field, FieldRef};
use datafusion_common::cse::{HashNode, NormalizeEq, Normalizeable};
use datafusion_common::tree_node::{
Transformed, TransformedResult, TreeNode, TreeNodeContainer, TreeNodeRecursion,
Expand Down Expand Up @@ -284,8 +284,8 @@ pub enum Expr {
Column(Column),
/// A named reference to a variable in a registry.
ScalarVariable(DataType, Vec<String>),
/// A constant value along with associated metadata
Literal(ScalarValue, Option<BTreeMap<String, String>>),
/// A constant value along with associated [`FieldMetadata`].
Literal(ScalarValue, Option<FieldMetadata>),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The point of this PR is to put all metadata handling in a Struct to make it easier to work with (and more efficient)

/// A binary expression such as "age > 21"
BinaryExpr(BinaryExpr),
/// LIKE expression
Expand Down Expand Up @@ -413,6 +413,162 @@ impl<'a> TreeNodeContainer<'a, Self> for Expr {
}
}

/// Literal metadata
///
/// Stores metadata associated with a literal expressions
/// and is designed to be fast to `clone`.
///
/// This structure is used to store metadata associated with a literal expression, and it
/// corresponds to the `metadata` field on [`Field`].
///
/// # Example: Create [`FieldMetadata`] from a [`Field`]
/// ```
/// # use std::collections::HashMap;
/// # use datafusion_expr::expr::FieldMetadata;
/// # use arrow::datatypes::{Field, DataType};
/// # let field = Field::new("c1", DataType::Int32, true)
/// # .with_metadata(HashMap::from([("foo".to_string(), "bar".to_string())]));
/// // Create a new `FieldMetadata` instance from a `Field`
/// let metadata = FieldMetadata::new_from_field(&field);
/// // There is also a `From` impl:
/// let metadata = FieldMetadata::from(&field);
/// ```
///
/// # Example: Update a [`Field`] with [`FieldMetadata`]
/// ```
/// # use datafusion_expr::expr::FieldMetadata;
/// # use arrow::datatypes::{Field, DataType};
/// # let field = Field::new("c1", DataType::Int32, true);
/// # let metadata = FieldMetadata::new_from_field(&field);
/// // Add any metadata from `FieldMetadata` to `Field`
/// let updated_field = metadata.add_to_field(field);
/// ```
///
#[derive(Clone, PartialEq, Eq, PartialOrd, Hash, Debug)]
pub struct FieldMetadata {
/// The inner metadata of a literal expression, which is a map of string
/// keys to string values.
///
/// Note this is not a `HashMap because `HashMap` does not provide
/// implementations for traits like `Debug` and `Hash`.
inner: Arc<BTreeMap<String, String>>,
}

impl FieldMetadata {
/// Create a new empty metadata instance.
pub fn new_empty() -> Self {
Self {
inner: Arc::new(BTreeMap::new()),
}
}

/// Merges two optional `FieldMetadata` instances, overwriting any existing
/// keys in `m` with keys from `n` if present
pub fn merge_options(
m: Option<&FieldMetadata>,
n: Option<&FieldMetadata>,
) -> Option<FieldMetadata> {
match (m, n) {
(Some(m), Some(n)) => {
let mut merged = m.clone();
merged.extend(n.clone());
Some(merged)
}
(Some(m), None) => Some(m.clone()),
(None, Some(n)) => Some(n.clone()),
(None, None) => None,
}
}

/// Create a new metadata instance from a `Field`'s metadata.
pub fn new_from_field(field: &Field) -> Self {
let inner = field
.metadata()
.iter()
.map(|(k, v)| (k.to_string(), v.to_string()))
.collect();
Self {
inner: Arc::new(inner),
}
}

/// Create a new metadata instance from a map of string keys to string values.
pub fn new(inner: BTreeMap<String, String>) -> Self {
Self {
inner: Arc::new(inner),
}
}

/// Get the inner metadata as a reference to a `BTreeMap`.
pub fn inner(&self) -> &BTreeMap<String, String> {
&self.inner
}

/// Return the inner metadata
pub fn into_inner(self) -> Arc<BTreeMap<String, String>> {
self.inner
}

/// Adds metadata from `other` into `self`, overwriting any existing keys.
pub fn extend(&mut self, other: Self) {
let other = Arc::unwrap_or_clone(other.into_inner());
Arc::make_mut(&mut self.inner).extend(other);
}

/// Returns true if the metadata is empty.
pub fn is_empty(&self) -> bool {
self.inner.is_empty()
}

/// Returns the number of key-value pairs in the metadata.
pub fn len(&self) -> usize {
self.inner.len()
}

/// Updates the metadata on the Field with this metadata, if it is not empty.
pub fn add_to_field(&self, field: Field) -> Field {
if self.inner.is_empty() {
return field;
}

field.with_metadata(
self.inner
.iter()
.map(|(k, v)| (k.clone(), v.clone()))
.collect(),
)
}
}

impl From<&Field> for FieldMetadata {
fn from(field: &Field) -> Self {
Self::new_from_field(field)
}
}

impl From<BTreeMap<String, String>> for FieldMetadata {
fn from(inner: BTreeMap<String, String>) -> Self {
Self::new(inner)
}
}

impl From<std::collections::HashMap<String, String>> for FieldMetadata {
fn from(map: std::collections::HashMap<String, String>) -> Self {
Self::new(map.into_iter().collect())
}
}

/// From reference
impl From<&std::collections::HashMap<String, String>> for FieldMetadata {
fn from(map: &std::collections::HashMap<String, String>) -> Self {
let inner = map
.iter()
.map(|(k, v)| (k.to_string(), v.to_string()))
.collect();
Self::new(inner)
}
}

/// UNNEST expression.
#[derive(Clone, PartialEq, Eq, PartialOrd, Hash, Debug)]
pub struct Unnest {
Expand Down
6 changes: 1 addition & 5 deletions datafusion/expr/src/expr_rewriter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -390,11 +390,7 @@ mod test {
} else {
utf8_val
};
Ok(Transformed::yes(lit_with_metadata(
utf8_val,
metadata
.map(|m| m.into_iter().collect::<HashMap<String, String>>()),
)))
Ok(Transformed::yes(lit_with_metadata(utf8_val, metadata)))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a pretty good example of the kind of simplification this PR allows (don't have to translate back/forth between HashMap / BTreeMap in various places)

}
// otherwise, return None
_ => Ok(Transformed::no(expr)),
Expand Down
7 changes: 1 addition & 6 deletions datafusion/expr/src/expr_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -423,12 +423,7 @@ impl ExprSchemable for Expr {
Expr::Literal(l, metadata) => {
let mut field = Field::new(&schema_name, l.data_type(), l.is_null());
if let Some(metadata) = metadata {
field = field.with_metadata(
metadata
.iter()
.map(|(k, v)| (k.clone(), v.clone()))
.collect(),
);
field = metadata.add_to_field(field);
}
Ok(Arc::new(field))
}
Expand Down
11 changes: 3 additions & 8 deletions datafusion/expr/src/literal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,34 +17,29 @@

//! Literal module contains foundational types that are used to represent literals in DataFusion.

use crate::expr::FieldMetadata;
use crate::Expr;
use datafusion_common::ScalarValue;
use std::collections::HashMap;

/// Create a literal expression
pub fn lit<T: Literal>(n: T) -> Expr {
n.lit()
}

pub fn lit_with_metadata<T: Literal>(
n: T,
metadata: impl Into<Option<HashMap<String, String>>>,
) -> Expr {
let metadata = metadata.into();
pub fn lit_with_metadata<T: Literal>(n: T, metadata: Option<FieldMetadata>) -> Expr {
let Some(metadata) = metadata else {
return n.lit();
};

let Expr::Literal(sv, prior_metadata) = n.lit() else {
unreachable!();
};

let new_metadata = match prior_metadata {
Some(mut prior) => {
prior.extend(metadata);
prior
}
None => metadata.into_iter().collect(),
None => metadata,
};

Expr::Literal(sv, Some(new_metadata))
Expand Down
11 changes: 5 additions & 6 deletions datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
//! Expression simplification API

use std::borrow::Cow;
use std::collections::{BTreeMap, HashSet};
use std::collections::HashSet;
use std::ops::Not;

use arrow::{
Expand Down Expand Up @@ -58,6 +58,7 @@ use crate::{
analyzer::type_coercion::TypeCoercionRewriter,
simplify_expressions::unwrap_cast::try_cast_literal_to_type,
};
use datafusion_expr::expr::FieldMetadata;
use indexmap::IndexSet;
use regex::Regex;

Expand Down Expand Up @@ -523,9 +524,9 @@ struct ConstEvaluator<'a> {
#[allow(clippy::large_enum_variant)]
enum ConstSimplifyResult {
// Expr was simplified and contains the new expression
Simplified(ScalarValue, Option<BTreeMap<String, String>>),
Simplified(ScalarValue, Option<FieldMetadata>),
// Expr was not simplified and original value is returned
NotSimplified(ScalarValue, Option<BTreeMap<String, String>>),
NotSimplified(ScalarValue, Option<FieldMetadata>),
// Evaluation encountered an error, contains the original expression
SimplifyRuntimeError(DataFusionError, Expr),
}
Expand Down Expand Up @@ -682,9 +683,7 @@ impl<'a> ConstEvaluator<'a> {
let m = f.metadata();
match m.is_empty() {
true => None,
false => {
Some(m.iter().map(|(k, v)| (k.clone(), v.clone())).collect())
}
false => Some(FieldMetadata::from(m)),
}
});
let col_val = match phys_expr.evaluate(&self.input_batch) {
Expand Down
7 changes: 3 additions & 4 deletions datafusion/physical-expr/src/expressions/literal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
//! Literal expressions for physical operations

use std::any::Any;
use std::collections::HashMap;
use std::hash::Hash;
use std::sync::Arc;

Expand All @@ -30,6 +29,7 @@ use arrow::{
record_batch::RecordBatch,
};
use datafusion_common::{Result, ScalarValue};
use datafusion_expr::expr::FieldMetadata;
use datafusion_expr::Expr;
use datafusion_expr_common::columnar_value::ColumnarValue;
use datafusion_expr_common::interval_arithmetic::Interval;
Expand Down Expand Up @@ -64,14 +64,13 @@ impl Literal {
/// Create a literal value expression
pub fn new_with_metadata(
value: ScalarValue,
metadata: impl Into<Option<HashMap<String, String>>>,
metadata: Option<FieldMetadata>,
) -> Self {
let metadata = metadata.into();
let mut field =
Field::new(format!("{value}"), value.data_type(), value.is_null());

if let Some(metadata) = metadata {
field = field.with_metadata(metadata);
field = metadata.add_to_field(field);
}

Self {
Expand Down
30 changes: 9 additions & 21 deletions datafusion/physical-expr/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
// specific language governing permissions and limitations
// under the License.

use std::collections::HashMap;
use std::sync::Arc;

use crate::ScalarFunctionExpr;
Expand All @@ -29,7 +28,9 @@ use datafusion_common::{
exec_err, not_impl_err, plan_err, DFSchema, Result, ScalarValue, ToDFSchema,
};
use datafusion_expr::execution_props::ExecutionProps;
use datafusion_expr::expr::{Alias, Cast, InList, Placeholder, ScalarFunction};
use datafusion_expr::expr::{
Alias, Cast, FieldMetadata, InList, Placeholder, ScalarFunction,
};
use datafusion_expr::var_provider::is_system_variables;
use datafusion_expr::var_provider::VarType;
use datafusion_expr::{
Expand Down Expand Up @@ -114,22 +115,11 @@ pub fn create_physical_expr(
match e {
Expr::Alias(Alias { expr, metadata, .. }) => {
if let Expr::Literal(v, prior_metadata) = expr.as_ref() {
let mut new_metadata = prior_metadata
.as_ref()
.map(|m| {
m.iter()
.map(|(k, v)| (k.clone(), v.clone()))
.collect::<HashMap<String, String>>()
})
.unwrap_or_default();
if let Some(metadata) = metadata {
new_metadata.extend(metadata.clone());
}
let new_metadata = match new_metadata.is_empty() {
true => None,
false => Some(new_metadata),
};

let metadata = metadata.as_ref().map(|m| FieldMetadata::from(m.clone()));
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let new_metadata = FieldMetadata::merge_options(
prior_metadata.as_ref(),
metadata.as_ref(),
);
Ok(Arc::new(Literal::new_with_metadata(
v.clone(),
new_metadata,
Expand All @@ -144,9 +134,7 @@ pub fn create_physical_expr(
}
Expr::Literal(value, metadata) => Ok(Arc::new(Literal::new_with_metadata(
value.clone(),
metadata
.as_ref()
.map(|m| m.iter().map(|(k, v)| (k.clone(), v.clone())).collect()),
metadata.clone(),
))),
Expr::ScalarVariable(_, variable_names) => {
if is_system_variables(variable_names) {
Expand Down