Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion core/src/duckdb.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::duckdb::write_settings::DuckDBWriteSettings;
use crate::sql::sql_provider_datafusion;
use crate::util::{
self,
Expand Down Expand Up @@ -51,6 +52,7 @@ mod creator;
mod settings;
mod sql_table;
pub mod write;
pub mod write_settings;
pub use creator::{RelationName, TableDefinition, TableManager, ViewCreator};

#[derive(Debug, Snafu)]
Expand Down Expand Up @@ -447,10 +449,13 @@ impl TableProviderFactory for DuckDBTableProviderFactory {
let pool = Arc::new(pool);
make_initial_table(Arc::clone(&table_definition), &pool)?;

let write_settings = DuckDBWriteSettings::from_params(&options);

let table_writer_builder = DuckDBTableWriterBuilder::new()
.with_table_definition(Arc::clone(&table_definition))
.with_pool(pool)
.set_on_conflict(on_conflict);
.set_on_conflict(on_conflict)
.with_write_settings(write_settings);

let dyn_pool: Arc<DynDuckDbConnectionPool> = Arc::new(read_pool);

Expand Down
44 changes: 40 additions & 4 deletions core/src/duckdb/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use tokio::task::JoinHandle;

use super::creator::{TableDefinition, TableManager, ViewCreator};
use super::{to_datafusion_error, RelationName};
use super::write_settings::DuckDBWriteSettings;

/// A callback handler that is invoked after data has been successfully written to a DuckDB table
/// but before the transaction is committed.
Expand Down Expand Up @@ -61,6 +62,7 @@ pub struct DuckDBTableWriterBuilder {
on_conflict: Option<OnConflict>,
table_definition: Option<Arc<TableDefinition>>,
on_data_written: Option<WriteCompletionHandler>,
write_settings: Option<DuckDBWriteSettings>,
}

impl DuckDBTableWriterBuilder {
Expand Down Expand Up @@ -99,6 +101,12 @@ impl DuckDBTableWriterBuilder {
self
}

#[must_use]
pub fn with_write_settings(mut self, write_settings: DuckDBWriteSettings) -> Self {
self.write_settings = Some(write_settings);
self
}

/// Builds a `DuckDBTableWriter` from the provided configuration.
///
/// # Errors
Expand Down Expand Up @@ -126,6 +134,7 @@ impl DuckDBTableWriterBuilder {
table_definition,
pool,
on_data_written: self.on_data_written,
write_settings: self.write_settings.unwrap_or_default(),
})
}
}
Expand All @@ -137,6 +146,7 @@ pub struct DuckDBTableWriter {
table_definition: Arc<TableDefinition>,
on_conflict: Option<OnConflict>,
on_data_written: Option<WriteCompletionHandler>,
write_settings: DuckDBWriteSettings,
}

impl std::fmt::Debug for DuckDBTableWriter {
Expand All @@ -153,6 +163,7 @@ impl std::fmt::Debug for DuckDBTableWriter {
.as_ref()
.map_or("None", |_| "Some(callback)"),
)
.field("write_settings", &self.write_settings)
.finish()
}
}
Expand All @@ -173,6 +184,11 @@ impl DuckDBTableWriter {
self.on_conflict.as_ref()
}

/// Borrow the write settings that govern this writer's post-write behavior
/// (e.g. whether `ANALYZE` runs after inserts).
#[must_use]
pub fn write_settings(&self) -> &DuckDBWriteSettings {
    &self.write_settings
}

#[must_use]
pub fn with_on_data_written_handler(mut self, on_data_written: WriteCompletionHandler) -> Self {
self.on_data_written = Some(on_data_written);
Expand Down Expand Up @@ -222,7 +238,8 @@ impl TableProvider for DuckDBTableWriter {
overwrite,
self.on_conflict.clone(),
self.schema(),
);
)
.with_write_settings(self.write_settings.clone());

if let Some(handler) = &self.on_data_written {
sink = sink.with_on_data_written_handler(Arc::clone(handler));
Expand All @@ -239,6 +256,7 @@ pub(crate) struct DuckDBDataSink {
on_conflict: Option<OnConflict>,
schema: SchemaRef,
on_data_written: Option<WriteCompletionHandler>,
write_settings: DuckDBWriteSettings,
}

#[async_trait]
Expand All @@ -265,6 +283,7 @@ impl DataSink for DuckDBDataSink {
let overwrite = self.overwrite;
let on_conflict = self.on_conflict.clone();
let on_data_written = self.on_data_written.clone();
let write_settings = self.write_settings.clone();

// Limit channel size to a maximum of 100 RecordBatches queued for cases when DuckDB is slower than the writer stream,
// so that we don't significantly increase memory usage. After the maximum RecordBatches are queued, the writer stream will wait
Expand All @@ -287,6 +306,7 @@ impl DataSink for DuckDBDataSink {
on_data_written.as_ref(),
on_commit_transaction,
schema,
&write_settings,
)?,
InsertOp::Append | InsertOp::Replace => insert_append(
pool,
Expand All @@ -296,6 +316,7 @@ impl DataSink for DuckDBDataSink {
on_data_written.as_ref(),
on_commit_transaction,
schema,
&write_settings,
)?,
};

Expand Down Expand Up @@ -379,6 +400,7 @@ impl DuckDBDataSink {
on_conflict,
schema,
on_data_written: None,
write_settings: DuckDBWriteSettings::default(),
}
}

Expand All @@ -387,6 +409,12 @@ impl DuckDBDataSink {
self.on_data_written = Some(handler);
self
}

#[must_use]
pub fn with_write_settings(mut self, write_settings: DuckDBWriteSettings) -> Self {
self.write_settings = write_settings;
self
}
}

impl std::fmt::Debug for DuckDBDataSink {
Expand All @@ -401,6 +429,7 @@ impl DisplayAs for DuckDBDataSink {
}
}

#[allow(clippy::too_many_arguments)]
fn insert_append(
pool: Arc<DuckDbConnectionPool>,
table_definition: &Arc<TableDefinition>,
Expand All @@ -409,6 +438,7 @@ fn insert_append(
on_data_written: Option<&WriteCompletionHandler>,
mut on_commit_transaction: tokio::sync::oneshot::Receiver<()>,
schema: SchemaRef,
write_settings: &DuckDBWriteSettings,
) -> datafusion::common::Result<u64> {
let mut db_conn = pool
.connect_sync()
Expand Down Expand Up @@ -466,7 +496,9 @@ fn insert_append(
callback(&tx, &append_table, &schema, num_rows)?;
}

execute_analyze_sql(&tx, &append_table.table_name().to_string());
if write_settings.recompute_statistics_on_write {
execute_analyze_sql(&tx, &append_table.table_name().to_string());
}

on_commit_transaction
.try_recv()
Expand Down Expand Up @@ -521,6 +553,7 @@ fn insert_append(
}

#[allow(clippy::too_many_lines)]
#[allow(clippy::too_many_arguments)]
fn insert_overwrite(
pool: Arc<DuckDbConnectionPool>,
table_definition: &Arc<TableDefinition>,
Expand All @@ -529,6 +562,7 @@ fn insert_overwrite(
on_data_written: Option<&WriteCompletionHandler>,
mut on_commit_transaction: tokio::sync::oneshot::Receiver<()>,
schema: SchemaRef,
write_settings: &DuckDBWriteSettings,
) -> datafusion::common::Result<u64> {
let cloned_pool = Arc::clone(&pool);
let mut db_conn = pool
Expand Down Expand Up @@ -642,7 +676,9 @@ fn insert_overwrite(
callback(&tx, &new_table, &schema, num_rows)?;
}

execute_analyze_sql(&tx, &new_table.table_name().to_string());
if write_settings.recompute_statistics_on_write {
execute_analyze_sql(&tx, &new_table.table_name().to_string());
}

tx.commit()
.context(super::UnableToCommitTransactionSnafu)
Expand Down Expand Up @@ -718,7 +754,7 @@ fn write_to_table(
/// Errors are logged but do not fail the operation since statistics updates are non-critical.
pub fn execute_analyze_sql(tx: &Transaction, table_name: &str) {
// DuckDB doesn't support parameterized table names in the ANALYZE statement
let analyze_sql = format!(r#"ANALYZE "{}""#, table_name);
let analyze_sql = format!(r#"ANALYZE "{table_name}""#);
tracing::debug!("Executing analyze SQL: {analyze_sql}");
match tx.prepare(&analyze_sql) {
Ok(mut stmt) => match stmt.execute([]) {
Expand Down
124 changes: 124 additions & 0 deletions core/src/duckdb/write_settings.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
use std::collections::HashMap;

/// Tunable behavior for DuckDB write operations.
///
/// Constructed either directly, via the builder-style setters on the
/// `impl`, or parsed from table-creation parameters with `from_params`.
#[derive(Debug, Clone)]
pub struct DuckDBWriteSettings {
    /// When `true`, an `ANALYZE` statement is executed after each write so
    /// the table's statistics stay fresh for the query optimizer.
    pub recompute_statistics_on_write: bool,
}

impl Default for DuckDBWriteSettings {
fn default() -> Self {
Self {
recompute_statistics_on_write: true, // Enabled by default for better query performance
}
}
}

impl DuckDBWriteSettings {
    /// Create a new `DuckDBWriteSettings` with default values.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Set whether an `ANALYZE` statement is run after each write to
    /// refresh table statistics.
    #[must_use]
    pub fn with_recompute_statistics_on_write(mut self, enabled: bool) -> Self {
        self.recompute_statistics_on_write = enabled;
        self
    }

    /// Parse settings from table-creation parameters.
    ///
    /// Recognized key:
    /// - `recompute_statistics_on_write` — accepts `true`/`enabled` or
    ///   `false`/`disabled` (case-insensitive). Any other value logs a
    ///   warning and keeps the default. A missing key also keeps the default.
    #[must_use]
    pub fn from_params(params: &HashMap<String, String>) -> Self {
        let mut settings = Self::default();

        if let Some(value) = params.get("recompute_statistics_on_write") {
            settings.recompute_statistics_on_write = match value.to_lowercase().as_str() {
                "true" | "enabled" => true,
                "false" | "disabled" => false,
                _ => {
                    // Fix: the previous message claimed only 'enabled'/'disabled'
                    // were accepted and misnamed the parameter; list every
                    // accepted value and use the exact parameter key.
                    tracing::warn!(
                        "Invalid value for 'recompute_statistics_on_write' parameter: '{value}'. Expected one of 'true', 'enabled', 'false', 'disabled'. Using default: {}",
                        settings.recompute_statistics_on_write
                    );
                    settings.recompute_statistics_on_write
                }
            };
        }

        settings
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;

    /// Build a parameter map containing a single
    /// `recompute_statistics_on_write` entry with the given value.
    fn params_with(value: &str) -> HashMap<String, String> {
        HashMap::from([(
            "recompute_statistics_on_write".to_string(),
            value.to_string(),
        )])
    }

    #[test]
    fn test_default_settings() {
        assert!(DuckDBWriteSettings::default().recompute_statistics_on_write);
    }

    #[test]
    fn test_new_settings() {
        assert!(DuckDBWriteSettings::new().recompute_statistics_on_write);
    }

    #[test]
    fn test_with_recompute_statistics_on_write() {
        let settings = DuckDBWriteSettings::new().with_recompute_statistics_on_write(false);
        assert!(!settings.recompute_statistics_on_write);
    }

    #[test]
    fn test_from_params_valid_enabled() {
        let settings = DuckDBWriteSettings::from_params(&params_with("enabled"));
        assert!(settings.recompute_statistics_on_write);
    }

    #[test]
    fn test_from_params_valid_disabled() {
        let settings = DuckDBWriteSettings::from_params(&params_with("disabled"));
        assert!(!settings.recompute_statistics_on_write);
    }

    #[test]
    fn test_from_params_invalid_value() {
        // Unrecognized values fall back to the default (true) and log a warning.
        let settings = DuckDBWriteSettings::from_params(&params_with("invalid"));
        assert!(settings.recompute_statistics_on_write);
    }

    #[test]
    fn test_from_params_missing_param() {
        // An absent key leaves the default in place.
        let settings = DuckDBWriteSettings::from_params(&HashMap::new());
        assert!(settings.recompute_statistics_on_write);
    }
}