From 9150a3a3b3c3f6df23620d3909eff65864b3f98a Mon Sep 17 00:00:00 2001 From: Sergei Grebnov Date: Sun, 26 Oct 2025 16:35:36 -0700 Subject: [PATCH 1/3] DuckDB: add support for `on_refresh_recompute_statistics` param --- core/src/duckdb.rs | 7 +- core/src/duckdb/write.rs | 44 +++++++++- core/src/duckdb/write_settings.rs | 140 ++++++++++++++++++++++++++++++ 3 files changed, 186 insertions(+), 5 deletions(-) create mode 100644 core/src/duckdb/write_settings.rs diff --git a/core/src/duckdb.rs b/core/src/duckdb.rs index eb7720ea..db73c005 100644 --- a/core/src/duckdb.rs +++ b/core/src/duckdb.rs @@ -1,3 +1,4 @@ +use crate::duckdb::write_settings::DuckDBWriteSettings; use crate::sql::sql_provider_datafusion; use crate::util::{ self, @@ -51,6 +52,7 @@ mod creator; mod settings; mod sql_table; pub mod write; +pub mod write_settings; pub use creator::{RelationName, TableDefinition, TableManager, ViewCreator}; #[derive(Debug, Snafu)] @@ -447,10 +449,13 @@ impl TableProviderFactory for DuckDBTableProviderFactory { let pool = Arc::new(pool); make_initial_table(Arc::clone(&table_definition), &pool)?; + let write_settings = DuckDBWriteSettings::from_params(&options); + let table_writer_builder = DuckDBTableWriterBuilder::new() .with_table_definition(Arc::clone(&table_definition)) .with_pool(pool) - .set_on_conflict(on_conflict); + .set_on_conflict(on_conflict) + .with_write_settings(write_settings); let dyn_pool: Arc = Arc::new(read_pool); diff --git a/core/src/duckdb/write.rs b/core/src/duckdb/write.rs index 6c7b7ea8..8224f798 100644 --- a/core/src/duckdb/write.rs +++ b/core/src/duckdb/write.rs @@ -33,6 +33,7 @@ use tokio::task::JoinHandle; use super::creator::{TableDefinition, TableManager, ViewCreator}; use super::{to_datafusion_error, RelationName}; +use super::write_settings::DuckDBWriteSettings; /// A callback handler that is invoked after data has been successfully written to a DuckDB table /// but before the transaction is committed. @@ -61,6 +62,7 @@ pub struct DuckDBTableWriterBuilder { on_conflict: Option, table_definition: Option>, on_data_written: Option, + write_settings: Option, } impl DuckDBTableWriterBuilder { @@ -99,6 +101,12 @@ impl DuckDBTableWriterBuilder { self } + #[must_use] + pub fn with_write_settings(mut self, write_settings: DuckDBWriteSettings) -> Self { + self.write_settings = Some(write_settings); + self + } + /// Builds a `DuckDBTableWriter` from the provided configuration. /// /// # Errors @@ -126,6 +134,7 @@ impl DuckDBTableWriterBuilder { table_definition, pool, on_data_written: self.on_data_written, + write_settings: self.write_settings.unwrap_or_default(), }) } } @@ -137,6 +146,7 @@ pub struct DuckDBTableWriter { table_definition: Arc, on_conflict: Option, on_data_written: Option, + write_settings: DuckDBWriteSettings, } impl std::fmt::Debug for DuckDBTableWriter { @@ -153,6 +163,7 @@ impl std::fmt::Debug for DuckDBTableWriter { .as_ref() .map_or("None", |_| "Some(callback)"), ) + .field("write_settings", &self.write_settings) .finish() } } @@ -173,6 +184,11 @@ impl DuckDBTableWriter { self.on_conflict.as_ref() } + #[must_use] + pub fn write_settings(&self) -> &DuckDBWriteSettings { + &self.write_settings + } + #[must_use] pub fn with_on_data_written_handler(mut self, on_data_written: WriteCompletionHandler) -> Self { self.on_data_written = Some(on_data_written); @@ -222,7 +238,8 @@ impl TableProvider for DuckDBTableWriter { overwrite, self.on_conflict.clone(), self.schema(), - ); + ) + .with_write_settings(self.write_settings.clone()); if let Some(handler) = &self.on_data_written { sink = sink.with_on_data_written_handler(Arc::clone(handler)); @@ -239,6 +256,7 @@ pub(crate) struct DuckDBDataSink { on_conflict: Option, schema: SchemaRef, on_data_written: Option, + write_settings: DuckDBWriteSettings, } #[async_trait] @@ -265,6 +283,7 @@ impl DataSink for DuckDBDataSink { let overwrite = self.overwrite; let on_conflict = self.on_conflict.clone(); let on_data_written = self.on_data_written.clone(); + let write_settings = self.write_settings.clone(); // Limit channel size to a maximum of 100 RecordBatches queued for cases when DuckDB is slower than the writer stream, // so that we don't significantly increase memory usage. After the maximum RecordBatches are queued, the writer stream will wait @@ -287,6 +306,7 @@ impl DataSink for DuckDBDataSink { on_data_written.as_ref(), on_commit_transaction, schema, + &write_settings, )?, InsertOp::Append | InsertOp::Replace => insert_append( pool, @@ -296,6 +316,7 @@ impl DataSink for DuckDBDataSink { on_data_written.as_ref(), on_commit_transaction, schema, + &write_settings, )?, }; @@ -379,6 +400,7 @@ impl DuckDBDataSink { on_conflict, schema, on_data_written: None, + write_settings: DuckDBWriteSettings::default(), } } @@ -387,6 +409,12 @@ impl DuckDBDataSink { self.on_data_written = Some(handler); self } + + #[must_use] + pub fn with_write_settings(mut self, write_settings: DuckDBWriteSettings) -> Self { + self.write_settings = write_settings; + self + } } impl std::fmt::Debug for DuckDBDataSink { @@ -401,6 +429,7 @@ impl DisplayAs for DuckDBDataSink { } } +#[allow(clippy::too_many_arguments)] fn insert_append( pool: Arc, table_definition: &Arc, @@ -409,6 +438,7 @@ fn insert_append( on_data_written: Option<&WriteCompletionHandler>, mut on_commit_transaction: tokio::sync::oneshot::Receiver<()>, schema: SchemaRef, + write_settings: &DuckDBWriteSettings, ) -> datafusion::common::Result { let mut db_conn = pool .connect_sync() @@ -466,7 +496,9 @@ fn insert_append( callback(&tx, &append_table, &schema, num_rows)?; } - execute_analyze_sql(&tx, &append_table.table_name().to_string()); + if write_settings.recompute_statistics_on_refresh { + execute_analyze_sql(&tx, &append_table.table_name().to_string()); + } on_commit_transaction .try_recv() @@ -521,6 +553,7 @@ fn insert_append( } #[allow(clippy::too_many_lines)] +#[allow(clippy::too_many_arguments)] fn insert_overwrite( pool: Arc, table_definition: &Arc, @@ -529,6 +562,7 @@ fn insert_overwrite( on_data_written: Option<&WriteCompletionHandler>, mut on_commit_transaction: tokio::sync::oneshot::Receiver<()>, schema: SchemaRef, + write_settings: &DuckDBWriteSettings, ) -> datafusion::common::Result { let cloned_pool = Arc::clone(&pool); let mut db_conn = pool @@ -642,7 +676,9 @@ fn insert_overwrite( callback(&tx, &new_table, &schema, num_rows)?; } - execute_analyze_sql(&tx, &new_table.table_name().to_string()); + if write_settings.recompute_statistics_on_refresh { + execute_analyze_sql(&tx, &new_table.table_name().to_string()); + } tx.commit() .context(super::UnableToCommitTransactionSnafu) @@ -718,7 +754,7 @@ fn write_to_table( /// Errors are logged but do not fail the operation since statistics updates are non-critical. pub fn execute_analyze_sql(tx: &Transaction, table_name: &str) { // DuckDB doesn't support parameterized table names in the ANALYZE statement - let analyze_sql = format!(r#"ANALYZE "{}""#, table_name); + let analyze_sql = format!(r#"ANALYZE "{table_name}""#); tracing::debug!("Executing analyze SQL: {analyze_sql}"); match tx.prepare(&analyze_sql) { Ok(mut stmt) => match stmt.execute([]) { diff --git a/core/src/duckdb/write_settings.rs b/core/src/duckdb/write_settings.rs new file mode 100644 index 00000000..644ffcd4 --- /dev/null +++ b/core/src/duckdb/write_settings.rs @@ -0,0 +1,140 @@ +/* +Copyright 2024-2025 The Spice.ai OSS Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + https://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +use std::collections::HashMap; + +/// Configuration settings for DuckDB write operations +#[derive(Debug, Clone)] +pub struct DuckDBWriteSettings { + /// Whether to execute ANALYZE statements after data refresh operations + /// to update table statistics for query optimization + pub recompute_statistics_on_refresh: bool, +} + +impl Default for DuckDBWriteSettings { + fn default() -> Self { + Self { + recompute_statistics_on_refresh: true, // Enabled by default for better query performance + } + } +} + +impl DuckDBWriteSettings { + /// Create a new `DuckDBWriteSettings` with default values + #[must_use] + pub fn new() -> Self { + Self::default() + } + + /// Set whether to recompute statistics on refresh + #[must_use] + pub fn with_recompute_statistics_on_refresh(mut self, enabled: bool) -> Self { + self.recompute_statistics_on_refresh = enabled; + self + } + + /// Parse settings from table creation parameters + #[must_use] + pub fn from_params(params: &HashMap) -> Self { + let mut settings = Self::default(); + + if let Some(value) = params.get("on_refresh_recompute_statistics") { + settings.recompute_statistics_on_refresh = match value.to_lowercase().as_str() { + "true" | "enabled" => true, + "false" | "disabled" => false, + _ => { + tracing::warn!( + "Invalid value for 'on_refresh_recompute_statistics': '{value}'. Expected 'enabled' or 'disabled'. Using default: {}", + settings.recompute_statistics_on_refresh + ); + settings.recompute_statistics_on_refresh + } + }; + } + + settings + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::collections::HashMap; + + #[test] + fn test_default_settings() { + let settings = DuckDBWriteSettings::default(); + assert!(settings.recompute_statistics_on_refresh); + } + + #[test] + fn test_new_settings() { + let settings = DuckDBWriteSettings::new(); + assert!(settings.recompute_statistics_on_refresh); + } + + #[test] + fn test_with_recompute_statistics_on_refresh() { + let settings = DuckDBWriteSettings::new().with_recompute_statistics_on_refresh(false); + assert!(!settings.recompute_statistics_on_refresh); + } + + #[test] + fn test_from_params_valid_enabled() { + let mut params = HashMap::new(); + params.insert( + "on_refresh_recompute_statistics".to_string(), + "enabled".to_string(), + ); + + let settings = DuckDBWriteSettings::from_params(¶ms); + assert!(settings.recompute_statistics_on_refresh); + } + + #[test] + fn test_from_params_valid_disabled() { + let mut params = HashMap::new(); + params.insert( + "on_refresh_recompute_statistics".to_string(), + "disabled".to_string(), + ); + + let settings = DuckDBWriteSettings::from_params(¶ms); + assert!(!settings.recompute_statistics_on_refresh); + } + + #[test] + fn test_from_params_invalid_value() { + let mut params = HashMap::new(); + params.insert( + "on_refresh_recompute_statistics".to_string(), + "invalid".to_string(), + ); + + let settings = DuckDBWriteSettings::from_params(¶ms); + // Should fall back to default (true) and log a warning + assert!(settings.recompute_statistics_on_refresh); + } + + #[test] + fn test_from_params_missing_param() { + let params = HashMap::new(); + + let settings = DuckDBWriteSettings::from_params(¶ms); + // Should use default value + assert!(settings.recompute_statistics_on_refresh); + } +} From c08f887e06aad26c171eab921f52c2c3f873ef4f Mon Sep 17 00:00:00 2001 From: Sergei Grebnov Date: Sun, 26 Oct 2025 17:14:12 -0700 Subject: [PATCH 2/3] Update --- core/src/duckdb/write.rs | 4 +-- core/src/duckdb/write_settings.rs | 46 +++++++++++++++---------------- 2 files changed, 25 insertions(+), 25 deletions(-) diff --git a/core/src/duckdb/write.rs b/core/src/duckdb/write.rs index 8224f798..b02b9c25 100644 --- a/core/src/duckdb/write.rs +++ b/core/src/duckdb/write.rs @@ -496,7 +496,7 @@ fn insert_append( callback(&tx, &append_table, &schema, num_rows)?; } - if write_settings.recompute_statistics_on_refresh { + if write_settings.recompute_statistics_on_write { execute_analyze_sql(&tx, &append_table.table_name().to_string()); } @@ -676,7 +676,7 @@ fn insert_overwrite( callback(&tx, &new_table, &schema, num_rows)?; } - if write_settings.recompute_statistics_on_refresh { + if write_settings.recompute_statistics_on_write { execute_analyze_sql(&tx, &new_table.table_name().to_string()); } diff --git a/core/src/duckdb/write_settings.rs b/core/src/duckdb/write_settings.rs index 644ffcd4..a409fee7 100644 --- a/core/src/duckdb/write_settings.rs +++ b/core/src/duckdb/write_settings.rs @@ -19,15 +19,15 @@ use std::collections::HashMap; /// Configuration settings for DuckDB write operations #[derive(Debug, Clone)] pub struct DuckDBWriteSettings { - /// Whether to execute ANALYZE statements after data refresh operations + /// Whether to execute ANALYZE statements after data write operations /// to update table statistics for query optimization - pub recompute_statistics_on_refresh: bool, + pub recompute_statistics_on_write: bool, } impl Default for DuckDBWriteSettings { fn default() -> Self { Self { - recompute_statistics_on_refresh: true, // Enabled by default for better query performance + recompute_statistics_on_write: true, // Enabled by default for better query performance } } } @@ -39,10 +39,10 @@ impl DuckDBWriteSettings { Self::default() } - /// Set whether to recompute statistics on refresh + /// Set whether to recompute statistics on write #[must_use] - pub fn with_recompute_statistics_on_refresh(mut self, enabled: bool) -> Self { - self.recompute_statistics_on_refresh = enabled; + pub fn with_recompute_statistics_on_write(mut self, enabled: bool) -> Self { + self.recompute_statistics_on_write = enabled; self } @@ -51,16 +51,16 @@ impl DuckDBWriteSettings { pub fn from_params(params: &HashMap) -> Self { let mut settings = Self::default(); - if let Some(value) = params.get("on_refresh_recompute_statistics") { - settings.recompute_statistics_on_refresh = match value.to_lowercase().as_str() { + if let Some(value) = params.get("recompute_statistics_on_write") { + settings.recompute_statistics_on_write = match value.to_lowercase().as_str() { "true" | "enabled" => true, "false" | "disabled" => false, _ => { tracing::warn!( - "Invalid value for 'on_refresh_recompute_statistics': '{value}'. Expected 'enabled' or 'disabled'. Using default: {}", - settings.recompute_statistics_on_refresh + "Invalid value for recompute statistics on write parameter: '{value}'. Expected 'enabled' or 'disabled'. Using default: {}", + settings.recompute_statistics_on_write ); - settings.recompute_statistics_on_refresh + settings.recompute_statistics_on_write } }; } @@ -77,56 +77,56 @@ mod tests { #[test] fn test_default_settings() { let settings = DuckDBWriteSettings::default(); - assert!(settings.recompute_statistics_on_refresh); + assert!(settings.recompute_statistics_on_write); } #[test] fn test_new_settings() { let settings = DuckDBWriteSettings::new(); - assert!(settings.recompute_statistics_on_refresh); + assert!(settings.recompute_statistics_on_write); } #[test] - fn test_with_recompute_statistics_on_refresh() { - let settings = DuckDBWriteSettings::new().with_recompute_statistics_on_refresh(false); - assert!(!settings.recompute_statistics_on_refresh); + fn test_with_recompute_statistics_on_write() { + let settings = DuckDBWriteSettings::new().with_recompute_statistics_on_write(false); + assert!(!settings.recompute_statistics_on_write); } #[test] fn test_from_params_valid_enabled() { let mut params = HashMap::new(); params.insert( - "on_refresh_recompute_statistics".to_string(), + "recompute_statistics_on_write".to_string(), "enabled".to_string(), ); let settings = DuckDBWriteSettings::from_params(¶ms); - assert!(settings.recompute_statistics_on_refresh); + assert!(settings.recompute_statistics_on_write); } #[test] fn test_from_params_valid_disabled() { let mut params = HashMap::new(); params.insert( - "on_refresh_recompute_statistics".to_string(), + "recompute_statistics_on_write".to_string(), "disabled".to_string(), ); let settings = DuckDBWriteSettings::from_params(¶ms); - assert!(!settings.recompute_statistics_on_refresh); + assert!(!settings.recompute_statistics_on_write); } #[test] fn test_from_params_invalid_value() { let mut params = HashMap::new(); params.insert( - "on_refresh_recompute_statistics".to_string(), + "recompute_statistics_on_write".to_string(), "invalid".to_string(), ); let settings = DuckDBWriteSettings::from_params(¶ms); // Should fall back to default (true) and log a warning - assert!(settings.recompute_statistics_on_refresh); + assert!(settings.recompute_statistics_on_write); } #[test] @@ -135,6 +135,6 @@ mod tests { let settings = DuckDBWriteSettings::from_params(¶ms); // Should use default value - assert!(settings.recompute_statistics_on_refresh); + assert!(settings.recompute_statistics_on_write); } } From 410b812c2860be532f0d4298b115f20b1274c697 Mon Sep 17 00:00:00 2001 From: Sergei Grebnov Date: Sun, 26 Oct 2025 18:42:09 -0700 Subject: [PATCH 3/3] Update core/src/duckdb/write_settings.rs Co-authored-by: Phillip LeBlanc Signed-off-by: Sergei Grebnov --- core/src/duckdb/write_settings.rs | 16 ---------------- 1 file changed, 16 deletions(-) diff --git a/core/src/duckdb/write_settings.rs b/core/src/duckdb/write_settings.rs index a409fee7..5a368d8b 100644 --- a/core/src/duckdb/write_settings.rs +++ b/core/src/duckdb/write_settings.rs @@ -1,19 +1,3 @@ -/* -Copyright 2024-2025 The Spice.ai OSS Authors - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - https://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - use std::collections::HashMap; /// Configuration settings for DuckDB write operations