You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
I post my question here as it's not really an issue with the SQLx transactions.
Context
I'm using an hexagonal architecture:
api/: list of HTTP endpoints. An endpoint instanciate some stores providing a database object and call a use-case.
application/: list of use-cases that contains the business logic, without have any knowledge of the underlying implementation (SQLx, Diesel, ...).
domain/: structures used in the crate and traits to be implemented and used by the use-cases.
infrastructure/: specific implementation of traits.
Requirements
Starting a transaction must be done in the use-case as it's the only one to know when it's useful and avoid calling time consuming functions in a transaction.
I don't want to pollute my trait's functions with SQLx type arguments.
Naive implementation
I've provided a basic implementation of what I described at the end of this message. My problem is that I use Arc<Mutex<...>> to get a mutable access to my Db object and start the transaction.
Moreover, I have another mutex for the transaction to be able to get mutable access to it when calling SQLx queries.
Question
I've check the sqlx::Transaction and it cannot be copied, and the commit() function consumes the object so I don't have other choices but do I am missing something, to avoid locking mutuxes for each requests ?
Code
[package]
name = "tx"version = "0.1.0"edition = "2024"
[dependencies]
axum = { version = "0.8.4" }
derive_more = { version = "0.99.20" }
sqlx = { version = "0.8.4", features = ["postgres"] }
thiserror = { version = "2.0.12" }
tokio = { version = "1.45.1", features = ["full"] }
use axum::extract::Extension;use axum::http::StatusCode;use axum::response::{IntoResponse,Response};use axum::routing::post;use axum::Router;use sqlx::{PgPool,PgTransaction,Pool};use std::sync::Arc;use tokio::sync::Mutex;#[derive(Debug, derive_more::Display, thiserror::Error)]enumError{#[error(transparent)]IO(#[from] std::io::Error),#[error(transparent)]SQLx(#[from] sqlx::Error),UserNotFound,}implIntoResponseforError{fninto_response(self) -> Response{StatusCode::INTERNAL_SERVER_ERROR.into_response()}}#[tokio::main]asyncfnmain() -> Result<(),Error>{let pool:PgPool = Pool::connect("postgres://test:test@localhost:5432/test").await?;let app = Router::new().route("/",post(set_user_access)).layer(Extension(pool));let listener = tokio::net::TcpListener::bind("127.0.0.1:3000").await.unwrap();
axum::serve(listener, app).await?;Ok(())}// --------------------------------------------------------// api/ (HTTP endpoints)// --------------------------------------------------------asyncfnset_user_access(Extension(pool):Extension<PgPool>) -> Result<implIntoResponse,Error>{let db = Arc::new(Mutex::new(Db::new(&pool)));// Decide which implementation to be used herelet user_store = SQLxUserStore::new(db.clone());let access_store = SQLxAccessStore::new(db.clone());// The following arguments should be obtain from th request (parameter, body, ...)let user_id = 1;let module = String::new();// Call the logicSetUserAccess::new(user_store, access_store, db).handle((user_id, module)).await?;Ok(StatusCode::OK)}// --------------------------------------------------------// application/ (business logic)// --------------------------------------------------------pubtraitUseCase{typeArgs;typeOutput;typeError;fnhandle(&self,args:Self::Args,) -> impl std::future::Future<Output = Result<Self::Output,Self::Error>>;}structSetUserAccess<A,B>whereA:UserStore,B:AccessStore,{user_store:A,access_store:B,db:SharedDb,}impl<A,B>SetUserAccess<A,B>whereA:UserStore,B:AccessStore,{pubfnnew(user_store:A,access_store:B,db:SharedDb) -> Self{Self{
user_store,
access_store,
db,}}}impl<A,B>UseCaseforSetUserAccess<A,B>whereA:UserStore,B:AccessStore,{typeArgs = (UserId,String);typeOutput = ();typeError = Error;asyncfnhandle(&self,args:Self::Args) -> Result<Self::Output,Self::Error>{let(user_id, module) = args;// Start transaction here: we don't call it in the endpoint as we want to ensure that it's// done only for database requests and not time consuming stuff.// After this line of code, all stores will execute queries using the transaction and not// the pool.self.db.lock().await.start_transaction().await?;if !self.user_store.exists(user_id).await? {returnErr(Error::UserNotFound);}let _access_id = self.access_store.create_access(user_id, module).await?;// We're done querying the databaseself.db.lock().await.commit().await?;// From here we can call time consuming funtions like mail sending or whateverOk(())}}// --------------------------------------------------------// domain/ (structures and interfaces)// --------------------------------------------------------typeAccessId = i32;typeUserId = i32;typeSharedDb = Arc<Mutex<Db>>;structDb{pubpool:PgPool,pubtx:Option<Arc<Mutex<PgTransaction<'static>>>>,}implDb{pubfnnew(pool:&PgPool) -> Self{Self{pool: pool.clone(),tx:None,}}pubasyncfnstart_transaction(&mutself) -> Result<(),Error>{self.tx = Some(Arc::new(Mutex::new(self.pool.begin().await?)));Ok(())}pubasyncfncommit(&mutself) -> Result<(),Error>{ifletSome(tx) = self.tx.take(){ifletSome(tx) = Arc::into_inner(tx){let tx = tx.into_inner();
tx.commit().await?;}}Ok(())}}traitUserStore{asyncfnexists(&self,user_id:UserId) -> Result<bool,Error>;}traitAccessStore{asyncfncreate_access(&self,user_id:UserId,module:String) -> Result<AccessId,Error>;}// --------------------------------------------------------// infrastructure (implementations)// --------------------------------------------------------structSQLxUserStore{db:SharedDb,}implSQLxUserStore{pubfnnew(db:SharedDb) -> Self{Self{ db }}}implUserStoreforSQLxUserStore{asyncfnexists(&self,user_id:UserId) -> Result<bool,Error>{let db = self.db.lock().await;let request = "SELECT 1 AS found FROM users WHERE id = $1 LIMIT 1";let is_found = ifletSome(tx) = &db.tx{letmut tx = tx.lock().await;
sqlx::query_scalar::<_,i32>(request).bind(user_id).fetch_one(&mut**tx).await.is_ok()}else{
sqlx::query_scalar::<_,i32>(request).bind(user_id).fetch_one(&db.pool).await.is_ok()};Ok(is_found)}}structSQLxAccessStore{db:SharedDb,}implSQLxAccessStore{pubfnnew(db:SharedDb) -> Self{Self{ db }}}implAccessStoreforSQLxAccessStore{asyncfncreate_access(&self,user_id:UserId,module:String) -> Result<AccessId,Error>{let id = sqlx::query_scalar("INSERT INTO accesses (user_id, module) VALUES ($1, $2) RETURNING id",).bind(user_id).bind(module).fetch_one(&self.db.lock().await.pool).await?;Ok(id)}}
reacted with thumbs up emoji reacted with thumbs down emoji reacted with laugh emoji reacted with hooray emoji reacted with confused emoji reacted with heart emoji reacted with rocket emoji reacted with eyes emoji
Uh oh!
There was an error while loading. Please reload this page.
-
Hello,
I post my question here as it's not really an issue with the SQLx transactions.
Context
I'm using an hexagonal architecture:
Requirements
Starting a transaction must be done in the use-case as it's the only one to know when it's useful and avoid calling time consuming functions in a transaction.
I don't want to pollute my trait's functions with SQLx type arguments.
Naive implementation
I've provided a basic implementation of what I described at the end of this message. My problem is that I use Arc<Mutex<...>> to get a mutable access to my
Db
object and start the transaction.Moreover, I have another mutex for the transaction to be able to get mutable access to it when calling SQLx queries.
Question
I've check the sqlx::Transaction and it cannot be copied, and the
commit()
function consumes the object so I don't have other choices but do I am missing something, to avoid locking mutuxes for each requests ?Code
Beta Was this translation helpful? Give feedback.
All reactions