fix: update with udf report error #18397

Merged · 6 commits · Jul 22, 2025
9 changes: 9 additions & 0 deletions src/query/expression/src/converts/arrow/to.rs
@@ -26,6 +26,7 @@ use arrow_schema::Schema;
 use arrow_schema::TimeUnit;
 use databend_common_column::bitmap::Bitmap;
 use databend_common_column::buffer::buffer_to_array_data;
+use databend_common_exception::ErrorCode;
 use databend_common_exception::Result;
 
 use super::ARROW_EXT_TYPE_BITMAP;
@@ -230,6 +231,14 @@ impl DataBlock {
     }
 
     pub fn to_record_batch(self, table_schema: &TableSchema) -> Result<RecordBatch> {
+        if self.columns().len() != table_schema.num_fields() {
+            return Err(ErrorCode::Internal(format!(
+                "The number of columns in the data block does not match the number of fields in the table schema, block_columns: {}, table_schema_fields: {}",
+                self.columns().len(),
+                table_schema.num_fields()
+            )));
+        }
+
         if table_schema.num_fields() == 0 {
             return Ok(RecordBatch::try_new_with_options(
                 Arc::new(Schema::empty()),
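
Note on the guard above: it surfaces a block/schema arity mismatch (for example, UDF result columns still attached to the block after an UPDATE) as a clear typed error instead of a confusing failure further down in the Arrow conversion. A standalone sketch of the fail-fast pattern, using hypothetical stand-in types rather than Databend's DataBlock and TableSchema:

    // `Block` and `Schema` are simplified stand-ins for illustration only.
    struct Block { columns: Vec<Vec<i64>> }
    struct Schema { fields: Vec<String> }

    fn to_record_batch_checked(block: &Block, schema: &Schema) -> Result<(), String> {
        // Fail fast with a descriptive error instead of letting the
        // downstream conversion hit the mismatch.
        if block.columns.len() != schema.fields.len() {
            return Err(format!(
                "block has {} columns but schema has {} fields",
                block.columns.len(),
                schema.fields.len()
            ));
        }
        Ok(())
    }
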
@@ -38,6 +38,7 @@ impl PipelineBuilder {
                 column_mutation.field_id_to_schema_index.clone(),
                 column_mutation.input_num_columns,
                 column_mutation.has_filter_column,
+                column_mutation.udf_col_num,
             )?;
         }
 
@@ -79,6 +80,7 @@ impl PipelineBuilder {
        mut field_id_to_schema_index: HashMap<usize, usize>,
        num_input_columns: usize,
        has_filter_column: bool,
+        udf_col_num: usize,
    ) -> Result<()> {
        let mut block_operators = Vec::new();
        let mut next_column_offset = num_input_columns;
@@ -129,7 +131,7 @@
        }
 
        // Keep the original order of the columns.
-        let num_output_columns = num_input_columns - has_filter_column as usize;
+        let num_output_columns = num_input_columns - has_filter_column as usize - udf_col_num;
        let mut projection = Vec::with_capacity(num_output_columns);
        for idx in 0..num_output_columns {
            if let Some(index) = schema_offset_to_new_offset.get(&idx) {
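
The projection arithmetic above is the core of the fix: UDF result columns appended to the input block feed the update expressions but must not survive into the output. A worked example with assumed values (not taken from a real plan):

    // Assumed layout: 6 input columns = 4 table columns
    // + 1 filter predicate column + 1 appended UDF result column.
    let num_input_columns = 6usize;
    let has_filter_column = true;
    let udf_col_num = 1usize;

    // Only the original table columns survive the final projection.
    let num_output_columns =
        num_input_columns - has_filter_column as usize - udf_col_num;
    assert_eq!(num_output_columns, 4);
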
@@ -33,4 +33,5 @@ pub struct ColumnMutation {
     pub input_num_columns: usize,
     pub has_filter_column: bool,
     pub table_meta_timestamps: TableMetaTimestamps,
+    pub udf_col_num: usize,
 }
33 changes: 32 additions & 1 deletion src/query/sql/src/executor/physical_plans/physical_mutation.rs
@@ -12,6 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::collections::BTreeSet;
 use std::collections::HashMap;
 use std::sync::Arc;
 
@@ -100,7 +101,7 @@ impl PhysicalPlanBuilder {
         &mut self,
         s_expr: &SExpr,
         mutation: &crate::plans::Mutation,
-        required: ColumnSet,
+        mut required: ColumnSet,
     ) -> Result<PhysicalPlan> {
         let crate::plans::Mutation {
             bind_context,
@@ -122,9 +123,38 @@
             can_try_update_column_only,
             no_effect,
             truncate_table,
+            direct_filter,
             ..
         } = mutation;
 
+        let mut maybe_udfs = BTreeSet::new();
+        for matched_evaluator in matched_evaluators {
+            if let Some(condition) = &matched_evaluator.condition {
+                maybe_udfs.extend(condition.used_columns());
+            }
+            if let Some(update_list) = &matched_evaluator.update {
+                for update_scalar in update_list.values() {
+                    maybe_udfs.extend(update_scalar.used_columns());
+                }
+            }
+        }
+        for unmatched_evaluator in unmatched_evaluators {
+            if let Some(condition) = &unmatched_evaluator.condition {
+                maybe_udfs.extend(condition.used_columns());
+            }
+            for value in &unmatched_evaluator.values {
+                maybe_udfs.extend(value.used_columns());
+            }
+        }
+        for filter_value in direct_filter {
+            maybe_udfs.extend(filter_value.used_columns());
+        }
+
+        let udf_ids = s_expr.get_udfs_col_ids()?;
+        let required_udf_ids: BTreeSet<_> = maybe_udfs.intersection(&udf_ids).collect();
+        let udf_col_num = required_udf_ids.len();
+        required.extend(required_udf_ids);
+
         let mut plan = self.build(s_expr.child(0)?, required).await?;
         if *no_effect {
             return Ok(plan);
@@ -220,6 +250,7 @@ impl PhysicalPlanBuilder {
             input_num_columns: mutation_input_schema.fields().len(),
             has_filter_column: predicate_column_index.is_some(),
             table_meta_timestamps: mutation_build_info.table_meta_timestamps,
+            udf_col_num,
         });
 
         if *distributed {
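
The intersection above keeps only UDF output columns that the mutation actually references, so unused UDF outputs neither inflate udf_col_num nor get pulled into required. A standalone sketch of the set logic, with illustrative column ids:

    use std::collections::BTreeSet;

    // Columns referenced by match conditions, update lists, and filters.
    let maybe_udfs: BTreeSet<usize> = [3, 5, 7].into_iter().collect();
    // Columns produced by Udf operators in the plan tree.
    let udf_ids: BTreeSet<usize> = [5, 7, 9].into_iter().collect();

    // Only ids 5 and 7 are both produced by a UDF and actually used.
    let required_udf_ids: BTreeSet<usize> =
        maybe_udfs.intersection(&udf_ids).copied().collect();
    assert_eq!(required_udf_ids.len(), 2);
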
16 changes: 16 additions & 0 deletions src/query/sql/src/planner/optimizer/ir/expr/s_expr.rs
@@ -12,6 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::collections::BTreeSet;
 use std::collections::HashSet;
 use std::sync::Arc;
 use std::sync::Mutex;
@@ -224,6 +225,21 @@ impl SExpr {
         Ok(udfs)
     }
 
+    #[recursive::recursive]
+    pub fn get_udfs_col_ids(&self) -> Result<BTreeSet<IndexType>> {
+        let mut udf_ids = BTreeSet::new();
+        if let RelOperator::Udf(udf) = self.plan.as_ref() {
+            for item in udf.items.iter() {
+                udf_ids.insert(item.index);
+            }
+        }
+        for child in &self.children {
+            let udfs = child.get_udfs_col_ids()?;
+            udf_ids.extend(udfs);
+        }
+        Ok(udf_ids)
+    }
+
     // Add column index to Scan nodes that match the given table index
     pub fn add_column_index_to_scans(
         &self,
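
get_udfs_col_ids follows the same shape as the existing get_udfs traversal: collect the output column indexes of a Udf operator at the current node, then recurse into the children and merge (the #[recursive::recursive] attribute guards against stack overflow on very deep plans). A toy illustration of the pattern over a hypothetical tree type, not Databend's SExpr:

    use std::collections::BTreeSet;

    struct Node {
        udf_ids: Vec<usize>, // ids contributed by this node, if any
        children: Vec<Node>,
    }

    fn collect_udf_ids(node: &Node) -> BTreeSet<usize> {
        let mut ids: BTreeSet<usize> = node.udf_ids.iter().copied().collect();
        for child in &node.children {
            // Recurse and merge; the BTreeSet deduplicates shared ids.
            ids.extend(collect_udf_ids(child));
        }
        ids
    }
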
45 changes: 45 additions & 0 deletions tests/sqllogictests/suites/udf_server/udf_server_test.test
@@ -564,6 +564,51 @@ select * from _tmp_table order by field1;
 4
 5
 
+statement ok
+CREATE OR REPLACE TABLE test_update_udf(url STRING, length INT64);
+
+statement ok
+INSERT INTO test_update_udf (url) VALUES('databend.com'),('databend.cn');
+
+statement ok
+UPDATE test_update_udf SET length = url_len(url);
+
+query TI
+SELECT * FROM test_update_udf;
+----
+databend.com 12
+databend.cn 11
+
+
+statement ok
+CREATE OR REPLACE TABLE test_update_udf_1(url STRING, a INT64,b INT64,c INT64);
+
+statement ok
+CREATE OR REPLACE FUNCTION url_len_mul_100 (VARCHAR) RETURNS BIGINT LANGUAGE python IMMUTABLE HANDLER = 'url_len_mul_100' ADDRESS = 'http://0.0.0.0:8815';
+
+statement ok
+INSERT INTO test_update_udf_1 (url) VALUES('databend.com'),('databend.cn');
+
+statement ok
+UPDATE test_update_udf_1 SET a = url_len(url),b = url_len_mul_100(url), c = length(url) + 123;
+
+query TIII
+SELECT * FROM test_update_udf_1;
+----
+databend.com 12 1200 135
+databend.cn 11 1100 134
+
+statement ok
+UPDATE test_update_udf_1 SET b = url_len(url),c = url_len_mul_100(url), a = length(url) + 123;
+
+query TIII
+SELECT * FROM test_update_udf_1;
+----
+databend.com 135 12 1200
+databend.cn 134 11 1100
+
+
+
 query I
 SELECT url_len('databend.com');
 ----
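
The two UPDATE statements in the new tests assign the same three expressions to different columns; comparing their results verifies that each SET assignment lands in its target column regardless of the order the expressions are listed or evaluated. The expected values are easy to confirm by hand (a quick sketch, assuming ASCII input):

    let url = "databend.com";
    assert_eq!(url.len(), 12);         // url_len(url)
    assert_eq!(url.len() * 100, 1200); // url_len_mul_100(url)
    assert_eq!(url.len() + 123, 135);  // length(url) + 123
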
4 changes: 4 additions & 0 deletions tests/udf/udf_server.py
@@ -190,6 +190,9 @@ def json_access(data: Any, key: str) -> Any:
 def url_len(key: str) -> int:
     return len(key)
 
+@udf(input_types=["VARCHAR"], result_type="BIGINT")
+def url_len_mul_100(key: str) -> int:
+    return len(key) * 100
 
 @udf(input_types=["ARRAY(VARIANT)"], result_type="VARIANT")
 def json_concat(list: List[Any]) -> Any:
@@ -449,6 +452,7 @@ def embedding_4(s: str):
 udf_server.add_function(wait)
 udf_server.add_function(wait_concurrent)
 udf_server.add_function(url_len)
+udf_server.add_function(url_len_mul_100)
 udf_server.add_function(check_headers)
 udf_server.add_function(embedding_4)