EOA execution #5
Walkthrough

This change introduces a robust multilane queue system.

Changes
Sequence Diagram(s)

Multilane Job Batch Pop and Processing

```mermaid
sequenceDiagram
    participant Client
    participant MultilaneQueue
    participant Redis
    participant Worker
    participant Handler
    Client->>MultilaneQueue: push_to_lane(job, lane_id)
    MultilaneQueue->>Redis: Store job data and update lane sets
    Redis-->>MultilaneQueue: OK
    Worker->>MultilaneQueue: pop_batch_jobs(batch_size)
    MultilaneQueue->>Redis: Lua script to atomically pop jobs from lanes
    Redis-->>MultilaneQueue: List of jobs (by lane)
    MultilaneQueue-->>Worker: BorrowedJob(s)
    loop For each BorrowedJob
        Worker->>Handler: process(job)
        Handler-->>Worker: JobResult (Success/Nack/Fail)
        Worker->>MultilaneQueue: complete_job(BorrowedJob, JobResult)
        MultilaneQueue->>Redis: Atomic update (WATCH/MULTI/EXEC)
        Redis-->>MultilaneQueue: OK
    end
```
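The batch-pop fairness shown in the diagram can be illustrated with an in-memory sketch: jobs are drained round-robin across lanes until the batch is full or a full pass over all lanes yields nothing. The real queue performs this atomically inside a Redis Lua script; the function name and types below are illustrative assumptions, not the twmq API.

```rust
use std::collections::{HashMap, VecDeque};

// Illustrative, in-memory sketch of lane-fair batch popping (assumed names;
// the actual implementation runs atomically in a Redis Lua script).
fn pop_batch_round_robin(
    lanes: &mut HashMap<String, VecDeque<String>>,
    batch_size: usize,
) -> Vec<(String, String)> {
    let mut lane_ids: Vec<String> = lanes.keys().cloned().collect();
    lane_ids.sort(); // deterministic order for the sketch
    if lane_ids.is_empty() {
        return Vec::new();
    }

    let mut out = Vec::new();
    let mut i = 0usize;
    let mut consecutive_empty = 0usize;

    // Stop once the batch is full or a full pass over all lanes came up empty.
    while out.len() < batch_size && consecutive_empty < lane_ids.len() {
        let lane = &lane_ids[i % lane_ids.len()];
        match lanes.get_mut(lane).and_then(|q| q.pop_front()) {
            Some(job) => {
                out.push((lane.clone(), job));
                consecutive_empty = 0;
            }
            None => consecutive_empty += 1,
        }
        i += 1;
    }
    out
}
```

With lanes `a = [a1, a2]` and `b = [b1]`, a batch of 3 drains `a1, b1, a2`, so no single busy lane can starve the others.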
EOA Executor Worker Transaction Lifecycle

```mermaid
sequenceDiagram
    participant EoaExecutorWorker
    participant EoaExecutorStore
    participant ChainService
    participant EoaSigner
    participant Blockchain
    EoaExecutorWorker->>EoaExecutorStore: Acquire lock for EOA
    EoaExecutorWorker->>ChainService: Get chain for chain_id
    EoaExecutorWorker->>EoaExecutorStore: Crash recovery (rebroadcast borrowed txs)
    EoaExecutorWorker->>ChainService: Send transaction (if needed)
    ChainService->>Blockchain: Broadcast transaction
    Blockchain-->>ChainService: Tx hash / error
    ChainService-->>EoaExecutorWorker: Result
    EoaExecutorWorker->>EoaExecutorStore: Confirm on-chain txs
    EoaExecutorWorker->>ChainService: Get on-chain nonce
    ChainService-->>EoaExecutorWorker: Nonce
    EoaExecutorWorker->>EoaExecutorStore: Update state, mark confirmed/failed
    EoaExecutorWorker->>EoaExecutorStore: Send new or recycled txs
    EoaExecutorWorker->>EoaSigner: Sign transaction
    EoaSigner-->>EoaExecutorWorker: Signature
    EoaExecutorWorker->>ChainService: Send signed transaction
    ChainService-->>EoaExecutorWorker: Result
    EoaExecutorWorker->>EoaExecutorStore: Update transaction state
```
⚠️ Warning: Clippy (1.86.0) could not run (`error: failed to get … Caused by: …`). Review the tool's configuration, or disable the tool if this is a critical failure.
Actionable comments posted: 11
🔭 Outside diff range comments (1)
aa-core/src/userop/builder.rs (1)

316-318: Silent default via `unwrap_or_default()`

`unwrap_or_default()` silently masks a missing `paymaster_post_op_gas_limit` by inserting `0`. If that value is required by the protocol, defaulting to zero could cause on-chain reverts. Consider propagating an error instead:

```diff
-            bundler_response
-                .paymaster_post_op_gas_limit
-                .unwrap_or_default(),
+            bundler_response
+                .paymaster_post_op_gas_limit
+                .ok_or(EngineError::MissingBundlerGasLimit)?,
```

Replace with an appropriate domain error.
♻️ Duplicate comments (1)

core/Cargo.toml (1)

13-14: Same branch-pinning concern as aa-core

See the previous comment: use an immutable `rev` for both `vault-*` dependencies to avoid surprise breakages.
🧹 Nitpick comments (6)

aa-core/src/userop/builder.rs (2)

238-246: Missing error context on gas-price fetch failure

If `paymaster_client().get_user_op_gas_fees()` fails, the propagated error loses context about which entrypoint version (v0.7) failed. Consider adding contextual info:

```diff
-            .map_err(|e| e.to_engine_error(self.chain))?;
+            .map_err(|e| {
+                tracing::error!("v0.7 gas-fee fetch failed: {e}");
+                e.to_engine_error(self.chain)
+            })?;
```

This small addition greatly eases troubleshooting in prod.
260-266: Unnecessary `Some()` wrapping

`paymaster` and `paymaster_data` are already `Option`s; if `pm_response` returns `Option` values, wrapping them again with `Some()` is redundant:

```diff
-        self.userop.paymaster = Some(pm_response.paymaster);
-        self.userop.paymaster_data = Some(pm_response.paymaster_data);
+        self.userop.paymaster = pm_response.paymaster;
+        self.userop.paymaster_data = pm_response.paymaster_data;
```

Assuming the struct definitions use `Option<Bytes>`, this simplifies the code path.

executors/src/eoa/error_classifier.rs (1)
265-314: Add more comprehensive test coverage.

The current tests cover basic scenarios but miss edge cases and recovery-strategy variations. Consider adding tests for:

- Empty error messages
- Error messages with special characters
- Case-sensitivity edge cases
- All error variants
- Recovery-strategy validation for each error type

Example additional tests:

```rust
#[test]
fn test_edge_cases() {
    // Empty message
    let error = EoaErrorMapper::map_ethereum_error(-32000, "");
    match error {
        EoaExecutionError::RpcError { .. } => {}
        _ => panic!("Expected RpcError for empty message"),
    }

    // Mixed case
    let error = EoaErrorMapper::map_ethereum_error(-32000, "NoNcE tOo LoW");
    match error {
        EoaExecutionError::NonceTooLow { .. } => {}
        _ => panic!("Expected NonceTooLow for mixed case"),
    }
}

#[test]
fn test_all_recovery_strategies() {
    // Test that each error variant has an appropriate strategy
    let test_cases = vec![
        (EoaExecutionError::NonceTooLow { message: "test".into() }, true, false),
        (EoaExecutionError::InsufficientFunds { message: "test".into() }, false, true),
        // ... other variants
    ];

    for (error, expected_queue_confirmation, expected_recycle) in test_cases {
        let strategy = EoaErrorMapper::get_recovery_strategy(&error);
        assert_eq!(strategy.queue_confirmation, expected_queue_confirmation);
        assert_eq!(strategy.recycle_nonce, expected_recycle);
    }
}
```

executors/src/eoa/nonce_manager.rs (1)
412-436: Simplify return type for get_nonce_assignments.

This method returns a `Vec` but only ever contains zero or one element, which is misleading. Change the return type to `Option<NonceAssignment>`:

```diff
-pub async fn get_nonce_assignments(
+pub async fn get_nonce_assignment(
     &self,
     eoa: Address,
     chain_id: u64,
     nonce: U256,
-) -> Result<Vec<NonceAssignment>, NonceManagerError> {
+) -> Result<Option<NonceAssignment>, NonceManagerError> {
     let mut conn = self.redis.clone();
     let assignments_key = self.nonce_assignments_key(eoa, chain_id);
     let assignment_json: Option<String> =
         conn.hget(&assignments_key, nonce.to_string()).await?;

     match assignment_json {
         Some(json) => {
             let assignment: NonceAssignment = serde_json::from_str(&json).map_err(|e| {
                 NonceManagerError::NonceAssignmentFailed {
                     reason: format!("Failed to deserialize assignment: {}", e),
                 }
             })?;
-            Ok(vec![assignment])
+            Ok(Some(assignment))
         }
-        None => Ok(vec![]),
+        None => Ok(None),
     }
 }
```

executors/src/eoa/send.rs (1)
341-341: Remove unused `pre_computed_hash` variable.

The pre-computed hash is calculated but never used.

```diff
-let pre_computed_hash = signed_tx.hash();
-
 // 9. Send transaction
 match chain.provider().send_tx_envelope(signed_tx.into()).await {
```

If you need to validate that the hash matches, compare it with the returned hash:

```rust
let pre_computed_hash = signed_tx.hash();
// ... send transaction ...
assert_eq!(pre_computed_hash, tx_hash, "Hash mismatch");
```

twmq/src/multilane.rs (1)
1198-1235: Add retry limits and backoff to the WATCH loop.

The infinite retry loop could cause issues under high contention. Add retry limits and exponential backoff:

```diff
 // Execute with lease protection (same pattern as single-lane queue)
 let lease_key = self.lease_key_name(&job.job.id, &job.lease_token);

+let mut retry_count = 0;
+const MAX_RETRIES: u32 = 10;
+
 loop {
+    if retry_count >= MAX_RETRIES {
+        return Err(TwmqError::Runtime {
+            message: format!("Failed to complete job after {} retries", MAX_RETRIES),
+        });
+    }
+
     let mut conn = self.redis.clone();
     redis::cmd("WATCH")
         .arg(&lease_key)
         .query_async::<()>(&mut conn)
         .await?;

     // ... existing logic ...

     Err(_) => {
         tracing::debug!(job_id = %job.job.id, "WATCH failed during completion, retrying");
+        retry_count += 1;
+        if retry_count > 1 {
+            // Exponential backoff: 10ms, 20ms, 40ms, etc.
+            tokio::time::sleep(Duration::from_millis(10 * (1 << (retry_count - 1)))).await;
+        }
         continue;
     }
```
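The backoff schedule suggested above (10 ms, doubling per retry) can be isolated as a small pure helper, which also makes it unit-testable on its own; this is a sketch, not code from the PR.

```rust
// Sketch of the suggested exponential backoff schedule: 10ms, 20ms, 40ms, ...
// `retry_count` is 1-based; the caller is expected to cap retries separately.
fn backoff_ms(retry_count: u32) -> u64 {
    debug_assert!(retry_count >= 1);
    10u64 * (1u64 << (retry_count - 1))
}
```

Keeping the schedule pure separates "how long to wait" from "how to wait", so the WATCH loop only needs `tokio::time::sleep(Duration::from_millis(backoff_ms(n)))`.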
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)

- `Cargo.lock` is excluded by `!**/*.lock`

📒 Files selected for processing (19)

- `.github/workflows/ci-twmq.yaml` (1 hunks)
- `.github/workflows/coverage-twmq.yaml` (1 hunks)
- `Cargo.toml` (1 hunks)
- `aa-core/Cargo.toml` (1 hunks)
- `aa-core/src/userop/builder.rs` (1 hunks)
- `core/Cargo.toml` (1 hunks)
- `core/src/signer.rs` (6 hunks)
- `executors/Cargo.toml` (1 hunks)
- `executors/src/eoa/eoa_confirmation_worker.rs` (1 hunks)
- `executors/src/eoa/error_classifier.rs` (1 hunks)
- `executors/src/eoa/mod.rs` (1 hunks)
- `executors/src/eoa/nonce_manager.rs` (1 hunks)
- `executors/src/eoa/send.rs` (1 hunks)
- `executors/src/eoa/transaction_store.rs` (1 hunks)
- `executors/src/lib.rs` (1 hunks)
- `twmq/src/lib.rs` (3 hunks)
- `twmq/src/multilane.rs` (1 hunks)
- `twmq/src/shutdown.rs` (4 hunks)
- `twmq/tests/multilane_batch_pop.rs` (1 hunks)
🧰 Additional context used

🧬 Code Graph Analysis (1)

twmq/src/lib.rs (1)
- twmq/src/multilane.rs: `work` (443-540)
⏰ Context from checks skipped due to timeout of 90000ms (2)
- GitHub Check: test
- GitHub Check: coverage
🔇 Additional comments (19)
.github/workflows/coverage-twmq.yaml (1)
41-43: Confirmed: install-action@9185c192a96ba09167ad8663015b3fbbf007ec79 matches v2.56.2

Ran `git ls-remote https://github.com/taiki-e/install-action.git refs/tags/v2.56.2` and the SHA aligns with the pinned hash.

.github/workflows/ci-twmq.yaml (1)
41-43: Consistency win: same action hash as the coverage workflow

The CI workflow now uses the identical commit hash as the coverage job, eliminating drift. No further issues spotted.

Cargo.toml (1)
2-10: Array re-formatting looks good

Purely stylistic change; no functional impact. TOML 1.0 allows the trailing comma, so this remains valid.
executors/Cargo.toml (1)
22-22: Chrono version 0.4.41 is valid on crates.io; no action needed

The crates.io API shows `chrono`'s `max_version` as `0.4.41`, so the dependency declaration is correct and can be merged as-is.

executors/src/lib.rs (1)
1-4: LGTM! Clean module organization.

The addition of the `eoa` module and reordering of modules improve code organization by grouping related functionality.

twmq/src/lib.rs (3)
4-4: Module addition looks good.

The new `multilane` module integrates well with the existing structure.

18-18: Clean re-export of multilane types.

Re-exporting `MultilaneQueue` and `MultilanePushableJob` from the crate root follows good API design practices.

452-452: Return type update aligns with the generalized shutdown handling.

The change from `WorkerHandle<H>` to `WorkerHandle<Queue<H>>` correctly reflects the new generic `WorkerHandle<Q>` design that supports multiple queue types.

core/src/signer.rs (3)
2-2: Import addition is appropriate.

The `TypedTransaction` import is required for the new `sign_transaction` method signature.

181-187: Well-designed trait method addition.

The new `sign_transaction` method maintains consistency with existing trait methods and properly handles async operations.

309-344: Solid implementation of transaction signing.

The implementation correctly handles both Vault and IAW credential types with appropriate error handling and logging, maintaining consistency with other signing methods.
executors/src/eoa/mod.rs (1)
1-16: Well-structured module organization for EOA functionality.

The module cleanly organizes EOA-related functionality into logical submodules and provides a convenient public API through re-exports. The separation of concerns (confirmation, error classification, nonce management, sending, and storage) follows good architectural principles.
twmq/src/shutdown.rs (3)
5-9: Good generalization of WorkerHandle.

Making `WorkerHandle` generic over `Q` instead of tying it to `DurableExecution` improves flexibility and enables support for different queue types.

11-26: Clean abstraction with the QueueIdentifier trait.

The trait provides a minimal interface for queue identification, and the implementations correctly map to the appropriate methods (`name()` for Queue, `queue_id()` for MultilaneQueue).
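The shape of that abstraction can be sketched as follows. The trait method name and the struct fields here are assumptions for illustration, not the actual definitions in twmq/src/shutdown.rs; the point is only that both queue types expose one identifying string through a shared interface.

```rust
// Hypothetical sketch of a QueueIdentifier-style trait (assumed names).
trait QueueIdentifier {
    fn queue_identifier(&self) -> &str;
}

struct SingleQueue {
    name: String,
}

struct MultilaneQueueStub {
    queue_id: String,
}

impl QueueIdentifier for SingleQueue {
    fn queue_identifier(&self) -> &str {
        &self.name // corresponds to Queue::name()
    }
}

impl QueueIdentifier for MultilaneQueueStub {
    fn queue_identifier(&self) -> &str {
        &self.queue_id // corresponds to MultilaneQueue::queue_id()
    }
}
```

A generic `WorkerHandle<Q: QueueIdentifier>` can then log or label shutdown events uniformly without caring which queue type it holds.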
88-91: Flexible generic signatures for worker management methods.

The updated method signatures properly support any queue type `Q`, providing good flexibility without unnecessary trait bounds at the method level.

Also applies to: 152-156, 159-167, 169-172
executors/src/eoa/transaction_store.rs (1)
46-51: LGTM!

The `queue_job_id` method provides a clean way to generate unique job IDs for transaction attempts.

executors/src/eoa/eoa_confirmation_worker.rs (3)
94-127: LGTM!

Clean implementation with proper dependency injection and trait implementations.

137-292: Well-structured confirmation monitoring logic.

The process method implements a comprehensive flow for monitoring transaction confirmations with proper error handling and retry logic. The nonce progression detection and active transaction monitoring approach is sound.
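The nonce-progression idea can be shown with a small sketch: any assigned nonce strictly below the chain's current account nonce has been consumed on-chain, so those are the transactions to check for receipts. The function below is illustrative only; the name and signature are assumptions, not the worker's API.

```rust
// Sketch: nonces below the on-chain account nonce have been consumed,
// so their transactions should be checked for confirmation receipts.
fn nonces_to_confirm(assigned: &[u64], onchain_nonce: u64) -> Vec<u64> {
    assigned
        .iter()
        .copied()
        .filter(|&n| n < onchain_nonce)
        .collect()
}
```

This is why fetching the on-chain nonce each cycle is enough to detect progression: the set of confirmable transactions follows directly from the comparison.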
403-403: Verify `receipt.status()` behavior in the alloy library

It looks like `receipt.status()` comes from the external alloy crate (no local implementation in our codebase). Please confirm whether this method returns a plain `bool` (guaranteed) or an `Option<bool>` (which could be `None` for pending receipts). If it can be `None`, consider guarding against it:

File: `executors/src/eoa/eoa_confirmation_worker.rs`, line ~403

```diff
-            if receipt.status() {
+            if receipt.status().unwrap_or(false) {
```

This ensures that any `None` (e.g., pending) status is treated as `false`, avoiding a potential panic or unintended behavior.
aa-core/Cargo.toml

```toml
vault-types = { version = "0.1.0", git = "ssh://git@github.com/thirdweb-dev/vault.git", branch = "pb/update-alloy" }
vault-sdk = { version = "0.1.0", git = "ssh://git@github.com/thirdweb-dev/vault.git", branch = "pb/update-alloy" }
```

🛠️ Refactor suggestion

Pin a commit hash instead of using a moving branch

Depending on the `pb/update-alloy` branch for the `vault-*` crates introduces non-determinism: new commits to that branch may break reproducible builds. Prefer a specific commit or tag:

```diff
-vault-types = { version = "0.1.0", git = "...vault.git", branch = "pb/update-alloy" }
+vault-types = { git = "...vault.git", rev = "<commit-sha>" }
```

The same applies to `vault-sdk`.

📝 Committable suggestion

‼️ IMPORTANT: Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test the code to ensure it meets the requirements.

```toml
vault-types = { git = "ssh://git@github.com/thirdweb-dev/vault.git", rev = "<commit-sha>" }
vault-sdk = { git = "ssh://git@github.com/thirdweb-dev/vault.git", rev = "<commit-sha>" }
```

🤖 Prompt for AI Agents

In aa-core/Cargo.toml at lines 11 to 12, the dependencies for vault-types and vault-sdk use a moving branch reference "pb/update-alloy", which causes non-deterministic builds. Replace the branch specification with a fixed commit hash or a specific tag for both dependencies to ensure reproducible builds. Update the git dependency entries to include the exact commit hash instead of the branch name.
executors/src/eoa/error_classifier.rs

```rust
EoaExecutionError::AlreadyKnown { .. } => RecoveryStrategy {
    queue_confirmation: true,
    recycle_nonce: false,
    needs_resync: false,
    retryable: false,
```
Improve error handling for JSON operations.

The current implementation silently ignores JSON deserialization errors and could panic on serialization errors. Apply this diff to properly handle JSON errors:

```diff
-match health_json {
-    Some(json) => Ok(serde_json::from_str(&json).unwrap_or_default()),
-    None => Ok(EoaHealth::default()),
-}
+match health_json {
+    Some(json) => serde_json::from_str(&json)
+        .map_err(|e| NonceManagerError::NonceAssignmentFailed {
+            reason: format!("Failed to deserialize health data: {}", e),
+        }),
+    None => Ok(EoaHealth::default()),
+}
```

And for serialization:

```diff
-let health_json = serde_json::to_string(health).unwrap();
+let health_json = serde_json::to_string(health)
+    .map_err(|e| NonceManagerError::NonceAssignmentFailed {
+        reason: format!("Failed to serialize health data: {}", e),
+    })?;
```

Also applies to: 177-177
🤖 Prompt for AI Agents
In executors/src/eoa/error_classifier.rs at lines 159 to 163 and line 177, the
current code silently ignores JSON deserialization errors and may panic on
serialization errors. Modify the error handling logic to explicitly catch and
handle JSON deserialization errors by returning appropriate error results or
recovery strategies instead of ignoring them. Similarly, wrap serialization
operations in error handling to prevent panics, returning errors or fallback
values as needed. Ensure all JSON operations have proper error handling to avoid
silent failures or panics.
```rust
match code {
    -32000 => {
        // Only handle the specific ethereum errors we care about
        if msg_lower.contains("nonce too low") {
            EoaExecutionError::NonceTooLow {
                message: message.to_string(),
            }
        } else if msg_lower.contains("nonce too high") {
            EoaExecutionError::NonceTooHigh {
                message: message.to_string(),
            }
        } else if msg_lower.contains("already known") || msg_lower.contains("duplicate") {
            EoaExecutionError::AlreadyKnown {
                message: message.to_string(),
            }
        } else if msg_lower.contains("replacement") && msg_lower.contains("underpriced") {
            EoaExecutionError::ReplacementUnderpriced {
                message: message.to_string(),
            }
        } else if msg_lower.contains("insufficient funds") {
            EoaExecutionError::InsufficientFunds {
                message: message.to_string(),
            }
        } else if msg_lower.contains("gas") {
            EoaExecutionError::GasError {
                message: message.to_string(),
            }
        } else if msg_lower.contains("txpool") || msg_lower.contains("pool limit") {
            EoaExecutionError::PoolLimitExceeded {
                message: message.to_string(),
            }
        } else if msg_lower.contains("account") {
            EoaExecutionError::AccountError {
                message: message.to_string(),
            }
        } else {
```
🛠️ Refactor suggestion

Consider more robust error-message matching patterns.

The current string-matching approach has potential issues:

- The pattern "gas" (line 106) is too generic and could match unrelated errors
- The pattern "account" (line 114) could also be too broad
- There is no documentation about which RPC implementations these patterns target

Consider using more specific patterns and documenting the expected error formats:

```diff
 match code {
     -32000 => {
         // Only handle the specific ethereum errors we care about
+        // Error patterns based on go-ethereum, erigon, and common RPC implementations
         if msg_lower.contains("nonce too low") {
             EoaExecutionError::NonceTooLow {
                 message: message.to_string(),
             }
         // ... other conditions ...
-        } else if msg_lower.contains("gas") {
+        } else if msg_lower.contains("gas limit") || msg_lower.contains("gas price")
+            || msg_lower.contains("intrinsic gas") || msg_lower.contains("out of gas") {
             EoaExecutionError::GasError {
                 message: message.to_string(),
             }
         // ... other conditions ...
-        } else if msg_lower.contains("account") {
+        } else if msg_lower.contains("account not found") || msg_lower.contains("unknown account")
+            || msg_lower.contains("invalid account") {
             EoaExecutionError::AccountError {
                 message: message.to_string(),
             }
```
📝 Committable suggestion

```rust
match code {
    -32000 => {
        // Only handle the specific ethereum errors we care about
        // Error patterns based on go-ethereum, erigon, and common RPC implementations
        if msg_lower.contains("nonce too low") {
            EoaExecutionError::NonceTooLow {
                message: message.to_string(),
            }
        } else if msg_lower.contains("nonce too high") {
            EoaExecutionError::NonceTooHigh {
                message: message.to_string(),
            }
        } else if msg_lower.contains("already known") || msg_lower.contains("duplicate") {
            EoaExecutionError::AlreadyKnown {
                message: message.to_string(),
            }
        } else if msg_lower.contains("replacement") && msg_lower.contains("underpriced") {
            EoaExecutionError::ReplacementUnderpriced {
                message: message.to_string(),
            }
        } else if msg_lower.contains("insufficient funds") {
            EoaExecutionError::InsufficientFunds {
                message: message.to_string(),
            }
        } else if msg_lower.contains("gas limit") || msg_lower.contains("gas price")
            || msg_lower.contains("intrinsic gas") || msg_lower.contains("out of gas") {
            EoaExecutionError::GasError {
                message: message.to_string(),
            }
        } else if msg_lower.contains("txpool") || msg_lower.contains("pool limit") {
            EoaExecutionError::PoolLimitExceeded {
                message: message.to_string(),
            }
        } else if msg_lower.contains("account not found") || msg_lower.contains("unknown account")
            || msg_lower.contains("invalid account") {
            EoaExecutionError::AccountError {
                message: message.to_string(),
            }
        } else {
            // ...
        }
    }
}
```
🤖 Prompt for AI Agents
In executors/src/eoa/error_classifier.rs between lines 83 and 118, the current
error message matching uses overly broad substrings like "gas" and "account"
which can cause false positives. Refine these patterns to be more specific and
less likely to match unrelated errors, for example by including additional
context words or exact phrases. Also, add comments documenting which RPC error
message formats these patterns are intended to match to improve maintainability
and clarity.
twmq/tests/multilane_batch_pop.rs

```rust
impl Drop for MultilaneTestHarness {
    fn drop(&mut self) {
        // Cleanup in background since we can't await in drop
        let queue_id = self.queue_id.clone();
        let redis = self.queue.clone().redis.clone();

        tokio::spawn(async move {
            let mut conn = redis;
            let keys_pattern = format!("twmq_multilane:{}:*", queue_id);
            let keys: Vec<String> = redis::cmd("KEYS")
                .arg(&keys_pattern)
                .query_async(&mut conn)
                .await
                .unwrap_or_default();

            if !keys.is_empty() {
                redis::cmd("DEL")
                    .arg(keys)
                    .query_async::<()>(&mut conn)
                    .await
                    .unwrap_or_default();
            }
        });
    }
}
```
🛠️ Refactor suggestion

Improve cleanup reliability in the Drop implementation.

Spawning async cleanup in `Drop` has issues:

- Tasks might outlive the test runtime
- There is no guarantee cleanup completes before the next test
- It could cause test interference

Consider explicit cleanup in tests, or a test fixture that ensures cleanup:

```diff
 impl Drop for MultilaneTestHarness {
     fn drop(&mut self) {
-        // Cleanup in background since we can't await in drop
-        let queue_id = self.queue_id.clone();
-        let redis = self.queue.clone().redis.clone();
-
-        tokio::spawn(async move {
-            let mut conn = redis;
-            let keys_pattern = format!("twmq_multilane:{}:*", queue_id);
-            let keys: Vec<String> = redis::cmd("KEYS")
-                .arg(&keys_pattern)
-                .query_async(&mut conn)
-                .await
-                .unwrap_or_default();
-
-            if !keys.is_empty() {
-                redis::cmd("DEL")
-                    .arg(keys)
-                    .query_async::<()>(&mut conn)
-                    .await
-                    .unwrap_or_default();
-            }
-        });
+        // Log warning if cleanup wasn't called
+        if !self.cleaned_up {
+            eprintln!("Warning: MultilaneTestHarness dropped without cleanup for queue: {}", self.queue_id);
+        }
     }
 }
```

Then ensure each test calls cleanup:

```rust
#[tokio::test]
async fn test_example() {
    let harness = MultilaneTestHarness::new().await;
    // ... test logic ...
    harness.cleanup().await;
}
```
🤖 Prompt for AI Agents
In twmq/tests/multilane_batch_pop.rs around lines 160 to 184, the Drop
implementation spawns an async cleanup task that may outlive the test runtime
and cause interference. Remove the async cleanup from Drop and instead implement
an explicit async cleanup method on MultilaneTestHarness. Update tests to call
this cleanup method at the end to ensure reliable and complete resource cleanup
before test exit.
twmq/src/multilane.rs (Lua script)

```lua
while jobs_popped < batch_size and empty_lanes_count < total_lanes do
    local lane_id = lanes_with_scores[lane_index * 2 - 1]

    -- Skip if we've already marked this lane as empty
    if lane_id == nil then
        lane_index = lane_index + 1
        if lane_index > total_lanes then
            lane_index = 1
        end
    else
        local last_score = tonumber(lanes_with_scores[lane_index * 2])

        -- Only cleanup if not visited this batch (score != now)
        if last_score ~= now then
            cleanup_lane_leases(lane_id)
            process_delayed_jobs(lane_id)
            redis.call('ZADD', lanes_zset_name, now, lane_id)
            lanes_with_scores[lane_index * 2] = tostring(now)
        end

        -- Try to pop a job from this lane
        local job_result = pop_job_from_lane(lane_id)
        if job_result then
            table.insert(result_jobs, {lane_id, job_result[1], job_result[2], job_result[3], job_result[4], job_result[5], job_result[6]})
            jobs_popped = jobs_popped + 1
        else
            -- Lane is empty, mark it and count it
            lanes_with_scores[lane_index * 2 - 1] = nil
            lanes_with_scores[lane_index * 2] = nil
            empty_lanes_count = empty_lanes_count + 1

            -- Check if lane should be removed from Redis
            local lane_pending_list = 'twmq_multilane:' .. queue_id .. ':lane:' .. lane_id .. ':pending'
            local lane_delayed_zset = 'twmq_multilane:' .. queue_id .. ':lane:' .. lane_id .. ':delayed'
            local lane_active_hash = 'twmq_multilane:' .. queue_id .. ':lane:' .. lane_id .. ':active'

            local pending_count = redis.call('LLEN', lane_pending_list)
            local delayed_count = redis.call('ZCARD', lane_delayed_zset)
            local active_count = redis.call('HLEN', lane_active_hash)

            if pending_count == 0 and delayed_count == 0 and active_count == 0 then
                redis.call('ZREM', lanes_zset_name, lane_id)
            end
        end

        -- Move to next lane
        lane_index = lane_index + 1
        if lane_index > total_lanes then
            lane_index = 1
        end
    end
end
```
🛠️ Refactor suggestion

Improve lane tracking to avoid sparse arrays in the Lua script.

Setting array elements to `nil` creates sparse arrays, which can cause iteration issues in Lua. Track empty lanes separately instead of modifying the array:

```lua
local empty_lanes = {}  -- Track empty lanes

-- In the loop, instead of:
--   lanes_with_scores[lane_index * 2 - 1] = nil
--   lanes_with_scores[lane_index * 2] = nil
-- do:
empty_lanes[lane_id] = true

-- And check:
if empty_lanes[lane_id] then
    -- Skip this lane
else
    -- Process lane
end
```

This avoids sparse-array issues and makes the logic clearer.
🤖 Prompt for AI Agents
In twmq/src/multilane.rs around lines 690 to 742, the current code sets elements
of the lanes_with_scores array to nil to mark empty lanes, which creates sparse
arrays and can cause iteration problems in Lua. Instead, introduce a separate
table (e.g., empty_lanes) to track empty lanes by their lane_id. Replace the nil
assignments with adding lane_id to empty_lanes, and modify the lane skipping
logic to check if a lane_id is in empty_lanes before processing. This change
avoids sparse arrays and clarifies lane tracking.
executors/src/eoa/transaction_store.rs

```rust
pub async fn store_transaction_data(
    &self,
    transaction_data: &TransactionData,
) -> Result<(), TransactionStoreError> {
    let mut conn = self.redis.clone();
    let data_key = self.transaction_data_key(&transaction_data.transaction_id);
    let active_key =
        self.eoa_active_transactions_key(transaction_data.eoa, transaction_data.chain_id);

    let data_json = serde_json::to_string(transaction_data)?;

    // Store transaction data
    let _: () = conn.set(&data_key, data_json).await?;

    // Add to active transactions set
    let _: () = conn
        .sadd(&active_key, &transaction_data.transaction_id)
        .await?;

    Ok(())
}
```
🛠️ Refactor suggestion

Consider using Redis transactions for atomic operations.

The current implementation performs multiple Redis commands sequentially, which could lead to race conditions if multiple workers operate on the same transaction simultaneously. Consider using Redis transactions (MULTI/EXEC) or Lua scripts for atomicity:

```diff
 pub async fn store_transaction_data(
     &self,
     transaction_data: &TransactionData,
 ) -> Result<(), TransactionStoreError> {
     let mut conn = self.redis.clone();
     let data_key = self.transaction_data_key(&transaction_data.transaction_id);
     let active_key =
         self.eoa_active_transactions_key(transaction_data.eoa, transaction_data.chain_id);

     let data_json = serde_json::to_string(transaction_data)?;

-    // Store transaction data
-    let _: () = conn.set(&data_key, data_json).await?;
-
-    // Add to active transactions set
-    let _: () = conn
-        .sadd(&active_key, &transaction_data.transaction_id)
-        .await?;
+    // Use pipeline for atomic operations
+    let mut pipe = twmq::redis::pipe();
+    pipe.atomic()
+        .set(&data_key, data_json)
+        .sadd(&active_key, &transaction_data.transaction_id);
+    let _: () = pipe.query_async(&mut conn).await?;

     Ok(())
 }
```

Alternatively, expose only the pipeline methods and make the async methods private to enforce atomic usage.

Also applies to: 256-293
Also applies to: 256-293
🤖 Prompt for AI Agents
In executors/src/eoa/transaction_store.rs around lines 116 to 136, the current
method performs multiple Redis commands sequentially, risking race conditions.
Refactor the code to use Redis transactions by wrapping the set and sadd
commands within a MULTI/EXEC block or use a Lua script to ensure atomicity. This
change will guarantee that both operations succeed or fail together, preventing
inconsistent state. Also, consider making async methods private and exposing
only pipeline methods to enforce atomic usage. Apply similar changes to lines
256-293 as well.
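As an illustration of why the pipeline suggestion works (this is not the PR's code): redis-rs's `pipe().atomic()` brackets the queued commands in MULTI/EXEC, so Redis applies both writes as one transaction. The standalone sketch below only shows the resulting wire-level command sequence; the helper function and key names are made up for the example.

```rust
/// Hypothetical helper showing what an atomic pipeline sends on the wire:
/// the queued commands are bracketed by MULTI and EXEC, so Redis commits
/// them together or not at all.
fn atomic_wire_commands(commands: &[&str]) -> Vec<String> {
    let mut wire = Vec::with_capacity(commands.len() + 2);
    wire.push("MULTI".to_string());
    wire.extend(commands.iter().map(|c| c.to_string()));
    wire.push("EXEC".to_string());
    wire
}

fn main() {
    // Illustrative key names, not the store's real key schema.
    let wire = atomic_wire_commands(&[
        "SET eoa_tx_data:tx-1 <json>",
        "SADD eoa_active:1:0xabc tx-1",
    ]);
    assert_eq!(wire.first().map(String::as_str), Some("MULTI"));
    assert_eq!(wire.last().map(String::as_str), Some("EXEC"));
    println!("{wire:?}");
}
```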
```rust
let active_key =
    self.eoa_active_transactions_key(transaction_data.eoa, transaction_data.chain_id);

let data_json = serde_json::to_string(transaction_data).unwrap();
```
Replace `unwrap()` with proper error handling in pipeline methods.
Using `unwrap()` on serialization can cause panics if serialization fails. While unlikely with these simple structs, it's better to handle errors gracefully in production code.
Consider propagating the error by making these methods return `Result`:
```diff
 pub fn add_store_transaction_command(
     &self,
     pipeline: &mut Pipeline,
     transaction_data: &TransactionData,
-) {
+) -> Result<(), TransactionStoreError> {
     let data_key = self.transaction_data_key(&transaction_data.transaction_id);
     let active_key =
         self.eoa_active_transactions_key(transaction_data.eoa, transaction_data.chain_id);
-    let data_json = serde_json::to_string(transaction_data).unwrap();
+    let data_json = serde_json::to_string(transaction_data)?;
     pipeline.set(&data_key, data_json);
     pipeline.sadd(&active_key, &transaction_data.transaction_id);
+    Ok(())
 }
```
Apply similar changes to the other pipeline methods.
Also applies to: 385-385, 421-421
🤖 Prompt for AI Agents
In executors/src/eoa/transaction_store.rs at line 368, replace the use of
unwrap() on serde_json::to_string(transaction_data) with proper error handling
by changing the method to return a Result type and propagating the serialization
error using the ? operator. Apply the same pattern to lines 385 and 421,
modifying those methods to return Result and handle errors gracefully instead of
panicking.
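To make the suggested `Result`-returning shape concrete, here is a standalone sketch of the `?`-propagation pattern. None of these are the PR's real types: `to_json` stands in for `serde_json::to_string`, and the error enums are simplified illustrations.

```rust
// Hypothetical error types mirroring the suggested pattern: the method
// returns Result and `?` converts the serialization error into the store
// error through a From impl, instead of panicking via unwrap().
#[derive(Debug)]
struct SerializeError(String);

#[derive(Debug)]
enum TransactionStoreError {
    Serialization(String),
}

impl From<SerializeError> for TransactionStoreError {
    fn from(e: SerializeError) -> Self {
        TransactionStoreError::Serialization(e.0)
    }
}

// Stand-in for serde_json::to_string: fails on empty input for the demo.
fn to_json(data: &str) -> Result<String, SerializeError> {
    if data.is_empty() {
        Err(SerializeError("empty payload".into()))
    } else {
        Ok(format!("\"{data}\""))
    }
}

// The suggested shape: no unwrap(), errors flow to the caller via `?`.
fn add_store_transaction_command(data: &str) -> Result<String, TransactionStoreError> {
    let json = to_json(data)?;
    Ok(json)
}

fn main() {
    assert_eq!(add_store_transaction_command("tx-1").unwrap(), "\"tx-1\"");
    assert!(add_store_transaction_command("").is_err());
    println!("ok");
}
```

The same conversion applies to every pipeline method: once the `From` impl exists, each call site only changes `unwrap()` to `?`.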
```rust
/// Mark transaction as failed and clean up
pub async fn mark_transaction_failed(
    &self,
    transaction_id: &str,
    error_message: &str,
) -> Result<(), TransactionStoreError> {
    let mut conn = self.redis.clone();

    // Get transaction data to determine EOA and chain_id
    let tx_data = self
        .get_transaction_data(transaction_id)
        .await?
        .ok_or_else(|| TransactionStoreError::TransactionNotFound {
            transaction_id: transaction_id.to_string(),
        })?;

    let active_key = self.eoa_active_transactions_key(tx_data.eoa, tx_data.chain_id);
    let attempt_key = self.active_attempt_key(transaction_id);

    // Get current attempt to clean up nonce index
    if let Some(attempt) = self.get_active_attempt(transaction_id).await? {
        let nonce_key =
            self.nonce_to_transactions_key(tx_data.eoa, tx_data.chain_id, attempt.nonce);
        let _: () = conn.srem(&nonce_key, transaction_id).await?;
    }

    // Remove from active set
    let _: () = conn.srem(&active_key, transaction_id).await?;

    // Remove active attempt
    let _: () = conn.del(&attempt_key).await?;

    Ok(())
}
```
Store the error message when marking transaction as failed.
The `error_message` parameter is currently unused. Consider storing it for debugging and audit purposes.
You could store the error in a dedicated key or as part of a failure record:
```diff
 pub async fn mark_transaction_failed(
     &self,
     transaction_id: &str,
     error_message: &str,
 ) -> Result<(), TransactionStoreError> {
     let mut conn = self.redis.clone();
+    let failure_key = match &self.namespace {
+        Some(ns) => format!("{}:eoa_failure:{}", ns, transaction_id),
+        None => format!("eoa_failure:{}", transaction_id),
+    };
     // Get transaction data to determine EOA and chain_id
     let tx_data = self
         .get_transaction_data(transaction_id)
         .await?
         .ok_or_else(|| TransactionStoreError::TransactionNotFound {
             transaction_id: transaction_id.to_string(),
         })?;
     let active_key = self.eoa_active_transactions_key(tx_data.eoa, tx_data.chain_id);
     let attempt_key = self.active_attempt_key(transaction_id);
     // Get current attempt to clean up nonce index
     if let Some(attempt) = self.get_active_attempt(transaction_id).await? {
         let nonce_key =
             self.nonce_to_transactions_key(tx_data.eoa, tx_data.chain_id, attempt.nonce);
         let _: () = conn.srem(&nonce_key, transaction_id).await?;
     }
+    // Store failure reason with timestamp
+    let failure_data = serde_json::json!({
+        "error": error_message,
+        "failed_at": std::time::SystemTime::now()
+            .duration_since(std::time::UNIX_EPOCH)
+            .unwrap()
+            .as_secs(),
+    });
+    let _: () = conn.set(&failure_key, failure_data.to_string()).await?;
     // Remove from active set
     let _: () = conn.srem(&active_key, transaction_id).await?;
     // Remove active attempt
     let _: () = conn.del(&attempt_key).await?;
     Ok(())
 }
```
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```rust
/// Mark transaction as failed and clean up
pub async fn mark_transaction_failed(
    &self,
    transaction_id: &str,
    error_message: &str,
) -> Result<(), TransactionStoreError> {
    let mut conn = self.redis.clone();
    let failure_key = match &self.namespace {
        Some(ns) => format!("{}:eoa_failure:{}", ns, transaction_id),
        None => format!("eoa_failure:{}", transaction_id),
    };
    // Get transaction data to determine EOA and chain_id
    let tx_data = self
        .get_transaction_data(transaction_id)
        .await?
        .ok_or_else(|| TransactionStoreError::TransactionNotFound {
            transaction_id: transaction_id.to_string(),
        })?;
    let active_key = self.eoa_active_transactions_key(tx_data.eoa, tx_data.chain_id);
    let attempt_key = self.active_attempt_key(transaction_id);
    // Get current attempt to clean up nonce index
    if let Some(attempt) = self.get_active_attempt(transaction_id).await? {
        let nonce_key =
            self.nonce_to_transactions_key(tx_data.eoa, tx_data.chain_id, attempt.nonce);
        let _: () = conn.srem(&nonce_key, transaction_id).await?;
    }
    // Store failure reason with timestamp
    let failure_data = serde_json::json!({
        "error": error_message,
        "failed_at": std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_secs(),
    });
    let _: () = conn.set(&failure_key, failure_data.to_string()).await?;
    // Remove from active set
    let _: () = conn.srem(&active_key, transaction_id).await?;
    // Remove active attempt
    let _: () = conn.del(&attempt_key).await?;
    Ok(())
}
```
🤖 Prompt for AI Agents
In executors/src/eoa/transaction_store.rs around lines 295 to 328, the
error_message parameter in mark_transaction_failed is unused. To fix this, store
the error_message in Redis or your storage system as part of the transaction
failure record, for example by setting a dedicated key like
"transaction:{transaction_id}:error" with the error_message value. This will
help with debugging and auditing failed transactions.
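A standalone sketch of the failure record proposed above, using only the standard library (the review's diff uses `serde_json`). The key format and JSON shape follow the suggestion, but these functions are illustrative, not the store's actual API.

```rust
use std::time::{SystemTime, UNIX_EPOCH};

// Illustrative failure-record builder; the real code would serialize
// with serde_json rather than hand-formatting a string.
fn failure_record(error_message: &str) -> String {
    let failed_at = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|d| d.as_secs())
        .unwrap_or(0); // avoid panicking on a pre-epoch clock
    // Minimal escaping, sufficient for the illustration only.
    let escaped = error_message.replace('\\', "\\\\").replace('"', "\\\"");
    format!("{{\"error\":\"{escaped}\",\"failed_at\":{failed_at}}}")
}

// Key layout mirroring the suggested namespace handling.
fn failure_key(namespace: Option<&str>, transaction_id: &str) -> String {
    match namespace {
        Some(ns) => format!("{ns}:eoa_failure:{transaction_id}"),
        None => format!("eoa_failure:{transaction_id}"),
    }
}

fn main() {
    let key = failure_key(Some("prod"), "tx-42");
    assert_eq!(key, "prod:eoa_failure:tx-42");
    let record = failure_record("nonce too low");
    assert!(record.contains("\"error\":\"nonce too low\""));
    println!("{key} -> {record}");
}
```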
```rust
let requeue_job = self
    .send_queue
    .clone()
    .job(EoaSendJobData {
        transaction_id: tx_data.transaction_id.clone(),
        chain_id: tx_data.chain_id,
        from: tx_data.eoa,
        to: tx_data.to,
        value: tx_data.value,
        data: tx_data.data.into(),
        webhook_options: None, // TODO: Get from original job if needed
        assigned_nonce: None,  // Will get new nonce
        gas_limit: tx_data.gas_limit,
        signing_credential: Default::default(), // TODO: Get from original job
        rpc_credentials: job_data.rpc_credentials.clone(),
    })
    .with_id(&format!("{}_{}", transaction_id, attempt_number));

// Queue the job (this would normally be done in a pipeline in the actual hook)
tracing::info!(
    transaction_id = %transaction_id,
    queue_job_id = %format!("{}_{}", transaction_id, attempt_number),
    "Requeuing transaction after race loss"
);

Ok(())
```
Critical: Complete the requeue implementation with proper credentials and queueing.
The current implementation has several critical issues that will prevent requeued transactions from working:
- Missing signing credentials (line 560)
- Missing webhook options (line 557)
- Job is created but never actually queued (line 565 comment)
- Direct Redis access breaks encapsulation (lines 542-544)
The signing credentials are essential for transaction execution. You need to either:
- Store credentials with the original transaction data
- Pass them through the confirmation worker job data
- Retrieve them from a secure credential store
Here's a suggested approach:
```diff
+// Store signing credentials reference with transaction data
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct TransactionData {
+    pub transaction_id: String,
+    pub eoa: Address,
+    pub chain_id: u64,
+    pub to: Option<Address>,
+    pub value: U256,
+    pub data: Vec<u8>,
+    pub gas_limit: Option<u64>,
+    pub created_at: u64,
+    pub signing_credential_id: String, // Reference to credential
+    pub webhook_options: Option<WebhookOptions>,
+}

 async fn requeue_transaction(
     &self,
     job_data: &EoaConfirmationWorkerJobData,
     transaction_id: &str,
 ) -> Result<(), EoaConfirmationWorkerError> {
     // ... existing code ...
-    // Get current attempt number for new queue job ID
-    let mut conn = self.transaction_store.redis.clone();
-    let counter_key = self.transaction_store.attempt_counter_key(transaction_id);
-    let attempt_number: u32 = conn.get(&counter_key).await.unwrap_or(0);
+    // Use transaction store method to get attempt counter
+    let attempt_number = self.transaction_store
+        .get_attempt_counter(transaction_id)
+        .await
+        .unwrap_or(0);
     // Create new send job with incremented attempt
     let requeue_job = self
         .send_queue
         .clone()
         .job(EoaSendJobData {
             transaction_id: tx_data.transaction_id.clone(),
             chain_id: tx_data.chain_id,
             from: tx_data.eoa,
             to: tx_data.to,
             value: tx_data.value,
             data: tx_data.data.into(),
-            webhook_options: None, // TODO: Get from original job if needed
+            webhook_options: tx_data.webhook_options,
             assigned_nonce: None, // Will get new nonce
             gas_limit: tx_data.gas_limit,
-            signing_credential: Default::default(), // TODO: Get from original job
+            signing_credential: CredentialReference::from_id(tx_data.signing_credential_id),
             rpc_credentials: job_data.rpc_credentials.clone(),
         })
         .with_id(&format!("{}_{}", transaction_id, attempt_number));
-    // Queue the job (this would normally be done in a pipeline in the actual hook)
+    // Actually queue the job
+    requeue_job.push().await?;
     tracing::info!(
         transaction_id = %transaction_id,
         queue_job_id = %format!("{}_{}", transaction_id, attempt_number),
         "Requeuing transaction after race loss"
     );
     Ok(())
 }
```
Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In executors/src/eoa/eoa_confirmation_worker.rs around lines 547 to 572, the
requeue job creation is incomplete: signing credentials and webhook options are
missing, and the job is never actually queued. Fix this by retrieving and
including the original job's signing credentials and webhook options when
constructing EoaSendJobData, then enqueue the job properly using the
send_queue's enqueue or similar method instead of direct Redis access to
maintain encapsulation.
Actionable comments posted: 3
🧹 Nitpick comments (3)
executors/src/eoa/worker.rs (2)
806-819: Missing implementation for no-op transaction sending.
The `send_noop_transaction` method is not implemented, which could block the transaction queue when recycled nonces need to be consumed but no pending transactions exist. Would you like me to implement the no-op transaction logic or create an issue to track this TODO? A no-op transaction typically sends 0 ETH to the sender's own address with minimal gas.
511-514: Hard-coded delays should be configurable.
The 50ms delays between consecutive transactions are hard-coded. Different RPC providers have different rate limits, so this should be configurable.
Consider making the delay configurable:
```diff
 pub struct EoaExecutorWorker<CS>
 where
     CS: ChainService + Send + Sync + 'static,
 {
     pub chain_service: Arc<CS>,
     pub store: Arc<EoaExecutorStore>,
     pub eoa_signer: Arc<EoaSigner>,
     pub max_inflight: u64,
     pub max_recycled_nonces: u64,
+    pub nonce_send_delay_ms: u64, // Default: 50
 }
```
Then use it in the delay:
```diff
-sleep(Duration::from_millis(50)).await;
+sleep(Duration::from_millis(self.nonce_send_delay_ms)).await;
```
Also applies to: 628-630
executors/src/eoa/store.rs (1)
449-453: Unnecessary cloning of keys for closures.
The code clones multiple keys for use in the closures, which is inefficient and reduces readability. Since the helper method already handles the closure execution, you can capture the keys by reference:
```diff
-let recyled_key_for_validation = recycled_key.clone();
-let pending_key_for_validation = pending_key.clone();
-let transaction_id_for_validation = transaction_id.clone();
-let borrowed_key_for_validation = borrowed_key.clone();
 self.execute_with_watch_and_retry(
     eoa,
     chain_id,
     worker_id,
     &[recycled_key.clone(), pending_key.clone()],
     "pending->borrowed with recycled nonce",
-    async move |conn: &mut ConnectionManager| {
+    async |conn: &mut ConnectionManager| {
         // Validation phase - check preconditions
-        let nonce_score: Option<f64> = conn.zscore(recycled_key.clone(), nonce).await?;
+        let nonce_score: Option<f64> = conn.zscore(&recycled_key, nonce).await?;
         // ... rest of validation
     },
-    |pipeline: &mut Pipeline| {
-        pipeline.zrem(recyled_key_for_validation, nonce);
+    move |pipeline: &mut Pipeline| {
+        pipeline.zrem(&recycled_key, nonce);
         // ... rest of pipeline
     },
 )
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
- Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (5)
- executors/Cargo.toml (1 hunks)
- executors/src/eoa/mod.rs (1 hunks)
- executors/src/eoa/store.rs (1 hunks)
- executors/src/eoa/worker.rs (1 hunks)
- twmq/src/job.rs (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
- executors/Cargo.toml
- executors/src/eoa/mod.rs
🧰 Additional context used
🧬 Code Graph Analysis (1)
executors/src/eoa/worker.rs (3)
- executors/src/eoa/store.rs (5): eoa (1766-1768), chain_id (1771-1773), build (1470-1495), nonce (231-236), new (95-97)
- twmq/src/job.rs (3): data (168-170), new (159-161), new (200-206)
- executors/src/eoa/error_classifier.rs (1): message (224-236)
⏰ Context from checks skipped due to timeout of 90000ms (2)
- GitHub Check: test
- GitHub Check: coverage
🔇 Additional comments (4)
twmq/src/job.rs (1)
59-74: Good trait generalization for error type flexibility!
The addition of the `ErrorType: Into<E>` bound allows the trait to work with any error type that can be converted into the target error type `E`. This is a backward-compatible change that improves the trait's flexibility without breaking existing usage.
executors/src/eoa/worker.rs (1)
922-950: Well-structured gas estimation with proper error handling.
The gas estimation logic correctly:
- Adds a 10% buffer to prevent out-of-gas errors
- Distinguishes between transaction reverts (which should fail the transaction) and RPC errors (which should retry the worker)
- Provides detailed error messages including revert data
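As a side note, the 10% buffer praised above is simple to compute overflow-safely; this sketch is illustrative, not the PR's actual arithmetic or function name.

```rust
// Sketch of a 10% gas buffer. Widening to u128 for the intermediate
// product avoids overflow on very large estimates.
fn with_gas_buffer(estimate: u64) -> u64 {
    ((estimate as u128 * 110) / 100) as u64
}

fn main() {
    assert_eq!(with_gas_buffer(21_000), 23_100);
    assert_eq!(with_gas_buffer(1_000_000), 1_100_000);
    println!("ok");
}
```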
executors/src/eoa/store.rs (2)
363-364: Well-implemented exponential backoff for retry logic.
The retry mechanism with exponential backoff (capped at 640ms) and 10 max retries provides a good balance between responsiveness and avoiding overwhelming Redis during high contention.
Also applies to: 1441-1442
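The backoff policy described here can be sketched as a pure function. The 640ms cap and 10-retry limit come from the review text; the 10ms base is an assumption for illustration and may differ from the store's real constant.

```rust
use std::time::Duration;

// Capped exponential backoff: delays double from an assumed 10ms base
// and are clamped at 640ms, with at most 10 attempts before giving up.
fn retry_delay(attempt: u32) -> Option<Duration> {
    const MAX_RETRIES: u32 = 10;
    const BASE_MS: u64 = 10;
    const CAP_MS: u64 = 640;
    if attempt >= MAX_RETRIES {
        return None; // give up after the retry budget is spent
    }
    let ms = BASE_MS.saturating_mul(1u64 << attempt.min(63)).min(CAP_MS);
    Some(Duration::from_millis(ms))
}

fn main() {
    assert_eq!(retry_delay(0), Some(Duration::from_millis(10)));
    assert_eq!(retry_delay(3), Some(Duration::from_millis(80)));
    // From attempt 6 on, the cap applies: 10 * 2^6 = 640.
    assert_eq!(retry_delay(6), Some(Duration::from_millis(640)));
    assert_eq!(retry_delay(10), None);
    println!("ok");
}
```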
1458-1495: Excellent scoped store pattern for API ergonomics and safety.
The `ScopedEoaExecutorStore` wrapper:
- Validates lock ownership on construction, preventing unauthorized operations
- Significantly reduces boilerplate by binding common parameters
- Provides a cleaner, more ergonomic API
This is a great pattern for ensuring safety while improving developer experience.
```rust
fn classify_send_error(error: &RpcError<TransportErrorKind>) -> SendResult {
    let error_str = error.to_string().to_lowercase();

    // Transaction possibly made it to mempool
    if error_str.contains("nonce too low")
        || error_str.contains("already known")
        || error_str.contains("replacement transaction underpriced")
    {
        return SendResult::PossiblySent;
    }

    // Clear failures that didn't consume nonce
    if error_str.contains("invalid signature")
        || error_str.contains("malformed")
        || error_str.contains("insufficient funds")
        || error_str.contains("gas limit")
        || error_str.contains("intrinsic gas too low")
    {
        return SendResult::DeterministicFailure;
    }

    // Default: assume possibly sent for safety
    SendResult::PossiblySent
}
```
Error classification relies on fragile string matching.
The `classify_send_error` function uses string matching on error messages, which is unreliable because:
- Error messages can vary between RPC providers
- Message formats may change over time
- Localized error messages won't match
This could lead to misclassification and incorrect transaction handling (e.g., treating a sent transaction as failed or vice versa).
Consider using structured error types or error codes from the RPC response instead:
```diff
-fn classify_send_error(error: &RpcError<TransportErrorKind>) -> SendResult {
-    let error_str = error.to_string().to_lowercase();
-
-    // Transaction possibly made it to mempool
-    if error_str.contains("nonce too low")
-        || error_str.contains("already known")
-        || error_str.contains("replacement transaction underpriced")
-    {
-        return SendResult::PossiblySent;
-    }
-    // ... rest of the function
+fn classify_send_error(error: &RpcError<TransportErrorKind>) -> SendResult {
+    // Use the error classifier from error_classifier.rs which handles structured errors
+    use crate::eoa::error_classifier::EoaErrorMapper;
+
+    match EoaErrorMapper::classify_error(error) {
+        Some(EoaExecutionError::NonceTooLow { .. })
+        | Some(EoaExecutionError::AlreadyKnown { .. })
+        | Some(EoaExecutionError::ReplacementUnderpriced { .. }) => SendResult::PossiblySent,
+
+        Some(EoaExecutionError::InsufficientFunds { .. })
+        | Some(EoaExecutionError::GasError { .. }) => SendResult::DeterministicFailure,
+
+        _ => SendResult::PossiblySent, // Default to safe assumption
+    }
+}
```
Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In executors/src/eoa/worker.rs between lines 102 and 125, the
classify_send_error function currently relies on fragile string matching of
error messages, which is unreliable due to variations in RPC provider messages
and localization. To fix this, refactor the function to parse and use structured
error types or standardized error codes from the RPC response instead of string
matching. This may involve updating the RpcError handling to extract error codes
or enums and using those for classification to ensure more robust and accurate
error handling.
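A standalone sketch of what code-based classification could look like. EIP-1474 reserves -32000..-32099 for server errors, but concrete codes for conditions like "nonce too low" are provider-specific, so real code would still need a per-provider table; the codes below are purely illustrative assumptions.

```rust
#[derive(Debug, PartialEq)]
enum SendResult {
    PossiblySent,
    DeterministicFailure,
}

// Hypothetical mapping from JSON-RPC error codes to a send outcome.
// The specific codes here are invented for the example.
fn classify_by_code(code: i64) -> SendResult {
    match code {
        // Assumed provider codes for nonce/known-tx conditions.
        -32003 | -32010 => SendResult::PossiblySent,
        // Assumed codes for malformed tx / invalid params.
        -32602 | -32000 => SendResult::DeterministicFailure,
        // Default to the safe assumption, as the original code does.
        _ => SendResult::PossiblySent,
    }
}

fn main() {
    assert_eq!(classify_by_code(-32003), SendResult::PossiblySent);
    assert_eq!(classify_by_code(-32602), SendResult::DeterministicFailure);
    assert_eq!(classify_by_code(12345), SendResult::PossiblySent);
    println!("ok");
}
```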
```rust
self.store
    .acquire_eoa_lock_aggressively(data.eoa_address, data.chain_id, &data.worker_id)
    .await
    .map_err_nack(Some(Duration::from_secs(10)), RequeuePosition::Last)?;
```
💡 Verification agent
🧩 Analysis chain
Potential race condition with aggressive lock acquisition.
The `acquire_eoa_lock_aggressively` method forcefully takes over locks without coordination. If multiple workers simultaneously attempt to take over the same EOA lock, it could lead to race conditions where workers overwrite each other's lock ownership.
Consider implementing a more coordinated lock acquisition strategy:
- Add a grace period before forceful takeover
- Use atomic compare-and-swap operations
- Implement lock heartbeats to detect truly stalled workers
🏁 Script executed:
```shell
#!/bin/bash
# Check if there are any lock coordination mechanisms in the store implementation
ast-grep --pattern 'fn acquire_eoa_lock_aggressively($$$) {
  $$$
}'
```
Length of output: 70
🏁 Script executed:
```shell
#!/bin/bash
# Locate both uses and the definition of acquire_eoa_lock_aggressively
rg --color=never -n -C 5 'acquire_eoa_lock_aggressively' || echo "No matches found"
```
Length of output: 1544
🏁 Script executed:
```shell
#!/bin/bash
# Display the implementation of acquire_eoa_lock_aggressively to check for coordination logic
sed -n '260,360p' executors/src/eoa/store.rs
```
Length of output: 3713
Safer EOA Lock Acquisition Needed
We've verified that in executors/src/eoa/store.rs (lines 269–287) the `acquire_eoa_lock_aggressively` method does:
- `SETNX` to grab the lock
- Fallback to plain `SET` (no expiry) to force takeover
Consider strengthening the takeover logic by:
- Adding an expiry on initial lock (e.g. Redis
SET key value NX PX <ttl>
) - Renewing the TTL via heartbeats instead of unconditional force-set
- Employing an atomic compare-and-swap or Redis
WATCH
/MULTI
pipeline (or Redlock) - Introducing a grace/backoff period before force takeover
Pinpoint locations:
- executors/src/eoa/store.rs:269–287 (
acquire_eoa_lock_aggressively
) - executors/src/eoa/worker.rs:172–175 (usage site)
🤖 Prompt for AI Agents
In executors/src/eoa/worker.rs around lines 172 to 175, the current usage of
acquire_eoa_lock_aggressively calls a method that uses SETNX and a fallback
plain SET without expiry, risking stale locks and unsafe concurrent overrides.
To fix this, update the acquire_eoa_lock_aggressively method in
executors/src/eoa/store.rs (lines 269–287) to set the lock with an expiry (e.g.,
using Redis SET with NX and PX options), implement a heartbeat mechanism to
renew the TTL instead of force-setting the lock, and use atomic operations like
compare-and-swap or Redis WATCH/MULTI to safely coordinate lock takeover,
possibly adding a grace or backoff period before forcing a takeover. Adjust the
worker.rs usage if needed to handle the updated locking behavior properly.
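The grace-period policy suggested in this prompt can be sketched as a pure decision function. All names and thresholds below are illustrative assumptions, not the store's actual implementation.

```rust
use std::time::Duration;

// Illustrative lock-takeover policy: acquire fresh locks with
// SET NX PX semantics, back off while the holder looks alive, and
// only force-take once the holder is quiet past TTL plus a grace window.
#[derive(Debug, PartialEq)]
enum LockAction {
    AcquireFresh,  // no current holder: SET key val NX PX <ttl>
    WaitAndRetry,  // holder is live: back off and retry
    ForceTakeover, // holder stale beyond ttl + grace: safe to replace
}

fn decide(holder_age: Option<Duration>, ttl: Duration, grace: Duration) -> LockAction {
    match holder_age {
        None => LockAction::AcquireFresh,
        Some(age) if age > ttl + grace => LockAction::ForceTakeover,
        Some(_) => LockAction::WaitAndRetry,
    }
}

fn main() {
    let ttl = Duration::from_secs(300);
    let grace = Duration::from_secs(30);
    assert_eq!(decide(None, ttl, grace), LockAction::AcquireFresh);
    assert_eq!(decide(Some(Duration::from_secs(60)), ttl, grace), LockAction::WaitAndRetry);
    assert_eq!(decide(Some(Duration::from_secs(400)), ttl, grace), LockAction::ForceTakeover);
    println!("ok");
}
```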
have you considered that every redis state mutation runs with a watch on this key to ensure that we have the lock, and we exit if we lose the lock?
🧩 Analysis chain
You're absolutely right! Let me examine the store implementation to see this coordination mechanism.
@d4mr Thank you for that clarification! You're absolutely correct. Looking at the store implementation, I can see that there's a robust lock coordination mechanism in place:
- WATCH-based transactions: Every Redis state mutation uses `execute_with_watch_and_retry`, which employs Redis WATCH/MULTI/EXEC operations
- Lock validation: The `with_lock_check` method validates lock ownership before every operation
- Automatic retry: If the lock is lost during a transaction (detected via WATCH failure), the operation is retried with exponential backoff
- Graceful exit: If lock ownership can't be validated, the worker exits cleanly
This design ensures that even if multiple workers attempt aggressive lock acquisition, only one can successfully perform state mutations at a time. The WATCH mechanism provides the coordination I was concerned about.
🏁 Script executed:
```shell
#!/bin/bash
# Search for WATCH operations in the store implementation
rg -A 10 -B 2 "WATCH" executors/src/eoa/store.rs
# Also look for lock validation patterns
rg -A 5 -B 2 "with_lock_check\|execute_with_watch_and_retry" executors/src/eoa/store.rs
```
Length of output: 11719
@d4mr You're absolutely correct! The store implementation shows extensive WATCH-based coordination:
Lock Protection Mechanisms Found:

1. Universal WATCH operations: Every state mutation watches the lock key:

```rust
let _: () = twmq::redis::cmd("WATCH")
    .arg(&lock_key)
    .query_async(&mut conn)
    .await?;
```

2. Lock ownership validation: Before every operation:

```rust
let current_owner: Option<String> = conn.get(&lock_key).await?;
if current_owner.as_deref() != Some(worker_id) {
    return Err(TransactionStoreError::LockLost { ... });
}
```

3. WATCH failure handling: When transactions fail due to WATCH, the code distinguishes between lock loss vs. state changes:

```rust
let still_own_lock: Option<String> = conn.get(&lock_key).await?;
if still_own_lock.as_deref() != Some(worker_id) {
    return Err(TransactionStoreError::LockLost { ... });
}
// Otherwise retry
```

4. Multiple coordination helpers:
- `execute_with_watch_and_retry` - centralized WATCH/retry logic
- `with_atomic_operation` - generic atomic operations
- `with_lock_check` - lock validation wrapper

This design ensures that even with aggressive lock acquisition, only the lock holder can successfully mutate state. The WATCH mechanism provides perfect coordination - my initial concern about race conditions was unfounded given this robust protection layer.
✏️ Learnings added
Learnt from: d4mr
PR: thirdweb-dev/engine-core#5
File: executors/src/eoa/worker.rs:173-176
Timestamp: 2025-07-06T15:44:13.681Z
Learning: In the EOA executor system, aggressive lock acquisition is safe because every Redis state mutation uses WATCH operations on the lock key. If the lock is lost during a transaction, the WATCH causes the transaction to fail and the worker exits gracefully. This provides coordination between workers even when using forceful lock takeover.
Learnt from: d4mr
PR: thirdweb-dev/engine-core#5
File: executors/src/eoa/worker.rs:173-176
Timestamp: 2025-07-06T15:44:13.681Z
Learning: The EOA executor store uses comprehensive WATCH-based coordination where every Redis state mutation watches the lock key and validates ownership before proceeding. If the lock is lost during any operation, the transaction fails with LockLost error. This makes aggressive lock acquisition safe because only the actual lock holder can successfully perform state mutations, regardless of who claims the lock.
Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.
Actionable comments posted: 0
♻️ Duplicate comments (2)
executors/src/eoa/worker.rs (2)
103-126: Replace fragile string matching with structured error classification.
The current implementation still uses string matching on error messages, which remains fragile as noted in previous reviews. Error messages can vary between RPC providers, change over time, and may be localized.
Use the error classifier from the eoa module instead:
```diff
+use crate::eoa::error_classifier::EoaErrorMapper;
 fn classify_send_error(error: &RpcError<TransportErrorKind>) -> SendResult {
-    let error_str = error.to_string().to_lowercase();
-
-    // Transaction possibly made it to mempool
-    if error_str.contains("nonce too low")
-        || error_str.contains("already known")
-        || error_str.contains("replacement transaction underpriced")
-    {
-        return SendResult::PossiblySent;
-    }
-    // ... rest of string matching
+    match EoaErrorMapper::classify_error(error) {
+        Some(err) if err.is_possibly_sent() => SendResult::PossiblySent,
+        Some(err) if err.is_deterministic_failure() => SendResult::DeterministicFailure,
+        _ => SendResult::PossiblySent, // Default to safe assumption
+    }
 }
```
172-176: Implement safer lock acquisition with coordination.
The aggressive lock acquisition remains problematic as noted in previous reviews. Multiple workers can simultaneously force takeover, leading to race conditions and lost work.
Consider implementing these improvements in the store's `acquire_eoa_lock_aggressively` method:
- Add TTL to locks (e.g., `SET key value NX PX <ttl>`)
- Implement heartbeat mechanism for lock renewal
- Use atomic compare-and-swap or WATCH/MULTI for safe takeover
- Add grace period before forcing takeover
```diff
 // 1. ACQUIRE LOCK AGGRESSIVELY
 tracing::info!("Acquiring EOA lock aggressively");
-self.store
-    .acquire_eoa_lock_aggressively(data.eoa_address, data.chain_id, &data.worker_id)
-    .await
-    .map_err_nack(Some(Duration::from_secs(10)), RequeuePosition::Last)?;
+match self.store
+    .try_acquire_eoa_lock_with_timeout(
+        data.eoa_address,
+        data.chain_id,
+        &data.worker_id,
+        Duration::from_secs(300), // 5 minute TTL
+    )
+    .await
+{
+    Ok(()) => {}
+    Err(TransactionStoreError::LockContended { .. }) => {
+        // Wait and retry with backoff
+        return Err(EoaExecutorWorkerError::WorkRemaining {
+            message: "Lock contended, will retry".to_string(),
+        })
+        .map_err_nack(Some(Duration::from_secs(30)), RequeuePosition::Last);
+    }
+    Err(e) => {
+        return Err(e).map_err_nack(Some(Duration::from_secs(10)), RequeuePosition::Last)
+    }
+}
```
🧹 Nitpick comments (4)
executors/src/eoa/store.rs (2)
539-632: Consider extracting lock validation logic to improve readability.
The helper method is well-designed and effectively centralizes atomic operation logic. However, it's quite complex. Consider extracting the lock validation logic into a separate method to improve readability.
```diff
+async fn validate_lock_ownership(
+    &self,
+    conn: &mut ConnectionManager,
+    lock_key: &str,
+    worker_id: &str,
+    eoa: Address,
+    chain_id: u64,
+) -> Result<(), TransactionStoreError> {
+    let current_owner: Option<String> = conn.get(lock_key).await?;
+    if current_owner.as_deref() != Some(worker_id) {
+        return Err(TransactionStoreError::LockLost {
+            eoa,
+            chain_id,
+            worker_id: worker_id.to_string(),
+        });
+    }
+    Ok(())
+}

 async fn execute_with_watch_and_retry(
     &self,
     eoa: Address,
     chain_id: u64,
     worker_id: &str,
     safe_tx: &impl SafeRedisTransaction,
 ) -> Result<(), TransactionStoreError> {
     // ... existing code ...
-    // Check lock ownership
-    let current_owner: Option<String> = conn.get(&lock_key).await?;
-    if current_owner.as_deref() != Some(worker_id) {
-        let _: () = twmq::redis::cmd("UNWATCH").query_async(&mut conn).await?;
-        return Err(TransactionStoreError::LockLost {
-            eoa,
-            chain_id,
-            worker_id: worker_id.to_string(),
-        });
-    }
+    // Check lock ownership
+    if let Err(e) = self
+        .validate_lock_ownership(&mut conn, &lock_key, worker_id, eoa, chain_id)
+        .await
+    {
+        let _: () = twmq::redis::cmd("UNWATCH").query_async(&mut conn).await?;
+        return Err(e);
+    }
```
1002-1102: Consider extracting hash validation logic for better maintainability.

This method is necessarily complex because hash data must be passed between the validation and pipeline phases. The current implementation is correct, but extracting the hash validation logic would improve readability.
```diff
+async fn find_transaction_hashes(
+    &self,
+    conn: &mut ConnectionManager,
+    submitted_key: &str,
+    transaction_id: &str,
+) -> Result<Vec<String>, TransactionStoreError> {
+    let all_hashes: Vec<String> = conn.zrange(submitted_key, 0, -1).await?;
+    let mut transaction_hashes = Vec::new();
+
+    for hash in all_hashes {
+        if let Some(tx_id) = self.get_transaction_id_for_hash(&hash).await? {
+            if tx_id == transaction_id {
+                transaction_hashes.push(hash);
+            }
+        }
+    }
+
+    if transaction_hashes.is_empty() {
+        return Err(TransactionStoreError::TransactionNotInSubmittedState {
+            transaction_id: transaction_id.to_string(),
+        });
+    }
+
+    Ok(transaction_hashes)
+}
```

executors/src/eoa/worker.rs (2)
591-645: Good parallel-to-sequential pattern with comprehensive error handling.

The implementation correctly handles the complex flow of building transactions in parallel and then moving them to borrowed state sequentially to maintain nonce ordering. The error handling for nonce conflicts and missing transactions is thorough.
Consider extracting the error handling logic into a helper method to reduce nesting:
```diff
+fn handle_move_to_borrowed_error(
+    &self,
+    error: TransactionStoreError,
+    transaction_id: &str,
+) -> Result<bool, ()> {
+    match error {
+        TransactionStoreError::OptimisticNonceChanged { .. } => {
+            tracing::debug!("Nonce changed for transaction {}, skipping", transaction_id);
+            Ok(false) // Stop processing
+        }
+        TransactionStoreError::TransactionNotInPendingQueue { .. } => {
+            tracing::debug!("Transaction {} already processed, skipping", transaction_id);
+            Err(()) // Continue with next
+        }
+        e => {
+            tracing::error!("Failed to move transaction {} to borrowed: {}", transaction_id, e);
+            Err(()) // Continue with next
+        }
+    }
+}
```
828-842: Complete the no-op transaction implementation for recycled nonces.

The missing no-op transaction implementation could leave recycled nonces unconsumed, potentially leading to nonce gaps or inefficient recycling.
This missing functionality affects the recycling system. The implementation needs to:
- Create a minimal transaction (e.g., self-transfer of 0 value)
- Handle signing credentials appropriately
- Implement atomic nonce consumption from recycled set
Would you like me to generate a complete implementation for this method?
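As a starting point, the shape of such a no-op could look like the sketch below: a zero-value self-transfer that consumes the lowest recycled nonce first. The `NoopTx` struct and `build_noop` helper are hypothetical stand-ins, not the crate's actual transaction types, and signing/broadcast are deliberately omitted.

```rust
use std::collections::BTreeSet;

/// Hypothetical minimal stand-in for a nonce-consuming no-op transaction.
/// The real implementation would build and sign a chain-native transaction.
#[derive(Debug, PartialEq)]
struct NoopTx {
    from: String,
    to: String, // same as `from`: a self-transfer
    value: u64, // always 0
    nonce: u64,
}

/// Pop the lowest recycled nonce (if any) and build the no-op that burns it.
/// Consuming lowest-first keeps the consumed range contiguous and avoids
/// leaving gaps below nonces that are still sitting in the recycled set.
fn build_noop(eoa: &str, recycled: &mut BTreeSet<u64>) -> Option<NoopTx> {
    let nonce = *recycled.iter().next()?; // BTreeSet iterates in ascending order
    recycled.remove(&nonce);
    Some(NoopTx {
        from: eoa.to_string(),
        to: eoa.to_string(),
        value: 0,
        nonce,
    })
}

fn main() {
    let mut recycled: BTreeSet<u64> = [9, 5, 7].into_iter().collect();
    let tx = build_noop("0xabc", &mut recycled).expect("nonce available");
    assert_eq!(tx.nonce, 5); // lowest recycled nonce is consumed first
    assert_eq!(tx.from, tx.to);
    assert_eq!(recycled.len(), 2);
    assert!(build_noop("0xabc", &mut BTreeSet::new()).is_none());
    println!("no-op sketch ok");
}
```

In the real store, removing the nonce from the recycled set and recording the no-op would need to happen atomically (e.g., inside the same WATCH/MULTI block as the other state transitions).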
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
- executors/src/eoa/store.rs (1 hunks)
- executors/src/eoa/worker.rs (1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (2)
- GitHub Check: coverage
- GitHub Check: test
🔇 Additional comments (3)
executors/src/eoa/store.rs (2)
14-22: Well-designed trait for atomic Redis operations.

The `SafeRedisTransaction` trait provides an excellent abstraction for atomic Redis operations, properly separating validation, operation building, and watch-key management. The async trait syntax is correct for stable Rust.
1565-1602: Excellent ergonomic wrapper with proper validation.

The `ScopedEoaExecutorStore` is a well-designed wrapper that eliminates repetitive parameter passing while ensuring lock-ownership validation. The builder pattern with validation is a good approach.

executors/src/eoa/worker.rs (1)
286-373: Well-implemented crash recovery with proper stale-transaction handling.

The crash recovery logic is comprehensive and handles edge cases well:
- Proper sorting by nonce for ordering
- Detection and logging of stale transactions
- Appropriate error classification for recovery decisions
- Good error handling and logging
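The sort-and-staleness step can be reduced to a pure function for illustration. The `BorrowedTx` type and helper name below are assumptions; the real code operates on the store's borrowed state and then rebroadcasts the viable transactions.

```rust
/// Simplified stand-in for a borrowed (signed but possibly unbroadcast) transaction.
#[derive(Debug, Clone, PartialEq)]
struct BorrowedTx {
    id: String,
    nonce: u64,
}

/// Sort borrowed transactions by nonce and split off the stale ones:
/// anything with a nonce below the current on-chain nonce can never mine
/// and should not be rebroadcast.
fn partition_for_rebroadcast(
    mut borrowed: Vec<BorrowedTx>,
    onchain_nonce: u64,
) -> (Vec<BorrowedTx>, Vec<BorrowedTx>) {
    borrowed.sort_by_key(|tx| tx.nonce);
    let split = borrowed.partition_point(|tx| tx.nonce < onchain_nonce);
    let viable = borrowed.split_off(split);
    (viable, borrowed) // (rebroadcast in nonce order, stale)
}

fn main() {
    let txs = vec![
        BorrowedTx { id: "c".into(), nonce: 12 },
        BorrowedTx { id: "a".into(), nonce: 9 },
        BorrowedTx { id: "b".into(), nonce: 11 },
    ];
    // On-chain nonce is 11, so nonce 9 is stale; 11 and 12 are still viable.
    let (viable, stale) = partition_for_rebroadcast(txs, 11);
    assert_eq!(stale.iter().map(|t| t.nonce).collect::<Vec<_>>(), vec![9]);
    assert_eq!(viable.iter().map(|t| t.nonce).collect::<Vec<_>>(), vec![11, 12]);
    println!("recovery partition ok");
}
```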
Summary by CodeRabbit

- New Features
- Bug Fixes
- Refactor
- Tests
- Chores
Chores