Skip to content

Commit 0617e24

Browse files
committed
feat: shared_connection_into_cursor allows cursors to share ownership
of the same connection across threads.
1 parent 566ad26 commit 0617e24

File tree

4 files changed

+96
-5
lines changed

4 files changed

+96
-5
lines changed

odbc-api/src/connection.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -311,8 +311,10 @@ impl<'c> Connection<'c> {
311311
let Some(cursor) = maybe_cursor else {
312312
return Ok(None);
313313
};
314+
// Deconstrurt the cursor and construct which only borrows the connection and construct a new
315+
// one which takes ownership of the instead.
314316
let stmt_ptr = cursor.into_stmt().into_sys();
315-
// Safety: The connection is the parent of the statement referenced by `stmt_ptr`.
317+
// Safe: The connection is the parent of the statement referenced by `stmt_ptr`.
316318
let stmt = unsafe { StatementConnection::new(stmt_ptr, Arc::clone(&self)) };
317319
// Safe: `stmt` is valid and in cursor state.
318320
let cursor = unsafe { CursorImpl::new(stmt) };

odbc-api/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ pub use self::{
5252
prepared::Prepared,
5353
result_set_metadata::ResultSetMetadata,
5454
sleep::Sleep,
55-
sync_connection::SharedConnection,
55+
sync_connection::{SharedConnection, shared_connection_into_cursor},
5656
};
5757

5858
/// Reexports `odbc-sys` as sys to enable applications to always use the same version as this

odbc-api/src/sync_connection.rs

Lines changed: 54 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
use std::sync::{Arc, Mutex};
22

3-
use crate::{Connection, handles::StatementParent};
3+
use crate::{
4+
Connection, CursorImpl, Error, ParameterCollectionRef,
5+
handles::{StatementConnection, StatementParent},
6+
};
47

58
/// A convinient type alias in case you want to use a connection from multiple threads which share
69
/// ownership of it.
@@ -11,3 +14,53 @@ pub type SharedConnection<'env> = Arc<Mutex<Connection<'env>>>;
1114
/// Connection is guaranteed to be alive and in connected state for the lifetime of
1215
/// [`SharedConnection`].
1316
unsafe impl StatementParent for SharedConnection<'_> {}
17+
18+
/// Similar to [`crate::Connection::into_cursor`], yet it operates on an `Arc<Mutex<Connection>>`.
19+
/// `Arc<Connection>` can be used if you want shared ownership of connections. However,
20+
/// `Arc<Connection>` is not `Send` due to `Connection` not being `Sync`. So sometimes you may want
21+
/// to wrap your `Connection` into an `Arc<Mutex<Connection>>` to allow shared ownership of the
22+
/// connection across threads. This function allows you to create a cursor from such a shared
23+
/// which also holds a strong reference to it.
24+
///
25+
/// # Parameters
26+
///
27+
/// * `query`: The text representation of the SQL statement. E.g. "SELECT * FROM my_table;".
28+
/// * `params`: `?` may be used as a placeholder in the statement text. You can use `()` to
29+
/// represent no parameters. See the [`crate::parameter`] module level documentation for more
30+
/// information on how to pass parameters.
31+
/// * `query_timeout_sec`: Use this to limit the time the query is allowed to take, before
32+
/// responding with data to the application. The driver may replace the number of seconds you
33+
/// provide with a minimum or maximum value.
34+
///
35+
/// For the timeout to work the driver must support this feature. E.g. PostgreSQL, and Microsoft
36+
/// SQL Server do, but SQLite or MariaDB do not.
37+
///
38+
/// You can specify ``0``, to deactivate the timeout, this is the default. So if you want no
39+
/// timeout, just leave it at `None`. Only reason to specify ``0`` is if for some reason your
40+
/// datasource does not have ``0`` as default.
41+
///
42+
/// This corresponds to `SQL_ATTR_QUERY_TIMEOUT` in the ODBC C API.
43+
///
44+
/// See: <https://learn.microsoft.com/en-us/sql/odbc/reference/syntax/sqlsetstmtattr-function>
45+
pub fn shared_connection_into_cursor<'env>(
46+
connection: SharedConnection<'env>,
47+
query: &str,
48+
params: impl ParameterCollectionRef,
49+
query_timeout_sec: Option<usize>,
50+
) -> Result<Option<CursorImpl<StatementConnection<SharedConnection<'env>>>>, Error> {
51+
let guard = connection
52+
.lock()
53+
.expect("Shared connection lock must not be poisned");
54+
let Some(cursor) = guard.execute(query, params, query_timeout_sec)? else {
55+
return Ok(None);
56+
};
57+
// Deconstrurt the cursor and construct which only borrows the connection and construct a new
58+
// one which takes ownership of the instead.
59+
let stmt_ptr = cursor.into_stmt().into_sys();
60+
drop(guard);
61+
// Safe: The connection is the parent of the statement referenced by `stmt_ptr`.
62+
let stmt = unsafe { StatementConnection::new(stmt_ptr, connection) };
63+
// Safe: `stmt` is valid and in cursor state.
64+
let cursor = unsafe { CursorImpl::new(stmt) };
65+
Ok(Some(cursor))
66+
}

odbc-api/tests/integration.rs

Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ use odbc_api::{
3131
Blob, BlobRead, BlobSlice, InputParameter, VarBinaryArray, VarCharArray, VarCharSlice,
3232
VarCharSliceMut, VarWCharArray, WithDataType,
3333
},
34-
sys,
34+
shared_connection_into_cursor, sys,
3535
};
3636
use widestring::Utf16String;
3737

@@ -42,7 +42,7 @@ use std::{
4242
num::NonZeroUsize,
4343
ptr::null_mut,
4444
str,
45-
sync::Arc,
45+
sync::{Arc, Mutex},
4646
thread,
4747
time::{Duration, Instant},
4848
};
@@ -604,6 +604,42 @@ fn shared_ownership_of_connections_by_statement(profile: &Profile) {
604604
assert_eq!(expected, cursor_to_string(cursor));
605605
}
606606

607+
#[test_case(MSSQL; "Microsoft SQL Server")]
608+
#[test_case(MARIADB; "Maria DB")]
609+
#[test_case(SQLITE_3; "SQLite 3")]
610+
#[test_case(POSTGRES; "PostgreSQL")]
611+
fn share_connections_with_statement_in_other_thread(profile: &Profile) {
612+
// Given
613+
let table_name = table_name!();
614+
let (conn, table) = Given::new(&table_name)
615+
.column_types(&["INT"])
616+
.values_by_column(&[&[Some("42")]])
617+
.build(profile)
618+
.unwrap();
619+
620+
// When
621+
let conn = Arc::new(Mutex::new(conn));
622+
let mut cursor =
623+
shared_connection_into_cursor(conn.clone(), &table.sql_all_ordered_by_id(), (), None)
624+
.unwrap()
625+
.unwrap();
626+
let other_thread = thread::spawn(move || {
627+
let mut i = 0i32;
628+
cursor
629+
.next_row()
630+
.unwrap()
631+
.unwrap()
632+
.get_data(1, &mut i)
633+
.unwrap();
634+
i
635+
});
636+
drop(conn);
637+
let answer = other_thread.join().unwrap();
638+
639+
// Then
640+
assert_eq!(42, answer);
641+
}
642+
607643
/// Strong exception safety for `into_cursor`. Our first query will fail, because it will query a
608644
/// non-existing table, but our second one using the same connection will succeed. This is one
609645
/// scenario in which it is useful not to "swallow" the connection in case of an error.

0 commit comments

Comments
 (0)