Skip to content

Commit 3245881

Browse files
committed
DuckDB: add support for on_refresh_recompute_statistics param
1 parent 8a1e010 commit 3245881

File tree

3 files changed

+185
-4
lines changed

3 files changed

+185
-4
lines changed

core/src/duckdb.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use crate::duckdb::write_settings::DuckDBWriteSettings;
12
use crate::sql::sql_provider_datafusion;
23
use crate::util::{
34
self,
@@ -51,6 +52,7 @@ mod creator;
5152
mod settings;
5253
mod sql_table;
5354
pub mod write;
55+
pub mod write_settings;
5456
pub use creator::{RelationName, TableDefinition, TableManager, ViewCreator};
5557

5658
#[derive(Debug, Snafu)]
@@ -447,10 +449,13 @@ impl TableProviderFactory for DuckDBTableProviderFactory {
447449
let pool = Arc::new(pool);
448450
make_initial_table(Arc::clone(&table_definition), &pool)?;
449451

452+
let write_settings = DuckDBWriteSettings::from_params(&options);
453+
450454
let table_writer_builder = DuckDBTableWriterBuilder::new()
451455
.with_table_definition(Arc::clone(&table_definition))
452456
.with_pool(pool)
453-
.set_on_conflict(on_conflict);
457+
.set_on_conflict(on_conflict)
458+
.with_write_settings(write_settings);
454459

455460
let dyn_pool: Arc<DynDuckDbConnectionPool> = Arc::new(read_pool);
456461

core/src/duckdb/write.rs

Lines changed: 39 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ use tokio::task::JoinHandle;
3333

3434
use super::creator::{TableDefinition, TableManager, ViewCreator};
3535
use super::{to_datafusion_error, RelationName};
36+
use super::write_settings::DuckDBWriteSettings;
3637

3738
/// A callback handler that is invoked after data has been successfully written to a DuckDB table
3839
/// but before the transaction is committed.
@@ -61,6 +62,7 @@ pub struct DuckDBTableWriterBuilder {
6162
on_conflict: Option<OnConflict>,
6263
table_definition: Option<Arc<TableDefinition>>,
6364
on_data_written: Option<WriteCompletionHandler>,
65+
write_settings: Option<DuckDBWriteSettings>,
6466
}
6567

6668
impl DuckDBTableWriterBuilder {
@@ -99,6 +101,12 @@ impl DuckDBTableWriterBuilder {
99101
self
100102
}
101103

104+
/// Sets the write settings the built writer should use.
///
/// The settings are stored as `Some(..)` on the builder; when this method is
/// never called, `build` falls back to `DuckDBWriteSettings::default()`.
#[must_use]
105+
pub fn with_write_settings(mut self, write_settings: DuckDBWriteSettings) -> Self {
106+
self.write_settings = Some(write_settings);
107+
self
108+
}
109+
102110
/// Builds a `DuckDBTableWriter` from the provided configuration.
103111
///
104112
/// # Errors
@@ -126,6 +134,7 @@ impl DuckDBTableWriterBuilder {
126134
table_definition,
127135
pool,
128136
on_data_written: self.on_data_written,
137+
write_settings: self.write_settings.unwrap_or_default(),
129138
})
130139
}
131140
}
@@ -137,6 +146,7 @@ pub struct DuckDBTableWriter {
137146
table_definition: Arc<TableDefinition>,
138147
on_conflict: Option<OnConflict>,
139148
on_data_written: Option<WriteCompletionHandler>,
149+
write_settings: DuckDBWriteSettings,
140150
}
141151

142152
impl std::fmt::Debug for DuckDBTableWriter {
@@ -153,6 +163,7 @@ impl std::fmt::Debug for DuckDBTableWriter {
153163
.as_ref()
154164
.map_or("None", |_| "Some(callback)"),
155165
)
166+
.field("write_settings", &self.write_settings)
156167
.finish()
157168
}
158169
}
@@ -173,6 +184,11 @@ impl DuckDBTableWriter {
173184
self.on_conflict.as_ref()
174185
}
175186

187+
/// Returns a reference to the write settings held by this writer.
///
/// A clone of these settings is handed to each data sink the writer creates.
#[must_use]
188+
pub fn write_settings(&self) -> &DuckDBWriteSettings {
189+
&self.write_settings
190+
}
191+
176192
#[must_use]
177193
pub fn with_on_data_written_handler(mut self, on_data_written: WriteCompletionHandler) -> Self {
178194
self.on_data_written = Some(on_data_written);
@@ -222,7 +238,8 @@ impl TableProvider for DuckDBTableWriter {
222238
overwrite,
223239
self.on_conflict.clone(),
224240
self.schema(),
225-
);
241+
)
242+
.with_write_settings(self.write_settings.clone());
226243

227244
if let Some(handler) = &self.on_data_written {
228245
sink = sink.with_on_data_written_handler(Arc::clone(handler));
@@ -239,6 +256,7 @@ pub(crate) struct DuckDBDataSink {
239256
on_conflict: Option<OnConflict>,
240257
schema: SchemaRef,
241258
on_data_written: Option<WriteCompletionHandler>,
259+
write_settings: DuckDBWriteSettings,
242260
}
243261

244262
#[async_trait]
@@ -265,6 +283,7 @@ impl DataSink for DuckDBDataSink {
265283
let overwrite = self.overwrite;
266284
let on_conflict = self.on_conflict.clone();
267285
let on_data_written = self.on_data_written.clone();
286+
let write_settings = self.write_settings.clone();
268287

269288
// Limit channel size to a maximum of 100 RecordBatches queued for cases when DuckDB is slower than the writer stream,
270289
// so that we don't significantly increase memory usage. After the maximum RecordBatches are queued, the writer stream will wait
@@ -287,6 +306,7 @@ impl DataSink for DuckDBDataSink {
287306
on_data_written.as_ref(),
288307
on_commit_transaction,
289308
schema,
309+
&write_settings,
290310
)?,
291311
InsertOp::Append | InsertOp::Replace => insert_append(
292312
pool,
@@ -296,6 +316,7 @@ impl DataSink for DuckDBDataSink {
296316
on_data_written.as_ref(),
297317
on_commit_transaction,
298318
schema,
319+
&write_settings,
299320
)?,
300321
};
301322

@@ -379,6 +400,7 @@ impl DuckDBDataSink {
379400
on_conflict,
380401
schema,
381402
on_data_written: None,
403+
write_settings: DuckDBWriteSettings::default(),
382404
}
383405
}
384406

@@ -387,6 +409,12 @@ impl DuckDBDataSink {
387409
self.on_data_written = Some(handler);
388410
self
389411
}
412+
413+
/// Replaces the sink's write settings (initialised to
/// `DuckDBWriteSettings::default()` by the constructor).
///
/// The settings are passed to the insert routines, which consult
/// `recompute_statistics_on_refresh` to decide whether to run ANALYZE
/// after the data has been written.
#[must_use]
414+
pub fn with_write_settings(mut self, write_settings: DuckDBWriteSettings) -> Self {
415+
self.write_settings = write_settings;
416+
self
417+
}
390418
}
391419

392420
impl std::fmt::Debug for DuckDBDataSink {
@@ -401,6 +429,7 @@ impl DisplayAs for DuckDBDataSink {
401429
}
402430
}
403431

432+
#[allow(clippy::too_many_arguments)]
404433
fn insert_append(
405434
pool: Arc<DuckDbConnectionPool>,
406435
table_definition: &Arc<TableDefinition>,
@@ -409,6 +438,7 @@ fn insert_append(
409438
on_data_written: Option<&WriteCompletionHandler>,
410439
mut on_commit_transaction: tokio::sync::oneshot::Receiver<()>,
411440
schema: SchemaRef,
441+
write_settings: &DuckDBWriteSettings,
412442
) -> datafusion::common::Result<u64> {
413443
let mut db_conn = pool
414444
.connect_sync()
@@ -466,7 +496,9 @@ fn insert_append(
466496
callback(&tx, &append_table, &schema, num_rows)?;
467497
}
468498

469-
execute_analyze_sql(&tx, &append_table.table_name().to_string());
499+
if write_settings.recompute_statistics_on_refresh {
500+
execute_analyze_sql(&tx, &append_table.table_name().to_string());
501+
}
470502

471503
on_commit_transaction
472504
.try_recv()
@@ -521,6 +553,7 @@ fn insert_append(
521553
}
522554

523555
#[allow(clippy::too_many_lines)]
556+
#[allow(clippy::too_many_arguments)]
524557
fn insert_overwrite(
525558
pool: Arc<DuckDbConnectionPool>,
526559
table_definition: &Arc<TableDefinition>,
@@ -529,6 +562,7 @@ fn insert_overwrite(
529562
on_data_written: Option<&WriteCompletionHandler>,
530563
mut on_commit_transaction: tokio::sync::oneshot::Receiver<()>,
531564
schema: SchemaRef,
565+
write_settings: &DuckDBWriteSettings,
532566
) -> datafusion::common::Result<u64> {
533567
let cloned_pool = Arc::clone(&pool);
534568
let mut db_conn = pool
@@ -642,7 +676,9 @@ fn insert_overwrite(
642676
callback(&tx, &new_table, &schema, num_rows)?;
643677
}
644678

645-
execute_analyze_sql(&tx, &new_table.table_name().to_string());
679+
if write_settings.recompute_statistics_on_refresh {
680+
execute_analyze_sql(&tx, &new_table.table_name().to_string());
681+
}
646682

647683
tx.commit()
648684
.context(super::UnableToCommitTransactionSnafu)

core/src/duckdb/write_settings.rs

Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
/*
2+
Copyright 2024-2025 The Spice.ai OSS Authors
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
https://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
use std::collections::HashMap;
18+
19+
/// Configuration settings for DuckDB write operations.
///
/// The type is plain data, so it derives `PartialEq`/`Eq` in addition to
/// `Debug`/`Clone`; this lets callers and tests compare whole settings values
/// instead of inspecting individual fields.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DuckDBWriteSettings {
    /// Whether to execute ANALYZE statements after data refresh operations
    /// to update table statistics for query optimization.
    pub recompute_statistics_on_refresh: bool,
}
26+
27+
impl Default for DuckDBWriteSettings {
28+
fn default() -> Self {
29+
Self {
30+
recompute_statistics_on_refresh: true, // Enabled by default for better query performance
31+
}
32+
}
33+
}
34+
35+
impl DuckDBWriteSettings {
36+
/// Create a new `DuckDBWriteSettings` with default values
37+
#[must_use]
38+
pub fn new() -> Self {
39+
Self::default()
40+
}
41+
42+
/// Set whether to recompute statistics on refresh
43+
#[must_use]
44+
pub fn with_recompute_statistics_on_refresh(mut self, enabled: bool) -> Self {
45+
self.recompute_statistics_on_refresh = enabled;
46+
self
47+
}
48+
49+
/// Parse settings from table creation parameters
50+
#[must_use]
51+
pub fn from_params(params: &HashMap<String, String>) -> Self {
52+
let mut settings = Self::default();
53+
54+
if let Some(value) = params.get("on_refresh_recompute_statistics") {
55+
settings.recompute_statistics_on_refresh = match value.to_lowercase().as_str() {
56+
"true" | "enabled" => true,
57+
"false" | "disabled" => false,
58+
_ => {
59+
tracing::warn!(
60+
"Invalid value for 'on_refresh_recompute_statistics': '{value}'. Expected 'enabled' or 'disabled'. Using default: {}",
61+
settings.recompute_statistics_on_refresh
62+
);
63+
settings.recompute_statistics_on_refresh
64+
}
65+
};
66+
}
67+
68+
settings
69+
}
70+
}
71+
72+
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;

    /// Parameter key under test.
    const PARAM: &str = "on_refresh_recompute_statistics";

    /// Builds settings from a parameter map containing only `PARAM = value`.
    fn settings_from_value(value: &str) -> DuckDBWriteSettings {
        let params = HashMap::from([(PARAM.to_string(), value.to_string())]);
        DuckDBWriteSettings::from_params(&params)
    }

    #[test]
    fn test_default_settings() {
        let settings = DuckDBWriteSettings::default();
        assert!(settings.recompute_statistics_on_refresh);
    }

    #[test]
    fn test_new_settings() {
        let settings = DuckDBWriteSettings::new();
        assert!(settings.recompute_statistics_on_refresh);
    }

    #[test]
    fn test_with_recompute_statistics_on_refresh() {
        let settings = DuckDBWriteSettings::new().with_recompute_statistics_on_refresh(false);
        assert!(!settings.recompute_statistics_on_refresh);
    }

    #[test]
    fn test_from_params_valid_enabled() {
        assert!(settings_from_value("enabled").recompute_statistics_on_refresh);
    }

    #[test]
    fn test_from_params_valid_disabled() {
        assert!(!settings_from_value("disabled").recompute_statistics_on_refresh);
    }

    #[test]
    fn test_from_params_boolean_spellings() {
        // `true` / `false` are accepted as aliases of `enabled` / `disabled`.
        assert!(settings_from_value("true").recompute_statistics_on_refresh);
        assert!(!settings_from_value("false").recompute_statistics_on_refresh);
    }

    #[test]
    fn test_from_params_is_case_insensitive() {
        assert!(settings_from_value("ENABLED").recompute_statistics_on_refresh);
        assert!(settings_from_value("True").recompute_statistics_on_refresh);
        assert!(!settings_from_value("Disabled").recompute_statistics_on_refresh);
        assert!(!settings_from_value("FALSE").recompute_statistics_on_refresh);
    }

    #[test]
    fn test_from_params_invalid_value() {
        // Should fall back to default (true) and log a warning
        assert!(settings_from_value("invalid").recompute_statistics_on_refresh);
    }

    #[test]
    fn test_from_params_missing_param() {
        let params = HashMap::new();

        let settings = DuckDBWriteSettings::from_params(&params);
        // Should use default value
        assert!(settings.recompute_statistics_on_refresh);
    }

    #[test]
    fn test_from_params_ignores_unrelated_keys() {
        let params = HashMap::from([("some_other_param".to_string(), "disabled".to_string())]);

        let settings = DuckDBWriteSettings::from_params(&params);
        assert!(settings.recompute_statistics_on_refresh);
    }
}

0 commit comments

Comments
 (0)