Skip to content

EOA execution #5

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open

EOA execution #5

wants to merge 6 commits into from

Conversation

d4mr
Copy link
Member

@d4mr d4mr commented Jul 3, 2025

Summary by CodeRabbit

  • New Features

    • Introduced a robust multilane job queue system with Redis-backed storage, supporting fair and concurrent processing across lanes, lifecycle management, cancellation, and batch operations.
    • Added a comprehensive executor for externally owned account (EOA) transactions, including error classification, transaction store, and worker logic for Ethereum-like environments.
    • Exposed new modules and types for multilane queues and EOA execution in public APIs.
  • Bug Fixes

    • Improved error handling and recovery strategies for EOA transaction execution.
  • Refactor

    • Generalized worker and shutdown handle types for broader compatibility.
    • Enhanced trait implementations for error conversion in job results.
  • Tests

    • Added extensive integration tests for multilane queue batch operations, ensuring fairness and scalability across large numbers of lanes.
  • Chores

    • Updated CI dependencies and reformatted configuration files for clarity.
    • Adjusted dependency branches for internal crates.

Copy link

coderabbitai bot commented Jul 3, 2025

Walkthrough

This change introduces a robust multilane queue system in the twmq crate with Redis-backed atomic operations, fair job distribution, and comprehensive batch pop functionality. It adds a new EOA (Externally Owned Account) transaction executor in the executors crate, including error classification, a Redis transaction store, and an asynchronous worker for transaction lifecycle management. Updates are made to queue worker and shutdown abstractions, and new integration tests are provided for multilane batch popping.

Changes

File(s) / Group Change Summary
.github/workflows/ci-twmq.yaml,
.github/workflows/coverage-twmq.yaml
Updated taiki-e/install-action to a newer commit/version in GitHub Actions workflows.
Cargo.toml Reformatted [workspace] members to multi-line array; no content change.
aa-core/Cargo.toml,
core/Cargo.toml
Updated branch reference for vault-types and vault-sdk dependencies from main to pb/update-alloy.
aa-core/src/userop/builder.rs Removed unused import and trailing whitespace.
core/src/signer.rs Added async sign_transaction method to AccountSigner trait and implemented for EoaSigner. Handles Vault and IAW credentials.
executors/Cargo.toml Added dependencies: tokio (with "full" feature) and futures.
executors/src/eoa/error_classifier.rs New module: error classification for EOA transaction execution, with enums, mapping logic, and recovery strategies.
executors/src/eoa/mod.rs New module: public submodules error_classifier, store, worker; re-exports error types and strategies.
executors/src/eoa/store.rs New file: Redis-backed transaction store for EOAs, with atomic operations, nonce management, and concurrency control.
executors/src/eoa/worker.rs New file: EOA transaction worker implementing job processing, crash recovery, confirmation, and sending logic.
executors/src/lib.rs Declared new public module eoa; reordered webhook module declaration.
twmq/src/job.rs Generalized ToJobResult trait for Result, allowing error conversion via Into<E>. Minor whitespace cleanup.
twmq/src/lib.rs Added public multilane module and re-exports. Changed Queue::work return type from WorkerHandle<H> to WorkerHandle<Queue<H>>.
twmq/src/multilane.rs New file: implements MultilaneQueue with fair, concurrent, and atomic job management across lanes using Redis and Tokio.
twmq/src/shutdown.rs Generalized WorkerHandle and ShutdownHandle for any queue type via a new QueueIdentifier trait. Updated logging and method signatures accordingly.
twmq/tests/multilane_batch_pop.rs New integration tests: batch pop, fairness, and performance for multilane queues with up to 100,000 lanes.

Sequence Diagram(s)

Multilane Job Batch Pop and Processing

sequenceDiagram
    participant Client
    participant MultilaneQueue
    participant Redis
    participant Worker
    participant Handler

    Client->>MultilaneQueue: push_to_lane(job, lane_id)
    MultilaneQueue->>Redis: Store job data and update lane sets
    Redis-->>MultilaneQueue: OK

    Worker->>MultilaneQueue: pop_batch_jobs(batch_size)
    MultilaneQueue->>Redis: Lua script to atomically pop jobs from lanes
    Redis-->>MultilaneQueue: List of jobs (by lane)
    MultilaneQueue-->>Worker: BorrowedJob(s)

    loop For each BorrowedJob
        Worker->>Handler: process(job)
        Handler-->>Worker: JobResult (Success/Nack/Fail)
        Worker->>MultilaneQueue: complete_job(BorrowedJob, JobResult)
        MultilaneQueue->>Redis: Atomic update (WATCH/MULTI/EXEC)
        Redis-->>MultilaneQueue: OK
    end
Loading

EOA Executor Worker Transaction Lifecycle

sequenceDiagram
    participant EoaExecutorWorker
    participant EoaExecutorStore
    participant ChainService
    participant EoaSigner
    participant Blockchain

    EoaExecutorWorker->>EoaExecutorStore: Acquire lock for EOA
    EoaExecutorWorker->>ChainService: Get chain for chain_id
    EoaExecutorWorker->>EoaExecutorStore: Crash recovery (rebroadcast borrowed txs)
    EoaExecutorWorker->>ChainService: Send transaction (if needed)
    ChainService->>Blockchain: Broadcast transaction
    Blockchain-->>ChainService: Tx hash / error
    ChainService-->>EoaExecutorWorker: Result

    EoaExecutorWorker->>EoaExecutorStore: Confirm on-chain txs
    EoaExecutorWorker->>ChainService: Get on-chain nonce
    ChainService-->>EoaExecutorWorker: Nonce
    EoaExecutorWorker->>EoaExecutorStore: Update state, mark confirmed/failed

    EoaExecutorWorker->>EoaExecutorStore: Send new or recycled txs
    EoaExecutorWorker->>EoaSigner: Sign transaction
    EoaSigner-->>EoaExecutorWorker: Signature
    EoaExecutorWorker->>ChainService: Send signed transaction
    ChainService-->>EoaExecutorWorker: Result

    EoaExecutorWorker->>EoaExecutorStore: Update transaction state
Loading

Possibly related PRs

  • thirdweb-dev/engine-core#2: Adds IAW auth token support for signing messages and typed data in core/src/signer.rs, which is directly extended by this PR's addition of the async sign_transaction method to the same trait and struct.

Warning

There were issues while running some tools. Please review the errors and either fix the tool's configuration or disable the tool if it's a critical failure.

🔧 Clippy (1.86.0)
Updating crates.io index
Updating git repository `ssh://git@github.com/thirdweb-dev/vault.git`

error: failed to get vault-sdk as a dependency of package engine-aa-core v0.1.0 (/aa-core)

Caused by:
failed to load source for dependency vault-sdk

Caused by:
Unable to update ssh://git@github.com/thirdweb-dev/vault.git?branch=pb%2Fupdate-alloy#b0a72f93

Caused by:
failed to create directory /usr/local/git/db/vault-02bb070e2aa0045e

Caused by:
Permission denied (os error 13)

✨ Finishing Touches
  • 📝 Generate Docstrings

🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Explain this complex logic.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai explain this code block.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and explain its main purpose.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Support

Need help? Create a ticket on our support page for assistance with any issues or questions.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai generate docstrings to generate docstrings for this PR.
  • @coderabbitai generate sequence diagram to generate a sequence diagram of the changes in this PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 11

🔭 Outside diff range comments (1)
aa-core/src/userop/builder.rs (1)

316-318: Potential panic via unwrap_or_default()

unwrap_or_default() silently masks missing paymaster_post_op_gas_limit by inserting 0. If that value is required by the protocol, defaulting to zero could cause on-chain reverts. Consider propagating an error instead.

-                    bundler_response
-                        .paymaster_post_op_gas_limit
-                        .unwrap_or_default(),
+                    bundler_response
+                        .paymaster_post_op_gas_limit
+                        .ok_or(EngineError::MissingBundlerGasLimit)?,

Replace with an appropriate domain error.

♻️ Duplicate comments (1)
core/Cargo.toml (1)

13-14: Same branch-pinning concern as aa-core

See previous comment—use an immutable rev for both vault-* dependencies to avoid surprise breakages.

🧹 Nitpick comments (6)
aa-core/src/userop/builder.rs (2)

238-246: Missing error context on gas-price fetch failure

If paymaster_client().get_user_op_gas_fees() fails, the propagated error loses context about which entrypoint version (v0.7) failed. Consider adding contextual info:

-            .map_err(|e| e.to_engine_error(self.chain))?;
+            .map_err(|e| {
+                tracing::error!("v0.7 gas-fee fetch failed: {e}");
+                e.to_engine_error(self.chain)
+            })?;

This small addition greatly eases troubleshooting in prod.


260-266: Unnecessary Some() wrapping

paymaster and paymaster_data are already Options—pm_response returns concrete values. Wrapping again with Some() is redundant when fields are mutable:

-        self.userop.paymaster = Some(pm_response.paymaster);
-        self.userop.paymaster_data = Some(pm_response.paymaster_data);
+        self.userop.paymaster = pm_response.paymaster;
+        self.userop.paymaster_data = pm_response.paymaster_data;

Assuming the struct definitions use Option<Bytes> this simplifies the code path.

executors/src/eoa/error_classifier.rs (1)

265-314: Add more comprehensive test coverage.

The current tests cover basic scenarios but miss edge cases and recovery strategy variations.

Consider adding tests for:

  • Empty error messages
  • Error messages with special characters
  • Case sensitivity edge cases
  • All error variants
  • Recovery strategy validation for each error type

Example additional test:

#[test]
fn test_edge_cases() {
    // Empty message
    let error = EoaErrorMapper::map_ethereum_error(-32000, "");
    match error {
        EoaExecutionError::RpcError { .. } => {}
        _ => panic!("Expected RpcError for empty message"),
    }
    
    // Mixed case
    let error = EoaErrorMapper::map_ethereum_error(-32000, "NoNcE tOo LoW");
    match error {
        EoaExecutionError::NonceTooLow { .. } => {}
        _ => panic!("Expected NonceTooLow for mixed case"),
    }
}

#[test]
fn test_all_recovery_strategies() {
    // Test each error variant has appropriate strategy
    let test_cases = vec![
        (EoaExecutionError::NonceTooLow { message: "test".into() }, true, false),
        (EoaExecutionError::InsufficientFunds { message: "test".into() }, false, true),
        // ... other variants
    ];
    
    for (error, expected_queue_confirmation, expected_recycle) in test_cases {
        let strategy = EoaErrorMapper::get_recovery_strategy(&error);
        assert_eq!(strategy.queue_confirmation, expected_queue_confirmation);
        assert_eq!(strategy.recycle_nonce, expected_recycle);
    }
}
executors/src/eoa/nonce_manager.rs (1)

412-436: Simplify return type for get_nonce_assignments.

This method returns a Vec but only ever contains zero or one element, which is misleading.

Change the return type to Option<NonceAssignment>:

-pub async fn get_nonce_assignments(
+pub async fn get_nonce_assignment(
     &self,
     eoa: Address,
     chain_id: u64,
     nonce: U256,
-) -> Result<Vec<NonceAssignment>, NonceManagerError> {
+) -> Result<Option<NonceAssignment>, NonceManagerError> {
     let mut conn = self.redis.clone();
     let assignments_key = self.nonce_assignments_key(eoa, chain_id);

     let assignment_json: Option<String> =
         conn.hget(&assignments_key, nonce.to_string()).await?;

     match assignment_json {
         Some(json) => {
             let assignment: NonceAssignment = serde_json::from_str(&json).map_err(|e| {
                 NonceManagerError::NonceAssignmentFailed {
                     reason: format!("Failed to deserialize assignment: {}", e),
                 }
             })?;
-            Ok(vec![assignment])
+            Ok(Some(assignment))
         }
-        None => Ok(vec![]),
+        None => Ok(None),
     }
 }
executors/src/eoa/send.rs (1)

341-341: Remove unused pre_computed_hash variable.

The pre-computed hash is calculated but never used.

-let pre_computed_hash = signed_tx.hash();
-
 // 9. Send transaction
 match chain.provider().send_tx_envelope(signed_tx.into()).await {

If you need to validate the hash matches, compare it with the returned hash:

let pre_computed_hash = signed_tx.hash();
// ... send transaction ...
assert_eq!(pre_computed_hash, tx_hash, "Hash mismatch");
twmq/src/multilane.rs (1)

1198-1235: Add retry limits and backoff to WATCH loop.

The infinite retry loop could cause issues under high contention.

Add retry limits and exponential backoff:

 // Execute with lease protection (same pattern as single-lane queue)
 let lease_key = self.lease_key_name(&job.job.id, &job.lease_token);
+let mut retry_count = 0;
+const MAX_RETRIES: u32 = 10;

 loop {
+    if retry_count >= MAX_RETRIES {
+        return Err(TwmqError::Runtime {
+            message: format!("Failed to complete job after {} retries", MAX_RETRIES),
+        });
+    }
+
     let mut conn = self.redis.clone();

     redis::cmd("WATCH")
         .arg(&lease_key)
         .query_async::<()>(&mut conn)
         .await?;

     // ... existing logic ...

     Err(_) => {
         tracing::debug!(job_id = %job.job.id, "WATCH failed during completion, retrying");
+        retry_count += 1;
+        if retry_count > 1 {
+            // Exponential backoff: 10ms, 20ms, 40ms, etc.
+            tokio::time::sleep(Duration::from_millis(10 * (1 << (retry_count - 1)))).await;
+        }
         continue;
     }
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between f1d53d8 and 623b545.

⛔ Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (19)
  • .github/workflows/ci-twmq.yaml (1 hunks)
  • .github/workflows/coverage-twmq.yaml (1 hunks)
  • Cargo.toml (1 hunks)
  • aa-core/Cargo.toml (1 hunks)
  • aa-core/src/userop/builder.rs (1 hunks)
  • core/Cargo.toml (1 hunks)
  • core/src/signer.rs (6 hunks)
  • executors/Cargo.toml (1 hunks)
  • executors/src/eoa/eoa_confirmation_worker.rs (1 hunks)
  • executors/src/eoa/error_classifier.rs (1 hunks)
  • executors/src/eoa/mod.rs (1 hunks)
  • executors/src/eoa/nonce_manager.rs (1 hunks)
  • executors/src/eoa/send.rs (1 hunks)
  • executors/src/eoa/transaction_store.rs (1 hunks)
  • executors/src/lib.rs (1 hunks)
  • twmq/src/lib.rs (3 hunks)
  • twmq/src/multilane.rs (1 hunks)
  • twmq/src/shutdown.rs (4 hunks)
  • twmq/tests/multilane_batch_pop.rs (1 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (1)
twmq/src/lib.rs (1)
twmq/src/multilane.rs (1)
  • work (443-540)
⏰ Context from checks skipped due to timeout of 90000ms (2)
  • GitHub Check: test
  • GitHub Check: coverage
🔇 Additional comments (19)
.github/workflows/coverage-twmq.yaml (1)

41-43: Confirmed: install-action@9185c192a96ba09167ad8663015b3fbbf007ec79 matches v2.56.2
Ran git ls-remote https://github.com/taiki-e/install-action.git refs/tags/v2.56.2 and the SHA aligns with the pinned hash.

.github/workflows/ci-twmq.yaml (1)

41-43: Consistency win—same action hash as coverage workflow

The CI workflow now uses the identical commit hash as the coverage job, eliminating drift.
No further issues spotted.

Cargo.toml (1)

2-10: Array re-formatting looks good

Purely stylistic change; no functional impact. TOML 1.0 allows the trailing comma, so this remains valid.

executors/Cargo.toml (1)

22-22: Chrono version 0.4.41 is valid on crates.io — no action needed

The crates.io API shows chrono’s max_version as 0.4.41, so the dependency declaration is correct and can be merged as-is.

executors/src/lib.rs (1)

1-4: LGTM! Clean module organization.

The addition of the eoa module and reordering of modules improves code organization by grouping related functionality.

twmq/src/lib.rs (3)

4-4: Module addition looks good.

The new multilane module integrates well with the existing structure.


18-18: Clean re-export of multilane types.

Re-exporting MultilaneQueue and MultilanePushableJob from the crate root follows good API design practices.


452-452: Return type update aligns with the generalized shutdown handling.

The change from WorkerHandle<H> to WorkerHandle<Queue<H>> correctly reflects the new generic WorkerHandle<Q> design that supports multiple queue types.

core/src/signer.rs (3)

2-2: Import addition is appropriate.

The TypedTransaction import is required for the new sign_transaction method signature.


181-187: Well-designed trait method addition.

The new sign_transaction method maintains consistency with existing trait methods and properly handles async operations.


309-344: Solid implementation of transaction signing.

The implementation correctly handles both Vault and IAW credential types with appropriate error handling and logging, maintaining consistency with other signing methods.

executors/src/eoa/mod.rs (1)

1-16: Well-structured module organization for EOA functionality.

The module cleanly organizes EOA-related functionality into logical submodules and provides a convenient public API through re-exports. The separation of concerns (confirmation, error classification, nonce management, sending, and storage) follows good architectural principles.

twmq/src/shutdown.rs (3)

5-9: Good generalization of WorkerHandle.

Making WorkerHandle generic over Q instead of being tied to DurableExecution improves flexibility and enables support for different queue types.


11-26: Clean abstraction with the QueueIdentifier trait.

The trait provides a minimal interface for queue identification, and the implementations correctly map to the appropriate methods (name() for Queue, queue_id() for MultilaneQueue).


88-91: Flexible generic signatures for worker management methods.

The updated method signatures properly support any queue type Q, providing good flexibility without unnecessary trait bounds at the method level.

Also applies to: 152-156, 159-167, 169-172

executors/src/eoa/transaction_store.rs (1)

46-51: LGTM!

The queue_job_id method provides a clean way to generate unique job IDs for transaction attempts.

executors/src/eoa/eoa_confirmation_worker.rs (3)

94-127: LGTM!

Clean implementation with proper dependency injection and trait implementations.


137-292: Well-structured confirmation monitoring logic.

The process method implements a comprehensive flow for monitoring transaction confirmations with proper error handling and retry logic. The nonce progression detection and active transaction monitoring approach is sound.


403-403: Verify receipt.status() behavior in the alloy library

It looks like receipt.status() is coming from the external alloy crate (no local implementation in our codebase). Please confirm whether this method returns a plain bool (guaranteed) or an Option<bool> (which could be None for pending receipts). If it can be None, consider guarding against it:

• File: executors/src/eoa/eoa_confirmation_worker.rs
Line: ~403

-                        if receipt.status() {
+                        if receipt.status().unwrap_or(false) {

This change ensures that any None (e.g., pending) status is treated as false, avoiding a potential panic or unintended behavior.

Comment on lines +11 to +12
vault-types = { version = "0.1.0", git = "ssh://git@github.com/thirdweb-dev/vault.git", branch = "pb/update-alloy" }
vault-sdk = { version = "0.1.0", git = "ssh://git@github.com/thirdweb-dev/vault.git", branch = "pb/update-alloy" }
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Pin commit hash instead of using a moving branch

Depending on the pb/update-alloy branch for vault-* crates introduces non-determinism—new commits to that branch may break reproducible builds. Prefer a specific commit or tag:

-vault-types = { version = "0.1.0", git = "...vault.git", branch = "pb/update-alloy" }
+vault-types = { git = "...vault.git", rev = "<commit-sha>" }

Same for vault-sdk.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
vault-types = { version = "0.1.0", git = "ssh://git@github.com/thirdweb-dev/vault.git", branch = "pb/update-alloy" }
vault-sdk = { version = "0.1.0", git = "ssh://git@github.com/thirdweb-dev/vault.git", branch = "pb/update-alloy" }
vault-types = { git = "ssh://git@github.com/thirdweb-dev/vault.git", rev = "<commit-sha>" }
vault-sdk = { git = "ssh://git@github.com/thirdweb-dev/vault.git", rev = "<commit-sha>" }
🤖 Prompt for AI Agents
In aa-core/Cargo.toml at lines 11 to 12, the dependencies for vault-types and
vault-sdk use a moving branch reference "pb/update-alloy," which causes
non-deterministic builds. Replace the branch specification with a fixed commit
hash or a specific tag for both dependencies to ensure reproducible builds.
Update the git dependency entries to include the exact commit hash instead of
the branch name.

Comment on lines +159 to +163
EoaExecutionError::AlreadyKnown { .. } => RecoveryStrategy {
queue_confirmation: true,
recycle_nonce: false,
needs_resync: false,
retryable: false,
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Improve error handling for JSON operations.

The current implementation silently ignores JSON deserialization errors and could panic on serialization errors.

Apply this diff to properly handle JSON errors:

-match health_json {
-    Some(json) => Ok(serde_json::from_str(&json).unwrap_or_default()),
-    None => Ok(EoaHealth::default()),
-}
+match health_json {
+    Some(json) => serde_json::from_str(&json)
+        .map_err(|e| NonceManagerError::NonceAssignmentFailed {
+            reason: format!("Failed to deserialize health data: {}", e),
+        }),
+    None => Ok(EoaHealth::default()),
+}

And for serialization:

-let health_json = serde_json::to_string(health).unwrap();
+let health_json = serde_json::to_string(health)
+    .map_err(|e| NonceManagerError::NonceAssignmentFailed {
+        reason: format!("Failed to serialize health data: {}", e),
+    })?;

Also applies to: 177-177

🤖 Prompt for AI Agents
In executors/src/eoa/error_classifier.rs at lines 159 to 163 and line 177, the
current code silently ignores JSON deserialization errors and may panic on
serialization errors. Modify the error handling logic to explicitly catch and
handle JSON deserialization errors by returning appropriate error results or
recovery strategies instead of ignoring them. Similarly, wrap serialization
operations in error handling to prevent panics, returning errors or fallback
values as needed. Ensure all JSON operations have proper error handling to avoid
silent failures or panics.

Comment on lines +83 to +118
match code {
-32000 => {
// Only handle the specific ethereum errors we care about
if msg_lower.contains("nonce too low") {
EoaExecutionError::NonceTooLow {
message: message.to_string(),
}
} else if msg_lower.contains("nonce too high") {
EoaExecutionError::NonceTooHigh {
message: message.to_string(),
}
} else if msg_lower.contains("already known") || msg_lower.contains("duplicate") {
EoaExecutionError::AlreadyKnown {
message: message.to_string(),
}
} else if msg_lower.contains("replacement") && msg_lower.contains("underpriced") {
EoaExecutionError::ReplacementUnderpriced {
message: message.to_string(),
}
} else if msg_lower.contains("insufficient funds") {
EoaExecutionError::InsufficientFunds {
message: message.to_string(),
}
} else if msg_lower.contains("gas") {
EoaExecutionError::GasError {
message: message.to_string(),
}
} else if msg_lower.contains("txpool") || msg_lower.contains("pool limit") {
EoaExecutionError::PoolLimitExceeded {
message: message.to_string(),
}
} else if msg_lower.contains("account") {
EoaExecutionError::AccountError {
message: message.to_string(),
}
} else {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Consider more robust error message matching patterns.

The current string matching approach has potential issues:

  • Pattern "gas" (line 106) is too generic and could match unrelated errors
  • Pattern "account" (line 114) could also be too broad
  • No documentation about which RPC implementations these patterns target

Consider using more specific patterns and documenting the expected error formats:

 match code {
     -32000 => {
         // Only handle the specific ethereum errors we care about
+        // Error patterns based on go-ethereum, erigon, and common RPC implementations
         if msg_lower.contains("nonce too low") {
             EoaExecutionError::NonceTooLow {
                 message: message.to_string(),
             }
         // ... other conditions ...
-        } else if msg_lower.contains("gas") {
+        } else if msg_lower.contains("gas limit") || msg_lower.contains("gas price") 
+                || msg_lower.contains("intrinsic gas") || msg_lower.contains("out of gas") {
             EoaExecutionError::GasError {
                 message: message.to_string(),
             }
         // ... other conditions ...
-        } else if msg_lower.contains("account") {
+        } else if msg_lower.contains("account not found") || msg_lower.contains("unknown account") 
+                || msg_lower.contains("invalid account") {
             EoaExecutionError::AccountError {
                 message: message.to_string(),
             }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
match code {
-32000 => {
// Only handle the specific ethereum errors we care about
if msg_lower.contains("nonce too low") {
EoaExecutionError::NonceTooLow {
message: message.to_string(),
}
} else if msg_lower.contains("nonce too high") {
EoaExecutionError::NonceTooHigh {
message: message.to_string(),
}
} else if msg_lower.contains("already known") || msg_lower.contains("duplicate") {
EoaExecutionError::AlreadyKnown {
message: message.to_string(),
}
} else if msg_lower.contains("replacement") && msg_lower.contains("underpriced") {
EoaExecutionError::ReplacementUnderpriced {
message: message.to_string(),
}
} else if msg_lower.contains("insufficient funds") {
EoaExecutionError::InsufficientFunds {
message: message.to_string(),
}
} else if msg_lower.contains("gas") {
EoaExecutionError::GasError {
message: message.to_string(),
}
} else if msg_lower.contains("txpool") || msg_lower.contains("pool limit") {
EoaExecutionError::PoolLimitExceeded {
message: message.to_string(),
}
} else if msg_lower.contains("account") {
EoaExecutionError::AccountError {
message: message.to_string(),
}
} else {
match code {
-32000 => {
// Only handle the specific ethereum errors we care about
// Error patterns based on go-ethereum, erigon, and common RPC implementations
if msg_lower.contains("nonce too low") {
EoaExecutionError::NonceTooLow {
message: message.to_string(),
}
} else if msg_lower.contains("nonce too high") {
EoaExecutionError::NonceTooHigh {
message: message.to_string(),
}
} else if msg_lower.contains("already known") || msg_lower.contains("duplicate") {
EoaExecutionError::AlreadyKnown {
message: message.to_string(),
}
} else if msg_lower.contains("replacement") && msg_lower.contains("underpriced") {
EoaExecutionError::ReplacementUnderpriced {
message: message.to_string(),
}
} else if msg_lower.contains("insufficient funds") {
EoaExecutionError::InsufficientFunds {
message: message.to_string(),
}
} else if msg_lower.contains("gas limit") || msg_lower.contains("gas price")
|| msg_lower.contains("intrinsic gas") || msg_lower.contains("out of gas") {
EoaExecutionError::GasError {
message: message.to_string(),
}
} else if msg_lower.contains("txpool") || msg_lower.contains("pool limit") {
EoaExecutionError::PoolLimitExceeded {
message: message.to_string(),
}
} else if msg_lower.contains("account not found") || msg_lower.contains("unknown account")
|| msg_lower.contains("invalid account") {
EoaExecutionError::AccountError {
message: message.to_string(),
}
} else {
// ...
}
}
🤖 Prompt for AI Agents
In executors/src/eoa/error_classifier.rs between lines 83 and 118, the current
error message matching uses overly broad substrings like "gas" and "account"
which can cause false positives. Refine these patterns to be more specific and
less likely to match unrelated errors, for example by including additional
context words or exact phrases. Also, add comments documenting which RPC error
message formats these patterns are intended to match to improve maintainability
and clarity.

Comment on lines +160 to +184
impl Drop for MultilaneTestHarness {
fn drop(&mut self) {
// Cleanup in background since we can't await in drop
let queue_id = self.queue_id.clone();
let redis = self.queue.clone().redis.clone();

tokio::spawn(async move {
let mut conn = redis;
let keys_pattern = format!("twmq_multilane:{}:*", queue_id);
let keys: Vec<String> = redis::cmd("KEYS")
.arg(&keys_pattern)
.query_async(&mut conn)
.await
.unwrap_or_default();

if !keys.is_empty() {
redis::cmd("DEL")
.arg(keys)
.query_async::<()>(&mut conn)
.await
.unwrap_or_default();
}
});
}
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Improve cleanup reliability in Drop implementation.

Spawning async cleanup in Drop has issues:

  • Tasks might outlive the test runtime
  • No guarantee cleanup completes before next test
  • Could cause test interference

Consider explicit cleanup in tests or using a test fixture that ensures cleanup:

 impl Drop for MultilaneTestHarness {
     fn drop(&mut self) {
-        // Cleanup in background since we can't await in drop
-        let queue_id = self.queue_id.clone();
-        let redis = self.queue.clone().redis.clone();
-
-        tokio::spawn(async move {
-            let mut conn = redis;
-            let keys_pattern = format!("twmq_multilane:{}:*", queue_id);
-            let keys: Vec<String> = redis::cmd("KEYS")
-                .arg(&keys_pattern)
-                .query_async(&mut conn)
-                .await
-                .unwrap_or_default();
-
-            if !keys.is_empty() {
-                redis::cmd("DEL")
-                    .arg(keys)
-                    .query_async::<()>(&mut conn)
-                    .await
-                    .unwrap_or_default();
-            }
-        });
+        // Log warning if cleanup wasn't called
+        if !self.cleaned_up {
+            eprintln!("Warning: MultilaneTestHarness dropped without cleanup for queue: {}", self.queue_id);
+        }
     }
 }

Then ensure each test calls cleanup:

#[tokio::test]
async fn test_example() {
    let harness = MultilaneTestHarness::new().await;
    // ... test logic ...
    harness.cleanup().await;
}
🤖 Prompt for AI Agents
In twmq/tests/multilane_batch_pop.rs around lines 160 to 184, the Drop
implementation spawns an async cleanup task that may outlive the test runtime
and cause interference. Remove the async cleanup from Drop and instead implement
an explicit async cleanup method on MultilaneTestHarness. Update tests to call
this cleanup method at the end to ensure reliable and complete resource cleanup
before test exit.

Comment on lines +690 to +742
while jobs_popped < batch_size and empty_lanes_count < total_lanes do
local lane_id = lanes_with_scores[lane_index * 2 - 1]
-- Skip if we've already marked this lane as empty
if lane_id == nil then
lane_index = lane_index + 1
if lane_index > total_lanes then
lane_index = 1
end
else
local last_score = tonumber(lanes_with_scores[lane_index * 2])
-- Only cleanup if not visited this batch (score != now)
if last_score ~= now then
cleanup_lane_leases(lane_id)
process_delayed_jobs(lane_id)
redis.call('ZADD', lanes_zset_name, now, lane_id)
lanes_with_scores[lane_index * 2] = tostring(now)
end
-- Try to pop a job from this lane
local job_result = pop_job_from_lane(lane_id)
if job_result then
table.insert(result_jobs, {lane_id, job_result[1], job_result[2], job_result[3], job_result[4], job_result[5], job_result[6]})
jobs_popped = jobs_popped + 1
else
-- Lane is empty, mark it and count it
lanes_with_scores[lane_index * 2 - 1] = nil
lanes_with_scores[lane_index * 2] = nil
empty_lanes_count = empty_lanes_count + 1
-- Check if lane should be removed from Redis
local lane_pending_list = 'twmq_multilane:' .. queue_id .. ':lane:' .. lane_id .. ':pending'
local lane_delayed_zset = 'twmq_multilane:' .. queue_id .. ':lane:' .. lane_id .. ':delayed'
local lane_active_hash = 'twmq_multilane:' .. queue_id .. ':lane:' .. lane_id .. ':active'
local pending_count = redis.call('LLEN', lane_pending_list)
local delayed_count = redis.call('ZCARD', lane_delayed_zset)
local active_count = redis.call('HLEN', lane_active_hash)
if pending_count == 0 and delayed_count == 0 and active_count == 0 then
redis.call('ZREM', lanes_zset_name, lane_id)
end
end
-- Move to next lane
lane_index = lane_index + 1
if lane_index > total_lanes then
lane_index = 1
end
end
end
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Improve lane tracking to avoid sparse arrays in Lua script.

Setting array elements to nil creates sparse arrays which can cause iteration issues in Lua.

Track empty lanes separately instead of modifying the array:

local empty_lanes = {}  -- Track empty lanes

-- In the loop, instead of:
-- lanes_with_scores[lane_index * 2 - 1] = nil
-- lanes_with_scores[lane_index * 2] = nil

-- Do:
empty_lanes[lane_id] = true

-- And check:
if empty_lanes[lane_id] then
    -- Skip this lane
else
    -- Process lane
end

This avoids sparse array issues and makes the logic clearer.

🤖 Prompt for AI Agents
In twmq/src/multilane.rs around lines 690 to 742, the current code sets elements
of the lanes_with_scores array to nil to mark empty lanes, which creates sparse
arrays and can cause iteration problems in Lua. Instead, introduce a separate
table (e.g., empty_lanes) to track empty lanes by their lane_id. Replace the nil
assignments with adding lane_id to empty_lanes, and modify the lane skipping
logic to check if a lane_id is in empty_lanes before processing. This change
avoids sparse arrays and clarifies lane tracking.

Comment on lines 116 to 136
pub async fn store_transaction_data(
&self,
transaction_data: &TransactionData,
) -> Result<(), TransactionStoreError> {
let mut conn = self.redis.clone();
let data_key = self.transaction_data_key(&transaction_data.transaction_id);
let active_key =
self.eoa_active_transactions_key(transaction_data.eoa, transaction_data.chain_id);

let data_json = serde_json::to_string(transaction_data)?;

// Store transaction data
let _: () = conn.set(&data_key, data_json).await?;

// Add to active transactions set
let _: () = conn
.sadd(&active_key, &transaction_data.transaction_id)
.await?;

Ok(())
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Consider using Redis transactions for atomic operations.

The current implementation performs multiple Redis commands sequentially, which could lead to race conditions if multiple workers operate on the same transaction simultaneously.

Consider using Redis transactions (MULTI/EXEC) or Lua scripts for atomicity:

 pub async fn store_transaction_data(
     &self,
     transaction_data: &TransactionData,
 ) -> Result<(), TransactionStoreError> {
-    let mut conn = self.redis.clone();
+    let mut conn = self.redis.clone();
     let data_key = self.transaction_data_key(&transaction_data.transaction_id);
     let active_key =
         self.eoa_active_transactions_key(transaction_data.eoa, transaction_data.chain_id);

     let data_json = serde_json::to_string(transaction_data)?;

-    // Store transaction data
-    let _: () = conn.set(&data_key, data_json).await?;
-
-    // Add to active transactions set
-    let _: () = conn
-        .sadd(&active_key, &transaction_data.transaction_id)
-        .await?;
+    // Use pipeline for atomic operations
+    let mut pipe = twmq::redis::pipe();
+    pipe.atomic()
+        .set(&data_key, data_json)
+        .sadd(&active_key, &transaction_data.transaction_id);
+    let _: () = pipe.query_async(&mut conn).await?;

     Ok(())
 }

Alternatively, expose only the pipeline methods and make the async methods private to enforce atomic usage.

Also applies to: 256-293

🤖 Prompt for AI Agents
In executors/src/eoa/transaction_store.rs around lines 116 to 136, the current
method performs multiple Redis commands sequentially, risking race conditions.
Refactor the code to use Redis transactions by wrapping the set and sadd
commands within a MULTI/EXEC block or use a Lua script to ensure atomicity. This
change will guarantee that both operations succeed or fail together, preventing
inconsistent state. Also, consider making async methods private and exposing
only pipeline methods to enforce atomic usage. Apply similar changes to lines
256-293 as well.

let active_key =
self.eoa_active_transactions_key(transaction_data.eoa, transaction_data.chain_id);

let data_json = serde_json::to_string(transaction_data).unwrap();
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Replace unwrap() with proper error handling in pipeline methods.

Using unwrap() on serialization can cause panics if serialization fails. While unlikely with these simple structs, it's better to handle errors gracefully in production code.

Consider propagating the error by making these methods return Result:

-    pub fn add_store_transaction_command(
+    pub fn add_store_transaction_command(
         &self,
         pipeline: &mut Pipeline,
         transaction_data: &TransactionData,
-    ) {
+    ) -> Result<(), TransactionStoreError> {
         let data_key = self.transaction_data_key(&transaction_data.transaction_id);
         let active_key =
             self.eoa_active_transactions_key(transaction_data.eoa, transaction_data.chain_id);

-        let data_json = serde_json::to_string(transaction_data).unwrap();
+        let data_json = serde_json::to_string(transaction_data)?;

         pipeline.set(&data_key, data_json);
         pipeline.sadd(&active_key, &transaction_data.transaction_id);
+        Ok(())
     }

Apply similar changes to the other pipeline methods.

Also applies to: 385-385, 421-421

🤖 Prompt for AI Agents
In executors/src/eoa/transaction_store.rs at line 368, replace the use of
unwrap() on serde_json::to_string(transaction_data) with proper error handling
by changing the method to return a Result type and propagating the serialization
error using the ? operator. Apply the same pattern to lines 385 and 421,
modifying those methods to return Result and handle errors gracefully instead of
panicking.

Comment on lines 295 to 328
/// Mark transaction as failed and clean up
pub async fn mark_transaction_failed(
&self,
transaction_id: &str,
error_message: &str,
) -> Result<(), TransactionStoreError> {
let mut conn = self.redis.clone();

// Get transaction data to determine EOA and chain_id
let tx_data = self
.get_transaction_data(transaction_id)
.await?
.ok_or_else(|| TransactionStoreError::TransactionNotFound {
transaction_id: transaction_id.to_string(),
})?;

let active_key = self.eoa_active_transactions_key(tx_data.eoa, tx_data.chain_id);
let attempt_key = self.active_attempt_key(transaction_id);

// Get current attempt to clean up nonce index
if let Some(attempt) = self.get_active_attempt(transaction_id).await? {
let nonce_key =
self.nonce_to_transactions_key(tx_data.eoa, tx_data.chain_id, attempt.nonce);
let _: () = conn.srem(&nonce_key, transaction_id).await?;
}

// Remove from active set
let _: () = conn.srem(&active_key, transaction_id).await?;

// Remove active attempt
let _: () = conn.del(&attempt_key).await?;

Ok(())
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Store the error message when marking transaction as failed.

The error_message parameter is currently unused. Consider storing it for debugging and audit purposes.

You could store the error in a dedicated key or as part of a failure record:

 pub async fn mark_transaction_failed(
     &self,
     transaction_id: &str,
     error_message: &str,
 ) -> Result<(), TransactionStoreError> {
     let mut conn = self.redis.clone();
+    let failure_key = match &self.namespace {
+        Some(ns) => format!("{}:eoa_failure:{}", ns, transaction_id),
+        None => format!("eoa_failure:{}", transaction_id),
+    };

     // Get transaction data to determine EOA and chain_id
     let tx_data = self
         .get_transaction_data(transaction_id)
         .await?
         .ok_or_else(|| TransactionStoreError::TransactionNotFound {
             transaction_id: transaction_id.to_string(),
         })?;

     let active_key = self.eoa_active_transactions_key(tx_data.eoa, tx_data.chain_id);
     let attempt_key = self.active_attempt_key(transaction_id);

     // Get current attempt to clean up nonce index
     if let Some(attempt) = self.get_active_attempt(transaction_id).await? {
         let nonce_key =
             self.nonce_to_transactions_key(tx_data.eoa, tx_data.chain_id, attempt.nonce);
         let _: () = conn.srem(&nonce_key, transaction_id).await?;
     }

+    // Store failure reason with timestamp
+    let failure_data = serde_json::json!({
+        "error": error_message,
+        "failed_at": std::time::SystemTime::now()
+            .duration_since(std::time::UNIX_EPOCH)
+            .unwrap()
+            .as_secs(),
+    });
+    let _: () = conn.set(&failure_key, failure_data.to_string()).await?;

     // Remove from active set
     let _: () = conn.srem(&active_key, transaction_id).await?;

     // Remove active attempt
     let _: () = conn.del(&attempt_key).await?;

     Ok(())
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
/// Mark transaction as failed and clean up
pub async fn mark_transaction_failed(
&self,
transaction_id: &str,
error_message: &str,
) -> Result<(), TransactionStoreError> {
let mut conn = self.redis.clone();
// Get transaction data to determine EOA and chain_id
let tx_data = self
.get_transaction_data(transaction_id)
.await?
.ok_or_else(|| TransactionStoreError::TransactionNotFound {
transaction_id: transaction_id.to_string(),
})?;
let active_key = self.eoa_active_transactions_key(tx_data.eoa, tx_data.chain_id);
let attempt_key = self.active_attempt_key(transaction_id);
// Get current attempt to clean up nonce index
if let Some(attempt) = self.get_active_attempt(transaction_id).await? {
let nonce_key =
self.nonce_to_transactions_key(tx_data.eoa, tx_data.chain_id, attempt.nonce);
let _: () = conn.srem(&nonce_key, transaction_id).await?;
}
// Remove from active set
let _: () = conn.srem(&active_key, transaction_id).await?;
// Remove active attempt
let _: () = conn.del(&attempt_key).await?;
Ok(())
}
/// Mark transaction as failed and clean up
pub async fn mark_transaction_failed(
&self,
transaction_id: &str,
error_message: &str,
) -> Result<(), TransactionStoreError> {
let mut conn = self.redis.clone();
let failure_key = match &self.namespace {
Some(ns) => format!("{}:eoa_failure:{}", ns, transaction_id),
None => format!("eoa_failure:{}", transaction_id),
};
// Get transaction data to determine EOA and chain_id
let tx_data = self
.get_transaction_data(transaction_id)
.await?
.ok_or_else(|| TransactionStoreError::TransactionNotFound {
transaction_id: transaction_id.to_string(),
})?;
let active_key = self.eoa_active_transactions_key(tx_data.eoa, tx_data.chain_id);
let attempt_key = self.active_attempt_key(transaction_id);
// Get current attempt to clean up nonce index
if let Some(attempt) = self.get_active_attempt(transaction_id).await? {
let nonce_key =
self.nonce_to_transactions_key(tx_data.eoa, tx_data.chain_id, attempt.nonce);
let _: () = conn.srem(&nonce_key, transaction_id).await?;
}
// Store failure reason with timestamp
let failure_data = serde_json::json!({
"error": error_message,
"failed_at": std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs(),
});
let _: () = conn.set(&failure_key, failure_data.to_string()).await?;
// Remove from active set
let _: () = conn.srem(&active_key, transaction_id).await?;
// Remove active attempt
let _: () = conn.del(&attempt_key).await?;
Ok(())
}
🤖 Prompt for AI Agents
In executors/src/eoa/transaction_store.rs around lines 295 to 328, the
error_message parameter in mark_transaction_failed is unused. To fix this, store
the error_message in Redis or your storage system as part of the transaction
failure record, for example by setting a dedicated key like
"transaction:{transaction_id}:error" with the error_message value. This will
help with debugging and auditing failed transactions.

Comment on lines 547 to 572
let requeue_job = self
.send_queue
.clone()
.job(EoaSendJobData {
transaction_id: tx_data.transaction_id.clone(),
chain_id: tx_data.chain_id,
from: tx_data.eoa,
to: tx_data.to,
value: tx_data.value,
data: tx_data.data.into(),
webhook_options: None, // TODO: Get from original job if needed
assigned_nonce: None, // Will get new nonce
gas_limit: tx_data.gas_limit,
signing_credential: Default::default(), // TODO: Get from original job
rpc_credentials: job_data.rpc_credentials.clone(),
})
.with_id(&format!("{}_{}", transaction_id, attempt_number));

// Queue the job (this would normally be done in a pipeline in the actual hook)
tracing::info!(
transaction_id = %transaction_id,
queue_job_id = %format!("{}_{}", transaction_id, attempt_number),
"Requeuing transaction after race loss"
);

Ok(())
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Critical: Complete the requeue implementation with proper credentials and queueing.

The current implementation has several critical issues that will prevent requeued transactions from working:

  1. Missing signing credentials (line 560)
  2. Missing webhook options (line 557)
  3. Job is created but never actually queued (line 565 comment)
  4. Direct Redis access breaks encapsulation (lines 542-544)

The signing credentials are essential for transaction execution. You need to either:

  • Store credentials with the original transaction data
  • Pass them through the confirmation worker job data
  • Retrieve them from a secure credential store

Here's a suggested approach:

+    // Store signing credentials reference with transaction data
+    #[derive(Debug, Clone, Serialize, Deserialize)]
+    pub struct TransactionData {
+        pub transaction_id: String,
+        pub eoa: Address,
+        pub chain_id: u64,
+        pub to: Option<Address>,
+        pub value: U256,
+        pub data: Vec<u8>,
+        pub gas_limit: Option<u64>,
+        pub created_at: u64,
+        pub signing_credential_id: String,  // Reference to credential
+        pub webhook_options: Option<WebhookOptions>,
+    }

     async fn requeue_transaction(
         &self,
         job_data: &EoaConfirmationWorkerJobData,
         transaction_id: &str,
     ) -> Result<(), EoaConfirmationWorkerError> {
         // ... existing code ...

-        // Get current attempt number for new queue job ID
-        let mut conn = self.transaction_store.redis.clone();
-        let counter_key = self.transaction_store.attempt_counter_key(transaction_id);
-        let attempt_number: u32 = conn.get(&counter_key).await.unwrap_or(0);
+        // Use transaction store method to get attempt counter
+        let attempt_number = self.transaction_store
+            .get_attempt_counter(transaction_id)
+            .await
+            .unwrap_or(0);

         // Create new send job with incremented attempt
         let requeue_job = self
             .send_queue
             .clone()
             .job(EoaSendJobData {
                 transaction_id: tx_data.transaction_id.clone(),
                 chain_id: tx_data.chain_id,
                 from: tx_data.eoa,
                 to: tx_data.to,
                 value: tx_data.value,
                 data: tx_data.data.into(),
-                webhook_options: None, // TODO: Get from original job if needed
+                webhook_options: tx_data.webhook_options,
                 assigned_nonce: None,  // Will get new nonce
                 gas_limit: tx_data.gas_limit,
-                signing_credential: Default::default(), // TODO: Get from original job
+                signing_credential: CredentialReference::from_id(tx_data.signing_credential_id),
                 rpc_credentials: job_data.rpc_credentials.clone(),
             })
             .with_id(&format!("{}_{}", transaction_id, attempt_number));

-        // Queue the job (this would normally be done in a pipeline in the actual hook)
+        // Actually queue the job
+        requeue_job.push().await?;

         tracing::info!(
             transaction_id = %transaction_id,
             queue_job_id = %format!("{}_{}", transaction_id, attempt_number),
             "Requeuing transaction after race loss"
         );

         Ok(())
     }

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In executors/src/eoa/eoa_confirmation_worker.rs around lines 547 to 572, the
requeue job creation is incomplete: signing credentials and webhook options are
missing, and the job is never actually queued. Fix this by retrieving and
including the original job's signing credentials and webhook options when
constructing EoaSendJobData, then enqueue the job properly using the
send_queue's enqueue or similar method instead of direct Redis access to
maintain encapsulation.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

🧹 Nitpick comments (3)
executors/src/eoa/worker.rs (2)

806-819: Missing implementation for no-op transaction sending.

The send_noop_transaction method is not implemented, which could block the transaction queue when recycled nonces need to be consumed but no pending transactions exist.

Would you like me to implement the no-op transaction logic or create an issue to track this TODO? A no-op transaction typically sends 0 ETH to the sender's own address with minimal gas.


511-514: Hard-coded delays should be configurable.

The 50ms delays between consecutive transactions are hard-coded. Different RPC providers have different rate limits, so this should be configurable.

Consider making the delay configurable:

 pub struct EoaExecutorWorker<CS>
 where
     CS: ChainService + Send + Sync + 'static,
 {
     pub chain_service: Arc<CS>,
     pub store: Arc<EoaExecutorStore>,
     pub eoa_signer: Arc<EoaSigner>,
     pub max_inflight: u64,
     pub max_recycled_nonces: u64,
+    pub nonce_send_delay_ms: u64, // Default: 50
 }

Then use it in the delay:

-sleep(Duration::from_millis(50)).await;
+sleep(Duration::from_millis(self.nonce_send_delay_ms)).await;

Also applies to: 628-630

executors/src/eoa/store.rs (1)

449-453: Unnecessary cloning of keys for closures.

The code clones multiple keys for use in the closures, which is inefficient and reduces readability.

Since the helper method already handles the closure execution, you can capture the keys by reference:

-let recyled_key_for_validation = recycled_key.clone();
-let pending_key_for_validation = pending_key.clone();
-let transaction_id_for_validation = transaction_id.clone();
-let borrowed_key_for_validation = borrowed_key.clone();

 self.execute_with_watch_and_retry(
     eoa,
     chain_id,
     worker_id,
     &[recycled_key.clone(), pending_key.clone()],
     "pending->borrowed with recycled nonce",
-    async move |conn: &mut ConnectionManager| {
+    async |conn: &mut ConnectionManager| {
         // Validation phase - check preconditions
-        let nonce_score: Option<f64> = conn.zscore(recycled_key.clone(), nonce).await?;
+        let nonce_score: Option<f64> = conn.zscore(&recycled_key, nonce).await?;
         // ... rest of validation
     },
-    |pipeline: &mut Pipeline| {
-        pipeline.zrem(recyled_key_for_validation, nonce);
+    move |pipeline: &mut Pipeline| {
+        pipeline.zrem(&recycled_key, nonce);
         // ... rest of pipeline
     },
 )
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 623b545 and 87affb4.

⛔ Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (5)
  • executors/Cargo.toml (1 hunks)
  • executors/src/eoa/mod.rs (1 hunks)
  • executors/src/eoa/store.rs (1 hunks)
  • executors/src/eoa/worker.rs (1 hunks)
  • twmq/src/job.rs (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • executors/Cargo.toml
  • executors/src/eoa/mod.rs
🧰 Additional context used
🧬 Code Graph Analysis (1)
executors/src/eoa/worker.rs (3)
executors/src/eoa/store.rs (5)
  • eoa (1766-1768)
  • chain_id (1771-1773)
  • build (1470-1495)
  • nonce (231-236)
  • new (95-97)
twmq/src/job.rs (3)
  • data (168-170)
  • new (159-161)
  • new (200-206)
executors/src/eoa/error_classifier.rs (1)
  • message (224-236)
⏰ Context from checks skipped due to timeout of 90000ms (2)
  • GitHub Check: test
  • GitHub Check: coverage
🔇 Additional comments (4)
twmq/src/job.rs (1)

59-74: Good trait generalization for error type flexibility!

The addition of the ErrorType: Into<E> bound allows the trait to work with any error type that can be converted into the target error type E. This is a backward-compatible change that improves the trait's flexibility without breaking existing usage.

executors/src/eoa/worker.rs (1)

922-950: Well-structured gas estimation with proper error handling.

The gas estimation logic correctly:

  • Adds a 10% buffer to prevent out-of-gas errors
  • Distinguishes between transaction reverts (which should fail the transaction) and RPC errors (which should retry the worker)
  • Provides detailed error messages including revert data
executors/src/eoa/store.rs (2)

363-364: Well-implemented exponential backoff for retry logic.

The retry mechanism with exponential backoff (capped at 640ms) and 10 max retries provides good balance between responsiveness and avoiding overwhelming Redis during high contention.

Also applies to: 1441-1442


1458-1495: Excellent scoped store pattern for API ergonomics and safety.

The ScopedEoaExecutorStore wrapper:

  • Validates lock ownership on construction, preventing unauthorized operations
  • Significantly reduces boilerplate by binding common parameters
  • Provides a cleaner, more ergonomic API

This is a great pattern for ensuring safety while improving developer experience.

Comment on lines +102 to +125
fn classify_send_error(error: &RpcError<TransportErrorKind>) -> SendResult {
let error_str = error.to_string().to_lowercase();

// Transaction possibly made it to mempool
if error_str.contains("nonce too low")
|| error_str.contains("already known")
|| error_str.contains("replacement transaction underpriced")
{
return SendResult::PossiblySent;
}

// Clear failures that didn't consume nonce
if error_str.contains("invalid signature")
|| error_str.contains("malformed")
|| error_str.contains("insufficient funds")
|| error_str.contains("gas limit")
|| error_str.contains("intrinsic gas too low")
{
return SendResult::DeterministicFailure;
}

// Default: assume possibly sent for safety
SendResult::PossiblySent
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Error classification relies on fragile string matching.

The classify_send_error function uses string matching on error messages, which is unreliable because:

  • Error messages can vary between RPC providers
  • Message formats may change over time
  • Localized error messages won't match

This could lead to misclassification and incorrect transaction handling (e.g., treating a sent transaction as failed or vice versa).

Consider using structured error types or error codes from the RPC response instead:

-fn classify_send_error(error: &RpcError<TransportErrorKind>) -> SendResult {
-    let error_str = error.to_string().to_lowercase();
-
-    // Transaction possibly made it to mempool
-    if error_str.contains("nonce too low")
-        || error_str.contains("already known")
-        || error_str.contains("replacement transaction underpriced")
-    {
-        return SendResult::PossiblySent;
-    }
-    // ... rest of the function
+fn classify_send_error(error: &RpcError<TransportErrorKind>) -> SendResult {
+    // Use the error classifier from error_classifier.rs which handles structured errors
+    use crate::eoa::error_classifier::EoaErrorMapper;
+    
+    match EoaErrorMapper::classify_error(error) {
+        Some(EoaExecutionError::NonceTooLow { .. }) 
+        | Some(EoaExecutionError::AlreadyKnown { .. })
+        | Some(EoaExecutionError::ReplacementUnderpriced { .. }) => SendResult::PossiblySent,
+        
+        Some(EoaExecutionError::InsufficientFunds { .. })
+        | Some(EoaExecutionError::GasError { .. }) => SendResult::DeterministicFailure,
+        
+        _ => SendResult::PossiblySent // Default to safe assumption
+    }
+}

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In executors/src/eoa/worker.rs between lines 102 and 125, the
classify_send_error function currently relies on fragile string matching of
error messages, which is unreliable due to variations in RPC provider messages
and localization. To fix this, refactor the function to parse and use structured
error types or standardized error codes from the RPC response instead of string
matching. This may involve updating the RpcError handling to extract error codes
or enums and using those for classification to ensure more robust and accurate
error handling.

Comment on lines +172 to +175
self.store
.acquire_eoa_lock_aggressively(data.eoa_address, data.chain_id, &data.worker_id)
.await
.map_err_nack(Some(Duration::from_secs(10)), RequeuePosition::Last)?;
Copy link

@coderabbitai coderabbitai bot Jul 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Verification agent

🧩 Analysis chain

Potential race condition with aggressive lock acquisition.

The acquire_eoa_lock_aggressively method forcefully takes over locks without coordination. If multiple workers simultaneously attempt to take over the same EOA lock, it could lead to race conditions where workers overwrite each other's lock ownership.

Consider implementing a more coordinated lock acquisition strategy:

  1. Add a grace period before forceful takeover
  2. Use atomic compare-and-swap operations
  3. Implement lock heartbeats to detect truly stalled workers

🏁 Script executed:

#!/bin/bash
# Check if there are any lock coordination mechanisms in the store implementation
ast-grep --pattern 'fn acquire_eoa_lock_aggressively($$$) {
  $$$
}'

Length of output: 70


🏁 Script executed:

#!/bin/bash
# Locate both uses and the definition of acquire_eoa_lock_aggressively
rg --color=never -n -C 5 'acquire_eoa_lock_aggressively' || echo "No matches found"

Length of output: 1544


🏁 Script executed:

#!/bin/bash
# Display the implementation of acquire_eoa_lock_aggressively to check for coordination logic
sed -n '260,360p' executors/src/eoa/store.rs

Length of output: 3713


Safer EOA Lock Acquisition Needed

We’ve verified that in executors/src/eoa/store.rs (lines 269–287) the acquire_eoa_lock_aggressively method does:

  • SETNX to grab the lock
  • Fallback to plain SET (no expiry) to force takeover

This leaves no TTL or coordination, so concurrent callers can override each other and stale locks never expire.

Consider strengthening the takeover logic by:

  • Adding an expiry on initial lock (e.g. Redis SET key value NX PX <ttl>)
  • Renewing the TTL via heartbeats instead of unconditional force-set
  • Employing an atomic compare-and-swap or Redis WATCH/MULTI pipeline (or Redlock)
  • Introducing a grace/backoff period before force takeover

Pinpoint locations:

  • executors/src/eoa/store.rs:269–287 (acquire_eoa_lock_aggressively)
  • executors/src/eoa/worker.rs:172–175 (usage site)
🤖 Prompt for AI Agents
In executors/src/eoa/worker.rs around lines 172 to 175, the current usage of
acquire_eoa_lock_aggressively calls a method that uses SETNX and a fallback
plain SET without expiry, risking stale locks and unsafe concurrent overrides.
To fix this, update the acquire_eoa_lock_aggressively method in
executors/src/eoa/store.rs (lines 269–287) to set the lock with an expiry (e.g.,
using Redis SET with NX and PX options), implement a heartbeat mechanism to
renew the TTL instead of force-setting the lock, and use atomic operations like
compare-and-swap or Redis WATCH/MULTI to safely coordinate lock takeover,
possibly adding a grace or backoff period before forcing a takeover. Adjust the
worker.rs usage if needed to handle the updated locking behavior properly.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

have you considered that every redis state mutation runs with a watch on this key to ensure that we have the lock, and we exit if we lose the lock?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧩 Analysis chain

You're absolutely right! Let me examine the store implementation to see this coordination mechanism.

@d4mr Thank you for that clarification! You're absolutely correct. Looking at the store implementation, I can see that there's a robust lock coordination mechanism in place:

  1. WATCH-based transactions: Every Redis state mutation uses execute_with_watch_and_retry which employs Redis WATCH/MULTI/EXEC operations
  2. Lock validation: The with_lock_check method validates lock ownership before every operation
  3. Automatic retry: If the lock is lost during a transaction (detected via WATCH failure), the operation is retried with exponential backoff
  4. Graceful exit: If lock ownership can't be validated, the worker exits cleanly

This design ensures that even if multiple workers attempt aggressive lock acquisition, only one can successfully perform state mutations at a time. The WATCH mechanism provides the coordination I was concerned about.


🏁 Script executed:

#!/bin/bash
# Search for WATCH operations in the store implementation
rg -A 10 -B 2 "WATCH" executors/src/eoa/store.rs

# Also look for lock validation patterns
rg -A 5 -B 2 "with_lock_check\|execute_with_watch_and_retry" executors/src/eoa/store.rs

Length of output: 11719


@d4mr You're absolutely correct! The store implementation shows extensive WATCH-based coordination:

Lock Protection Mechanisms Found:

  1. Universal WATCH operations: Every state mutation watches the lock key:

    let _: () = twmq::redis::cmd("WATCH")
        .arg(&lock_key)
        .query_async(&mut conn).await?;
  2. Lock ownership validation: Before every operation:

    let current_owner: Option<String> = conn.get(&lock_key).await?;
    if current_owner.as_deref() != Some(worker_id) {
        return Err(TransactionStoreError::LockLost { ... });
    }
  3. WATCH failure handling: When transactions fail due to WATCH, the code distinguishes between lock loss vs. state changes:

    let still_own_lock: Option<String> = conn.get(&lock_key).await?;
    if still_own_lock.as_deref() != Some(worker_id) {
        return Err(TransactionStoreError::LockLost { ... });
    }
    // Otherwise retry
  4. Multiple coordination helpers:

    • execute_with_watch_and_retry - centralized WATCH/retry logic
    • with_atomic_operation - generic atomic operations
    • with_lock_check - lock validation wrapper

This design ensures that even with aggressive lock acquisition, only the lock holder can successfully mutate state. The WATCH mechanism provides perfect coordination - my initial concern about race conditions was unfounded given this robust protection layer.


✏️ Learnings added
Learnt from: d4mr
PR: thirdweb-dev/engine-core#5
File: executors/src/eoa/worker.rs:173-176
Timestamp: 2025-07-06T15:44:13.681Z
Learning: In the EOA executor system, aggressive lock acquisition is safe because every Redis state mutation uses WATCH operations on the lock key. If the lock is lost during a transaction, the WATCH causes the transaction to fail and the worker exits gracefully. This provides coordination between workers even when using forceful lock takeover.

Learnt from: d4mr
PR: thirdweb-dev/engine-core#5
File: executors/src/eoa/worker.rs:173-176
Timestamp: 2025-07-06T15:44:13.681Z
Learning: The EOA executor store uses comprehensive WATCH-based coordination where every Redis state mutation watches the lock key and validates ownership before proceeding. If the lock is lost during any operation, the transaction fails with LockLost error. This makes aggressive lock acquisition safe because only the actual lock holder can successfully perform state mutations, regardless of who claims the lock.

Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

♻️ Duplicate comments (2)
executors/src/eoa/worker.rs (2)

103-126: Replace fragile string matching with structured error classification.

The current implementation still uses string matching on error messages, which remains fragile as noted in previous reviews. Error messages can vary between RPC providers, change over time, and may be localized.

Use the error classifier from the eoa module instead:

+use crate::eoa::error_classifier::EoaErrorMapper;

 fn classify_send_error(error: &RpcError<TransportErrorKind>) -> SendResult {
-    let error_str = error.to_string().to_lowercase();
-
-    // Transaction possibly made it to mempool
-    if error_str.contains("nonce too low")
-        || error_str.contains("already known")
-        || error_str.contains("replacement transaction underpriced")
-    {
-        return SendResult::PossiblySent;
-    }
-
-    // Clear failures that didn't consume nonce
-    if error_str.contains("invalid signature")
-        || error_str.contains("malformed")
-        || error_str.contains("insufficient funds")
-        || error_str.contains("gas limit")
-        || error_str.contains("intrinsic gas too low")
-    {
-        return SendResult::DeterministicFailure;
-    }
-
-    // Default: assume possibly sent for safety
-    SendResult::PossiblySent
+    match EoaErrorMapper::classify_error(error) {
+        Some(err) if err.is_possibly_sent() => SendResult::PossiblySent,
+        Some(err) if err.is_deterministic_failure() => SendResult::DeterministicFailure,
+        _ => SendResult::PossiblySent // Default to safe assumption
+    }
 }

172-176: Implement safer lock acquisition with coordination.

The aggressive lock acquisition remains problematic as noted in previous reviews. Multiple workers can simultaneously force takeover, leading to race conditions and lost work.

Consider implementing these improvements in the store's acquire_eoa_lock_aggressively method:

  1. Add TTL to locks (e.g., SET key value NX PX <ttl>)
  2. Implement heartbeat mechanism for lock renewal
  3. Use atomic compare-and-swap or WATCH/MULTI for safe takeover
  4. Add grace period before forcing takeover
 // 1. ACQUIRE LOCK AGGRESSIVELY
 tracing::info!("Acquiring EOA lock aggressively");
-self.store
-    .acquire_eoa_lock_aggressively(data.eoa_address, data.chain_id, &data.worker_id)
-    .await
-    .map_err_nack(Some(Duration::from_secs(10)), RequeuePosition::Last)?;
+match self.store
+    .try_acquire_eoa_lock_with_timeout(
+        data.eoa_address, 
+        data.chain_id, 
+        &data.worker_id,
+        Duration::from_secs(300) // 5 minute TTL
+    )
+    .await {
+    Ok(()) => {},
+    Err(TransactionStoreError::LockContended { .. }) => {
+        // Wait and retry with backoff
+        return Err(EoaExecutorWorkerError::WorkRemaining { 
+            message: "Lock contended, will retry".to_string() 
+        }).map_err_nack(Some(Duration::from_secs(30)), RequeuePosition::Last);
+    }
+    Err(e) => return Err(e).map_err_nack(Some(Duration::from_secs(10)), RequeuePosition::Last),
+}
🧹 Nitpick comments (4)
executors/src/eoa/store.rs (2)

539-632: Consider extracting lock validation logic to improve readability.

The helper method is well-designed and effectively centralizes atomic operation logic. However, it's quite complex. Consider extracting the lock validation logic into a separate method to improve readability.

+    async fn validate_lock_ownership(
+        &self,
+        conn: &mut ConnectionManager,
+        lock_key: &str,
+        worker_id: &str,
+        eoa: Address,
+        chain_id: u64,
+    ) -> Result<(), TransactionStoreError> {
+        let current_owner: Option<String> = conn.get(lock_key).await?;
+        if current_owner.as_deref() != Some(worker_id) {
+            return Err(TransactionStoreError::LockLost {
+                eoa,
+                chain_id,
+                worker_id: worker_id.to_string(),
+            });
+        }
+        Ok(())
+    }

     async fn execute_with_watch_and_retry(
         &self,
         eoa: Address,
         chain_id: u64,
         worker_id: &str,
         safe_tx: &impl SafeRedisTransaction,
     ) -> Result<(), TransactionStoreError> {
         // ... existing code ...

-            // Check lock ownership
-            let current_owner: Option<String> = conn.get(&lock_key).await?;
-            if current_owner.as_deref() != Some(worker_id) {
-                let _: () = twmq::redis::cmd("UNWATCH").query_async(&mut conn).await?;
-                return Err(TransactionStoreError::LockLost {
-                    eoa,
-                    chain_id,
-                    worker_id: worker_id.to_string(),
-                });
-            }
+            // Check lock ownership
+            if let Err(e) = self.validate_lock_ownership(&mut conn, &lock_key, worker_id, eoa, chain_id).await {
+                let _: () = twmq::redis::cmd("UNWATCH").query_async(&mut conn).await?;
+                return Err(e);
+            }

1002-1102: Consider extracting hash validation logic for better maintainability.

This method is necessarily complex due to the need to pass hash data between validation and pipeline phases. While the current implementation is correct, consider extracting the hash validation logic to improve readability.

+    async fn find_transaction_hashes(
+        &self,
+        conn: &mut ConnectionManager,
+        submitted_key: &str,
+        transaction_id: &str,
+    ) -> Result<Vec<String>, TransactionStoreError> {
+        let all_hashes: Vec<String> = conn.zrange(submitted_key, 0, -1).await?;
+        let mut transaction_hashes = Vec::new();
+
+        for hash in all_hashes {
+            if let Some(tx_id) = self.get_transaction_id_for_hash(&hash).await? {
+                if tx_id == transaction_id {
+                    transaction_hashes.push(hash);
+                }
+            }
+        }
+
+        if transaction_hashes.is_empty() {
+            return Err(TransactionStoreError::TransactionNotInSubmittedState {
+                transaction_id: transaction_id.to_string(),
+            });
+        }
+
+        Ok(transaction_hashes)
+    }
executors/src/eoa/worker.rs (2)

591-645: Good parallel-to-sequential pattern with comprehensive error handling.

The implementation correctly handles the complex flow of building transactions in parallel and then moving them to borrowed state sequentially to maintain nonce ordering. The error handling for nonce conflicts and missing transactions is thorough.

Consider extracting the error handling logic into a helper method to reduce nesting:

+    fn handle_move_to_borrowed_error(
+        &self,
+        error: TransactionStoreError,
+        transaction_id: &str,
+    ) -> Result<bool, ()> {
+        match error {
+            TransactionStoreError::OptimisticNonceChanged { .. } => {
+                tracing::debug!("Nonce changed for transaction {}, skipping", transaction_id);
+                Ok(false) // Stop processing
+            }
+            TransactionStoreError::TransactionNotInPendingQueue { .. } => {
+                tracing::debug!("Transaction {} already processed, skipping", transaction_id);
+                Err(()) // Continue with next
+            }
+            e => {
+                tracing::error!("Failed to move transaction {} to borrowed: {}", transaction_id, e);
+                Err(()) // Continue with next
+            }
+        }
+    }

828-842: Complete no-op transaction implementation for recycled nonces.

The missing no-op transaction implementation could cause recycled nonces to remain unconsumed, potentially leading to nonce gaps or inefficient recycling.

This missing functionality affects the recycling system. The implementation needs to:

  1. Create a minimal transaction (e.g., self-transfer of 0 value)
  2. Handle signing credentials appropriately
  3. Implement atomic nonce consumption from recycled set

Would you like me to generate a complete implementation for this method?

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 87affb4 and f3d8425.

📒 Files selected for processing (2)
  • executors/src/eoa/store.rs (1 hunks)
  • executors/src/eoa/worker.rs (1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (2)
  • GitHub Check: coverage
  • GitHub Check: test
🔇 Additional comments (3)
executors/src/eoa/store.rs (2)

14-22: Well-designed trait for atomic Redis operations.

The SafeRedisTransaction trait provides an excellent abstraction for atomic Redis operations, properly separating validation, operation building, and watch key management. The async trait syntax is correct for stable Rust.


1565-1602: Excellent ergonomic wrapper with proper validation.

The ScopedEoaExecutorStore is a well-designed wrapper that eliminates repetitive parameter passing while ensuring lock ownership validation. The builder pattern with validation is a good approach.

executors/src/eoa/worker.rs (1)

286-373: Well-implemented crash recovery with proper stale transaction handling.

The crash recovery logic is comprehensive and handles edge cases well:

  • Proper sorting by nonce for ordering
  • Detection and logging of stale transactions
  • Appropriate error classification for recovery decisions
  • Good error handling and logging

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant