Skip to content

Commit 9ed6f1e

Browse files
committed
Support retries when writing data to SQLite
1 parent b0af919 commit 9ed6f1e

File tree

2 files changed

+28
-7
lines changed

2 files changed

+28
-7
lines changed

src/sqlite/write.rs

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@ use futures::StreamExt;
1919
use snafu::prelude::*;
2020

2121
use crate::util::{
22-
constraints, on_conflict::OnConflict, retriable_error::check_and_mark_retriable_error,
22+
constraints,
23+
on_conflict::OnConflict,
24+
retriable_error::{check_and_mark_retriable_error, to_retriable_data_write_error},
2325
};
2426

2527
use super::{to_datafusion_error, Sqlite};
@@ -126,8 +128,13 @@ impl DataSink for SqliteDataSink {
126128
let (notify_commit_transaction, mut on_commit_transaction) =
127129
tokio::sync::oneshot::channel();
128130

129-
let mut db_conn = self.sqlite.connect().await.map_err(to_datafusion_error)?;
130-
let sqlite_conn = Sqlite::sqlite_conn(&mut db_conn).map_err(to_datafusion_error)?;
131+
let mut db_conn = self
132+
.sqlite
133+
.connect()
134+
.await
135+
.map_err(to_retriable_data_write_error)?;
136+
let sqlite_conn =
137+
Sqlite::sqlite_conn(&mut db_conn).map_err(to_retriable_data_write_error)?;
131138

132139
let constraints = self.sqlite.constraints().clone();
133140
let mut data = data;
@@ -191,11 +198,9 @@ impl DataSink for SqliteDataSink {
191198
})
192199
.await
193200
.context(super::UnableToInsertIntoTableAsyncSnafu)
194-
.map_err(to_datafusion_error)?;
201+
.map_err(to_retriable_data_write_error)?;
195202

196-
let num_rows = task.await.map_err(|err| {
197-
DataFusionError::Execution(format!("Error sending data batch: {err}"))
198-
})??;
203+
let num_rows = task.await.map_err(to_retriable_data_write_error)??;
199204

200205
Ok(num_rows)
201206
}

src/util/retriable_error.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
use std::error::Error;
2+
13
use datafusion::error::DataFusionError;
24
use snafu::Snafu;
35

@@ -7,6 +9,11 @@ pub enum RetriableError {
79
DataRetrievalError {
810
source: datafusion::error::DataFusionError,
911
},
12+
13+
#[snafu(display("{source}"))]
14+
DataWriteError {
15+
source: Box<dyn Error + Send + Sync>
16+
},
1017
}
1118

1219
#[must_use]
@@ -36,6 +43,15 @@ pub fn check_and_mark_retriable_error(err: DataFusionError) -> DataFusionError {
3643
DataFusionError::External(Box::new(RetriableError::DataRetrievalError { source: err }))
3744
}
3845

46+
// Wraps error as `RetriableError::DataWriteError` so we can detect this error and retry later at a higher level
47+
#[must_use]
48+
pub fn to_retriable_data_write_error<E>(error: E) -> DataFusionError
49+
where
50+
E: Error + Send + Sync + 'static,
51+
{
52+
DataFusionError::External(Box::new(RetriableError::DataWriteError { source: error.into() }))
53+
}
54+
3955
fn is_invalid_query_error(error: &DataFusionError) -> bool {
4056
match error {
4157
DataFusionError::Context(_, err) => is_invalid_query_error(err.as_ref()),

0 commit comments

Comments
 (0)