Replies: 1 comment
-
In case it helps anyone I came up with this, which seems to work (not tested in prod). One concern is how heavy the pool is to clone, does anyone know if that's safe to clone a bunch? #[async_trait]
trait Transactable {
type T: Send;
async fn run(&mut self, txn: &mut PgConnection) -> Result<Self::T, sqlx::Error>;
fn db(&mut self) -> Pool<Postgres>;
async fn execute(&mut self) -> Result<Self::T, sqlx::Error> {
let mut txn = self.db().begin().await?;
sqlx::query("SAVEPOINT cockroach_restart")
.execute(&mut *txn)
.await?;
let mut retries = 0;
loop {
match self.run(&mut *txn).await {
Ok(result) => {
sqlx::query("RELEASE SAVEPOINT cockroach_restart")
.execute(&mut *txn)
.await?;
return Ok(result);
}
Err(err)
if retries <= 5
&& err.as_database_error().map_or(false, |e| {
e.code().map(|s| s.to_string()) == Some("40001".into())
}) =>
{
sqlx::query("ROLLBACK TO SAVEPOINT cockroach_restart")
.execute(&mut *txn)
.await?;
retries += 1;
}
Err(err) => {
sqlx::query("ROLLBACK TO SAVEPOINT cockroach_restart")
.execute(&mut *txn)
.await?;
return Err(err);
}
}
}
}
}
struct SaveThing {
db: Pool<Postgres>,
}
#[async_trait]
impl Transactable for SaveThing {
type T = Thing;
fn db(&mut self) -> Pool<Postgres> {
self.db.clone()
}
async fn run(&mut self, txn: &mut PgConnection) -> Result<Thing, sqlx::Error> {
let thing = sqlx::query_as!(
Thing,
"
update thing set
title = 'new title',
returning *
"
)
.fetch_one(&mut *txn)
.await;
thing
}
} |
Beta Was this translation helpful? Give feedback.
0 replies
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Uh oh!
There was an error while loading. Please reload this page.
Uh oh!
There was an error while loading. Please reload this page.
-
I'm wondering how I can implement a wrapper/enhance sqlx to allow for generic retries on an executed transaction as described here: https://www.cockroachlabs.com/docs/v23.1/transactions#transaction-retries
I see an example here: https://www.cockroachlabs.com/docs/stable/build-a-rust-app-with-cockroachdb but I'm struggling to translate this sqlx. I tried building my own Executor impl that wraps, and while this all compiles I'm not sure where to inject the retry logic in the function
fetch_many
or as it returns aBoxStream
.I also attempted to build a generic
txn
function that could take adb
and an async block that yielded atx
executor to a closure so multiple statements could be executed with retries but also struggled to build that.Something like this:
Any hints on how to build a generic wrapper around a transaction that can retry on specific return codes from the db?
Beta Was this translation helpful? Give feedback.
All reactions