Skip to content

Commit 6e16d6d

Browse files
committed
feat: Add DuckDB checks for unsupported column types
1 parent df4bab1 commit 6e16d6d

File tree

5 files changed

+322
-19
lines changed

5 files changed

+322
-19
lines changed

src/duckdb.rs

Lines changed: 27 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,3 @@
1-
use crate::sql::db_connection_pool::{
2-
self,
3-
dbconnection::{
4-
duckdbconn::{
5-
flatten_table_function_name, is_table_function, DuckDBParameter, DuckDbConnection,
6-
},
7-
get_schema, DbConnection,
8-
},
9-
duckdbpool::DuckDbConnectionPool,
10-
DbConnectionPool, DbInstanceKey, Mode,
11-
};
121
use crate::sql::sql_provider_datafusion;
132
use crate::util::{
143
self,
@@ -17,6 +6,20 @@ use crate::util::{
176
indexes::IndexType,
187
on_conflict::{self, OnConflict},
198
};
9+
use crate::{
10+
sql::db_connection_pool::{
11+
self,
12+
dbconnection::{
13+
duckdbconn::{
14+
flatten_table_function_name, is_table_function, DuckDBParameter, DuckDbConnection,
15+
},
16+
get_schema, DbConnection,
17+
},
18+
duckdbpool::DuckDbConnectionPool,
19+
DbConnectionPool, DbInstanceKey, Mode,
20+
},
21+
InvalidTypeAction,
22+
};
2023
use arrow::{array::RecordBatch, datatypes::SchemaRef};
2124
use async_trait::async_trait;
2225
use datafusion::{
@@ -120,6 +123,7 @@ type Result<T, E = Error> = std::result::Result<T, E>;
120123
pub struct DuckDBTableProviderFactory {
121124
access_mode: AccessMode,
122125
instances: Arc<Mutex<HashMap<DbInstanceKey, DuckDbConnectionPool>>>,
126+
invalid_type_action: InvalidTypeAction,
123127
}
124128

125129
const DUCKDB_DB_PATH_PARAM: &str = "open";
@@ -132,9 +136,16 @@ impl DuckDBTableProviderFactory {
132136
Self {
133137
access_mode,
134138
instances: Arc::new(Mutex::new(HashMap::new())),
139+
invalid_type_action: InvalidTypeAction::Error,
135140
}
136141
}
137142

143+
#[must_use]
144+
pub fn with_invalid_type_action(mut self, invalid_type_action: InvalidTypeAction) -> Self {
145+
self.invalid_type_action = invalid_type_action;
146+
self
147+
}
148+
138149
#[must_use]
139150
pub fn attach_databases(&self, options: &HashMap<String, String>) -> Vec<Arc<str>> {
140151
options
@@ -181,7 +192,9 @@ impl DuckDBTableProviderFactory {
181192
return Ok(instance.clone());
182193
}
183194

184-
let pool = DuckDbConnectionPool::new_memory().context(DbConnectionPoolSnafu)?;
195+
let pool = DuckDbConnectionPool::new_memory()
196+
.context(DbConnectionPoolSnafu)?
197+
.with_invalid_type_action(self.invalid_type_action.clone());
185198

186199
instances.insert(key, pool.clone());
187200

@@ -201,7 +214,8 @@ impl DuckDBTableProviderFactory {
201214
}
202215

203216
let pool = DuckDbConnectionPool::new_file(&db_path, &self.access_mode)
204-
.context(DbConnectionPoolSnafu)?;
217+
.context(DbConnectionPoolSnafu)?
218+
.with_invalid_type_action(self.invalid_type_action.clone());
205219

206220
instances.insert(key, pool.clone());
207221

src/lib.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use serde::{Deserialize, Serialize};
12
use snafu::prelude::*;
23

34
pub mod sql;
@@ -23,3 +24,11 @@ pub enum Error {
2324
#[snafu(display("Error reading file: {source}"))]
2425
FileReadError { source: std::io::Error },
2526
}
27+
28+
#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
29+
#[serde(rename_all = "lowercase")]
30+
pub enum InvalidTypeAction {
31+
Error,
32+
Warn,
33+
Ignore,
34+
}

src/sql/db_connection_pool/dbconnection.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use std::{any::Any, sync::Arc};
22

3+
use arrow_schema::DataType;
34
use datafusion::{
45
arrow::datatypes::SchemaRef, execution::SendableRecordBatchStream, sql::TableReference,
56
};
@@ -25,6 +26,12 @@ pub enum Error {
2526
#[snafu(display("Unable to get schema: {source}"))]
2627
UnableToGetSchema { source: GenericError },
2728

29+
#[snafu(display("The field '{field_name}' has an unsupported data type: {data_type}"))]
30+
UnsupportedDataType {
31+
data_type: DataType,
32+
field_name: String,
33+
},
34+
2835
#[snafu(display("Unable to query arrow: {source}"))]
2936
UnableToQueryArrow { source: GenericError },
3037

0 commit comments

Comments
 (0)