Skip to content

Fix Query Planner able to find struct field with capital letters #16664

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 97 additions & 9 deletions datafusion/common/src/dfschema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,7 @@ impl DFSchema {
qualifier: Option<&TableReference>,
name: &str,
) -> Option<usize> {
// First try exact match for performance
let mut matches = self
.iter()
.enumerate()
Expand All @@ -351,6 +352,31 @@ impl DFSchema {
(None, Some(_)) | (None, None) => f.name() == name,
})
.map(|(idx, _)| idx);

if let Some(idx) = matches.next() {
return Some(idx);
}

// If no exact match, try case-insensitive match
let mut matches = self
.iter()
.enumerate()
.filter(|(_, (q, f))| match (qualifier, q) {
// field to lookup is qualified.
// current field is qualified and not shared between relations, compare both
// qualifier and name.
(Some(q), Some(field_q)) => {
q.resolved_eq(field_q)
&& f.name().to_ascii_lowercase() == name.to_ascii_lowercase()
}
// field to lookup is qualified but current field is unqualified.
(Some(_), None) => false,
// field to lookup is unqualified, no need to compare qualifier
(None, Some(_)) | (None, None) => {
f.name().to_ascii_lowercase() == name.to_ascii_lowercase()
}
})
.map(|(idx, _)| idx);
matches.next()
}

Expand Down Expand Up @@ -429,10 +455,25 @@ impl DFSchema {

/// Find all fields that match the given name
pub fn fields_with_unqualified_name(&self, name: &str) -> Vec<&Field> {
self.fields()
// First try exact match
let exact_matches = self
.fields()
.iter()
.filter(|field| field.name() == name)
.map(|f| f.as_ref())
.collect::<Vec<_>>();

if !exact_matches.is_empty() {
return exact_matches;
}

// If no exact match, try case-insensitive match
self.fields()
.iter()
.filter(|field| {
field.name().to_ascii_lowercase() == name.to_ascii_lowercase()
})
.map(|f| f.as_ref())
.collect()
}

Expand All @@ -441,17 +482,45 @@ impl DFSchema {
&self,
name: &str,
) -> Vec<(Option<&TableReference>, &Field)> {
self.iter()
// First try exact match
let exact_matches = self
.iter()
.filter(|(_, field)| field.name() == name)
.map(|(qualifier, field)| (qualifier, field.as_ref()))
.collect::<Vec<_>>();

if !exact_matches.is_empty() {
return exact_matches;
}

// If no exact match, try case-insensitive match
self.iter()
.filter(|(_, field)| {
field.name().to_ascii_lowercase() == name.to_ascii_lowercase()
})
.map(|(qualifier, field)| (qualifier, field.as_ref()))
.collect()
}

/// Find all fields that match the given name and convert to column
pub fn columns_with_unqualified_name(&self, name: &str) -> Vec<Column> {
self.iter()
// First try exact match
let exact_matches = self
.iter()
.filter(|(_, field)| field.name() == name)
.map(|(qualifier, field)| Column::new(qualifier.cloned(), field.name()))
.collect::<Vec<_>>();

if !exact_matches.is_empty() {
return exact_matches;
}

// If no exact match, try case-insensitive match
self.iter()
.filter(|(_, field)| {
field.name().to_ascii_lowercase() == name.to_ascii_lowercase()
})
.map(|(qualifier, field)| Column::new(qualifier.cloned(), field.name()))
.collect()
}

Expand Down Expand Up @@ -525,7 +594,15 @@ impl DFSchema {

/// Find if the field exists with the given name
pub fn has_column_with_unqualified_name(&self, name: &str) -> bool {
self.fields().iter().any(|field| field.name() == name)
// First try exact match
if self.fields().iter().any(|field| field.name() == name) {
return true;
}

// If no exact match, try case-insensitive match
self.fields()
.iter()
.any(|field| field.name().to_ascii_lowercase() == name.to_ascii_lowercase())
}

/// Find if the field exists with the given qualified name
Expand All @@ -534,8 +611,19 @@ impl DFSchema {
qualifier: &TableReference,
name: &str,
) -> bool {
self.iter()
// First try exact match
if self
.iter()
.any(|(q, f)| q.map(|q| q.eq(qualifier)).unwrap_or(false) && f.name() == name)
{
return true;
}

// If no exact match, try case-insensitive match
self.iter().any(|(q, f)| {
q.map(|q| q.eq(qualifier)).unwrap_or(false)
&& f.name().to_ascii_lowercase() == name.to_ascii_lowercase()
})
}

/// Find if the field exists with the given qualified column
Expand Down Expand Up @@ -692,9 +780,9 @@ impl DFSchema {
let iter1 = fields1.iter();
let iter2 = fields2.iter();
fields1.len() == fields2.len() &&
// all fields have to be the same
// all fields have to be the same
iter1
.zip(iter2)
.zip(iter2)
.all(|(f1, f2)| Self::field_is_logically_equal(f1, f2))
}
(DataType::Union(fields1, _), DataType::Union(fields2, _)) => {
Expand Down Expand Up @@ -751,9 +839,9 @@ impl DFSchema {
let iter1 = fields1.iter();
let iter2 = fields2.iter();
fields1.len() == fields2.len() &&
// all fields have to be the same
// all fields have to be the same
iter1
.zip(iter2)
.zip(iter2)
.all(|(f1, f2)| Self::field_is_semantically_equal(f1, f2))
}
(DataType::Union(fields1, _), DataType::Union(fields2, _)) => {
Expand Down
132 changes: 114 additions & 18 deletions datafusion/functions/src/core/getfield.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,23 +154,27 @@ impl ScalarUDFImpl for GetFieldFunc {
}
(DataType::Struct(fields),sv) => {
sv.and_then(|sv| sv.try_as_str().flatten().filter(|s| !s.is_empty()))
.map_or_else(
|| exec_err!("Field name must be a non-empty string"),
|field_name| {
fields.iter().find(|f| f.name() == field_name)
.ok_or(plan_datafusion_err!("Field {field_name} not found in struct"))
.map(|f| {
let mut child_field = f.as_ref().clone();

// If the parent is nullable, then getting the child must be nullable,
// so potentially override the return value

if args.arg_fields[0].is_nullable() {
child_field = child_field.with_nullable(true);
}
Arc::new(child_field)
})
})
.map_or_else(
|| exec_err!("Field name must be a non-empty string"),
|field_name| {
// First try exact match for performance
let field = fields.iter().find(|f| f.name() == field_name)
// If no exact match, try case-insensitive match
.or_else(|| fields.iter().find(|f| f.name().to_ascii_lowercase() == field_name.to_ascii_lowercase()));

field.ok_or(plan_datafusion_err!("Field {field_name} not found in struct"))
.map(|f| {
let mut child_field = f.as_ref().clone();

// If the parent is nullable, then getting the child must be nullable,
// so potentially override the return value

if args.arg_fields[0].is_nullable() {
child_field = child_field.with_nullable(true);
}
Arc::new(child_field)
})
})
},
(DataType::Null, _) => Ok(Field::new(self.name(), DataType::Null, true).into()),
(other, _) => exec_err!("The expression to get an indexed field is only valid for `Struct`, `Map` or `Null` types, got {other}"),
Expand Down Expand Up @@ -263,7 +267,19 @@ impl ScalarUDFImpl for GetFieldFunc {
}
(DataType::Struct(_), ScalarValue::Utf8(Some(k))) => {
let as_struct_array = as_struct_array(&array)?;
match as_struct_array.column_by_name(&k) {
// First try exact match for performance (delegates to Arrow's implementation)
let column = as_struct_array.column_by_name(&k)
// If no exact match, try case-insensitive match
.or_else(|| {
for (field_name, column) in as_struct_array.fields().iter().zip(as_struct_array.columns()) {
if field_name.name().to_ascii_lowercase() == k.to_ascii_lowercase() {
return Some(column);
}
}
None
});

match column {
None => exec_err!("get indexed field {k} not found in struct"),
Some(col) => Ok(ColumnarValue::Array(Arc::clone(col))),
}
Expand All @@ -284,3 +300,83 @@ impl ScalarUDFImpl for GetFieldFunc {
self.doc()
}
}

#[cfg(test)]
mod tests {
use super::*;
use arrow::array::{Array, Int32Array, StringArray, StructArray};
use arrow::datatypes::{DataType, Field};
use datafusion_common::{Result, ScalarValue};
use datafusion_expr::{ColumnarValue, ScalarFunctionArgs};
use std::sync::Arc;

#[test]
fn test_case_insensitive_struct_field_access() -> Result<()> {
let field1_array = Arc::new(Int32Array::from(vec![Some(1), Some(2), None]));
let field2_array =
Arc::new(StringArray::from(vec![Some("a"), Some("b"), Some("c")]));

let struct_array = StructArray::from(vec![
(
Arc::new(Field::new("fooBar", DataType::Int32, true)),
field1_array as Arc<dyn Array>,
),
(
Arc::new(Field::new("another_field", DataType::Utf8, true)),
field2_array as Arc<dyn Array>,
),
]);

let get_field_func = GetFieldFunc::new();

let struct_field = Arc::new(Field::new(
"test_struct",
struct_array.data_type().clone(),
true,
));
let string_field = Arc::new(Field::new("field_name", DataType::Utf8, false));
let return_field = Arc::new(Field::new("result", DataType::Int32, true));

// Access field with exact case
let result = get_field_func.invoke_with_args(ScalarFunctionArgs {
args: vec![
ColumnarValue::Array(Arc::new(struct_array.clone())),
ColumnarValue::Scalar(ScalarValue::Utf8(Some("fooBar".to_string()))),
],
arg_fields: vec![struct_field.clone(), string_field.clone()],
number_rows: 3,
return_field: return_field.clone(),
})?;

if let ColumnarValue::Array(result_array) = result {
let int_array = result_array.as_any().downcast_ref::<Int32Array>().unwrap();
assert_eq!(int_array.value(0), 1);
assert_eq!(int_array.value(1), 2);
assert!(int_array.is_null(2));
} else {
panic!("Expected array result");
}

// Access field with lowercase (as would come from SQL parser)
let result = get_field_func.invoke_with_args(ScalarFunctionArgs {
args: vec![
ColumnarValue::Array(Arc::new(struct_array)),
ColumnarValue::Scalar(ScalarValue::Utf8(Some("foobar".to_string()))), // lowercase from SQL parser
],
arg_fields: vec![struct_field, string_field],
number_rows: 3,
return_field,
})?;

if let ColumnarValue::Array(result_array) = result {
let int_array = result_array.as_any().downcast_ref::<Int32Array>().unwrap();
assert_eq!(int_array.value(0), 1);
assert_eq!(int_array.value(1), 2);
assert!(int_array.is_null(2));
} else {
panic!("Expected array result");
}

Ok(())
}
}
8 changes: 3 additions & 5 deletions datafusion/sql/src/expr/identifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,11 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
let normalize_ident = self.ident_normalizer.normalize(id);

// Check for qualified field with unqualified name
if let Ok((qualifier, _)) =
if let Ok((qualifier, field)) =
schema.qualified_field_with_unqualified_name(normalize_ident.as_str())
{
let mut column = Column::new(
qualifier.filter(|q| q.table() != UNNAMED_TABLE).cloned(),
normalize_ident,
);
let filtered_qualifier = qualifier.filter(|q| q.table() != UNNAMED_TABLE).cloned();
let mut column = Column::new(filtered_qualifier, field.name().clone());
if self.options.collect_spans {
if let Some(span) = Span::try_from_sqlparser_span(id_span) {
column.spans_mut().add_span(span);
Expand Down
Loading