Skip to content

Commit 1ced39d

Browse files
committed
twmq improvememnts, transaction cancellation
1 parent f8b13bb commit 1ced39d

File tree

19 files changed

+1006
-367
lines changed

19 files changed

+1006
-367
lines changed

executors/src/external_bundler/confirm.rs

Lines changed: 57 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,18 @@ use engine_core::{
88
use serde::{Deserialize, Serialize};
99
use std::{sync::Arc, time::Duration};
1010
use twmq::{
11-
DurableExecution, FailHookData, NackHookData, Queue, SuccessHookData,
11+
DurableExecution, FailHookData, NackHookData, Queue, SuccessHookData, UserCancellable,
1212
error::TwmqError,
1313
hooks::TransactionContext,
14-
job::{Job, JobResult, RequeuePosition, ToJobResult},
14+
job::{BorrowedJob, JobResult, RequeuePosition, ToJobResult},
1515
};
1616

17-
use crate::webhook::{
18-
WebhookJobHandler,
19-
envelope::{ExecutorStage, HasWebhookOptions, WebhookCapable},
17+
use crate::{
18+
webhook::{
19+
WebhookJobHandler,
20+
envelope::{ExecutorStage, HasWebhookOptions, WebhookCapable},
21+
},
22+
transaction_registry::TransactionRegistry,
2023
};
2124

2225
use super::deployment::RedisDeploymentLock;
@@ -66,6 +69,9 @@ pub enum UserOpConfirmationError {
6669

6770
#[error("Internal error: {message}")]
6871
InternalError { message: String },
72+
73+
#[error("Transaction cancelled by user")]
74+
UserCancelled,
6975
}
7076

7177
impl From<TwmqError> for UserOpConfirmationError {
@@ -76,6 +82,12 @@ impl From<TwmqError> for UserOpConfirmationError {
7682
}
7783
}
7884

85+
impl UserCancellable for UserOpConfirmationError {
86+
fn user_cancelled() -> Self {
87+
UserOpConfirmationError::UserCancelled
88+
}
89+
}
90+
7991
// --- Handler ---
8092
pub struct UserOpConfirmationHandler<CS>
8193
where
@@ -84,6 +96,7 @@ where
8496
pub chain_service: Arc<CS>,
8597
pub deployment_lock: RedisDeploymentLock,
8698
pub webhook_queue: Arc<Queue<WebhookJobHandler>>,
99+
pub transaction_registry: Arc<TransactionRegistry>,
87100
pub max_confirmation_attempts: u32,
88101
pub confirmation_retry_delay: Duration,
89102
}
@@ -96,11 +109,13 @@ where
96109
chain_service: Arc<CS>,
97110
deployment_lock: RedisDeploymentLock,
98111
webhook_queue: Arc<Queue<WebhookJobHandler>>,
112+
transaction_registry: Arc<TransactionRegistry>,
99113
) -> Self {
100114
Self {
101115
chain_service,
102116
deployment_lock,
103117
webhook_queue,
118+
transaction_registry,
104119
max_confirmation_attempts: 20, // ~5 minutes with 15 second delays
105120
confirmation_retry_delay: Duration::from_secs(15),
106121
}
@@ -121,9 +136,9 @@ where
121136
type ErrorData = UserOpConfirmationError;
122137
type JobData = UserOpConfirmationJobData;
123138

124-
#[tracing::instrument(skip(self, job), fields(transaction_id = job.id, stage = Self::stage_name(), executor = Self::executor_name()))]
125-
async fn process(&self, job: &Job<Self::JobData>) -> JobResult<Self::Output, Self::ErrorData> {
126-
let job_data = &job.data;
139+
#[tracing::instrument(skip(self, job), fields(transaction_id = job.job.id, stage = Self::stage_name(), executor = Self::executor_name()))]
140+
async fn process(&self, job: &BorrowedJob<Self::JobData>) -> JobResult<Self::Output, Self::ErrorData> {
141+
let job_data = &job.job.data;
127142

128143
// 1. Get Chain
129144
let chain = self
@@ -136,7 +151,7 @@ where
136151
.map_err_fail()?;
137152

138153
let chain = chain.with_new_default_headers(
139-
job.data
154+
job.job.data
140155
.rpc_credentials
141156
.to_header_map()
142157
.map_err(|e| UserOpConfirmationError::InternalError {
@@ -161,17 +176,17 @@ where
161176
Some(receipt) => receipt,
162177
None => {
163178
// Receipt not available and max attempts reached - permanent failure
164-
if job.attempts >= self.max_confirmation_attempts {
179+
if job.job.attempts >= self.max_confirmation_attempts {
165180
return Err(UserOpConfirmationError::ReceiptNotAvailable {
166181
user_op_hash: job_data.user_op_hash.clone(),
167-
attempt_number: job.attempts,
182+
attempt_number: job.job.attempts,
168183
})
169184
.map_err_fail(); // FAIL - triggers on_fail hook which will release lock
170185
}
171186

172187
return Err(UserOpConfirmationError::ReceiptNotAvailable {
173188
user_op_hash: job_data.user_op_hash.clone(),
174-
attempt_number: job.attempts,
189+
attempt_number: job.job.attempts,
175190
})
176191
.map_err_nack(Some(self.confirmation_retry_delay), RequeuePosition::Last);
177192
// NACK - triggers on_nack hook which keeps lock for retry
@@ -197,31 +212,37 @@ where
197212

198213
async fn on_success(
199214
&self,
200-
job: &Job<Self::JobData>,
215+
job: &BorrowedJob<Self::JobData>,
201216
success_data: SuccessHookData<'_, Self::Output>,
202217
tx: &mut TransactionContext<'_>,
203218
) {
219+
// Remove transaction from registry since confirmation is complete
220+
self.transaction_registry.add_remove_command(
221+
tx.pipeline(),
222+
&job.job.data.transaction_id,
223+
);
224+
204225
// Atomic cleanup: release lock + update cache if lock was acquired
205-
if job.data.deployment_lock_acquired {
226+
if job.job.data.deployment_lock_acquired {
206227
self.deployment_lock
207228
.release_lock_and_update_cache_with_pipeline(
208229
tx.pipeline(),
209-
job.data.chain_id,
210-
&job.data.account_address,
230+
job.job.data.chain_id,
231+
&job.job.data.account_address,
211232
true, // is_deployed = true
212233
);
213234

214235
tracing::info!(
215-
transaction_id = %job.data.transaction_id,
216-
account_address = ?job.data.account_address,
236+
transaction_id = %job.job.data.transaction_id,
237+
account_address = ?job.job.data.account_address,
217238
"Added atomic lock release and cache update to transaction pipeline"
218239
);
219240
}
220241

221242
// Queue success webhook
222243
if let Err(e) = self.queue_success_webhook(job, success_data, tx) {
223244
tracing::error!(
224-
transaction_id = %job.data.transaction_id,
245+
transaction_id = %job.job.data.transaction_id,
225246
error = %e,
226247
"Failed to queue success webhook"
227248
);
@@ -230,39 +251,45 @@ where
230251

231252
async fn on_nack(
232253
&self,
233-
job: &Job<Self::JobData>,
254+
job: &BorrowedJob<Self::JobData>,
234255
nack_data: NackHookData<'_, Self::ErrorData>,
235256
tx: &mut TransactionContext<'_>,
236257
) {
237258
// NEVER release lock on NACK - job will be retried with the same lock
238259
// Just queue webhook with current status
239260
if let Err(e) = self.queue_nack_webhook(job, nack_data, tx) {
240261
tracing::error!(
241-
transaction_id = %job.data.transaction_id,
262+
transaction_id = %job.job.data.transaction_id,
242263
error = %e,
243264
"Failed to queue nack webhook"
244265
);
245266
}
246267

247268
tracing::debug!(
248-
transaction_id = %job.data.transaction_id,
249-
attempt = %job.attempts,
269+
transaction_id = %job.job.data.transaction_id,
270+
attempt = %job.job.attempts,
250271
"Confirmation job NACKed, retaining lock for retry"
251272
);
252273
}
253274

254275
async fn on_fail(
255276
&self,
256-
job: &Job<Self::JobData>,
277+
job: &BorrowedJob<Self::JobData>,
257278
fail_data: FailHookData<'_, Self::ErrorData>,
258279
tx: &mut TransactionContext<'_>,
259280
) {
281+
// Remove transaction from registry since it failed permanently
282+
self.transaction_registry.add_remove_command(
283+
tx.pipeline(),
284+
&job.job.data.transaction_id,
285+
);
286+
260287
// Always release lock on permanent failure
261-
if job.data.deployment_lock_acquired {
288+
if job.job.data.deployment_lock_acquired {
262289
self.deployment_lock.release_lock_with_pipeline(
263290
tx.pipeline(),
264-
job.data.chain_id,
265-
&job.data.account_address,
291+
job.job.data.chain_id,
292+
&job.job.data.account_address,
266293
);
267294

268295
let failure_reason = match fail_data.error {
@@ -273,8 +300,8 @@ where
273300
};
274301

275302
tracing::error!(
276-
transaction_id = %job.data.transaction_id,
277-
account_address = ?job.data.account_address,
303+
transaction_id = %job.job.data.transaction_id,
304+
account_address = ?job.job.data.account_address,
278305
reason = %failure_reason,
279306
"Added lock release to transaction pipeline due to permanent failure"
280307
);
@@ -283,7 +310,7 @@ where
283310
// Queue failure webhook
284311
if let Err(e) = self.queue_fail_webhook(job, fail_data, tx) {
285312
tracing::error!(
286-
transaction_id = %job.data.transaction_id,
313+
transaction_id = %job.job.data.transaction_id,
287314
error = %e,
288315
"Failed to queue fail webhook"
289316
);

0 commit comments

Comments
 (0)