Skip to content

Commit c255e52

Browse files
authored
refactor: Wrap lease_keepalive_abort with defer (#10)
1 parent 7f2b1b3 commit c255e52

File tree

3 files changed

+21
-15
lines changed

3 files changed

+21
-15
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ crate-type = ["cdylib"]
1313
etcd-client = "0.12.4"
1414
pyo3 = { version = "0.20.2", features = ["extension-module", "multiple-pymethods"] }
1515
pyo3-asyncio = { version = "0.20.0", features = ["tokio-runtime"] }
16+
scopeguard = "1.2.0"
1617
tokio = { version = "1.32.0", features = ["sync"] }
1718
tokio-stream = "0.1.14"
1819
tonic = "0.10.2"

src/lock_manager.rs

Lines changed: 19 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -80,14 +80,20 @@ impl EtcdLockManager {
8080
.await
8181
.map_err(PyClientError)?;
8282

83-
self.lease_id = match self.ttl {
83+
let mut self_ = scopeguard::guard(self, |self_| {
84+
if let Some(ref lease_keepalive_task) = self_.lease_keepalive_task {
85+
lease_keepalive_task.abort();
86+
}
87+
});
88+
89+
self_.lease_id = match self_.ttl {
8490
Some(ttl) => {
8591
let lease_grant_res = client.lease_grant(ttl, None).await.map_err(PyClientError)?;
8692
let lease_id = lease_grant_res.id();
8793

8894
let mut client_to_move = client.clone();
8995

90-
self.lease_keepalive_task = Some(tokio::spawn(async move {
96+
self_.lease_keepalive_task = Some(tokio::spawn(async move {
9197
let (mut lease_keeper, _lease_stream) = client_to_move
9298
.lease_keep_alive(lease_id)
9399
.await
@@ -105,22 +111,22 @@ impl EtcdLockManager {
105111
};
106112

107113
let timeout_result: Result<Result<(), PyClientError>, tokio::time::error::Elapsed> =
108-
match self.timeout_seconds {
114+
match self_.timeout_seconds {
109115
Some(seconds) => {
110-
timeout(Duration::from_secs_f64(seconds), self.try_lock(&mut client)).await
116+
timeout(
117+
Duration::from_secs_f64(seconds),
118+
self_.try_lock(&mut client),
119+
)
120+
.await
111121
}
112-
None => ready(Ok(self.try_lock(&mut client).await)).await,
122+
None => ready(Ok(self_.try_lock(&mut client).await)).await,
113123
};
114124

115-
if let Some(ref lease_keepalive_task) = self.lease_keepalive_task {
116-
lease_keepalive_task.abort();
117-
}
118-
119125
match timeout_result {
120-
Ok(Ok(_)) => {}
121-
Ok(Err(e)) => return Err(e.into()),
126+
Ok(Ok(_)) => Ok(PyCommunicator::new(client)),
127+
Ok(Err(try_lock_err)) => Err(try_lock_err.into()),
122128
Err(timedout_err) => {
123-
if let Some(lease_id) = self.lease_id {
129+
if let Some(lease_id) = self_.lease_id {
124130
if let Err(etcd_client::Error::GRpcStatus(status)) =
125131
client.lease_revoke(lease_id).await
126132
{
@@ -129,11 +135,9 @@ impl EtcdLockManager {
129135
}
130136
}
131137
}
132-
return Err(LockError::new_err(timedout_err.to_string()));
138+
Err(LockError::new_err(timedout_err.to_string()))
133139
}
134140
}
135-
136-
Ok(PyCommunicator::new(client))
137141
}
138142

139143
pub async fn handle_aexit(&mut self) -> PyResult<()> {

0 commit comments

Comments
 (0)