Skip to content

Commit 83e430e

Browse files
farhanW3d4mr
andauthored
Updates: v2 Nonce (#597)
* Updates * updates * cleaned logs * updates * updated recycleNonce to check if isNan * comments updated * chore: expose NonceResyncQueue to bullboard * Convert nonce-recycled to set and add nonce-sent set helpers * fix: handle removing from sent nonces in fail handler * chore: Remove commented out code in setup.ts file * Update nonceResyncWorker to use stricter wildcard pattern for redis keys * Add isSentNonce helper function for checking if a nonce is in the sent nonces set * refactor and solve nonce resynchronization logic bugs and nits in nonceResyncWorker * chore: Refactor cancelRecycledNoncesWorker to use SMEMBERS instead of LRANGE for retrieving unused nonces * treat `ReplacementGasFeeTowLow` as `NonceAlreadyUsed` * updated logs & removeSentNonce --------- Co-authored-by: Prithvish Baidya <deformercoding@gmail.com>
1 parent 0288dd6 commit 83e430e

File tree

9 files changed

+256
-11
lines changed

9 files changed

+256
-11
lines changed

src/db/wallets/walletNonce.ts

Lines changed: 70 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { Address, eth_getTransactionCount, getRpcClient } from "thirdweb";
22
import { getChain } from "../../utils/chain";
3+
import { logger } from "../../utils/logger";
34
import { normalizeAddress } from "../../utils/primitiveTypes";
45
import { redis } from "../../utils/redis/redis";
56
import { thirdwebClient } from "../../utils/sdk";
@@ -12,12 +13,68 @@ export const lastUsedNonceKey = (chainId: number, walletAddress: Address) =>
1213
`nonce:${chainId}:${normalizeAddress(walletAddress)}`;
1314

1415
/**
15-
* The "recycled nonces" list stores unsorted nonces to be reused or cancelled.
16+
* The "recycled nonces" set stores unsorted nonces to be reused or cancelled.
1617
* Example: [ "25", "23", "24" ]
1718
*/
18-
const recycledNoncesKey = (chainId: number, walletAddress: Address) =>
19+
export const recycledNoncesKey = (chainId: number, walletAddress: Address) =>
1920
`nonce-recycled:${chainId}:${normalizeAddress(walletAddress)}`;
2021

22+
/**
23+
* The "sent nonces" set stores nonces that have been sent on chain but not yet mined.
24+
*
25+
* Example: [ "25", "23", "24" ]
26+
*
27+
* The `nonceResyncWorker` periodically fetches the onchain transaction count for each wallet (a),
28+
* compares it to the last nonce sent (b), and for every nonce between b and a,
29+
* it recycles the nonce if the nonce is not in this set.
30+
*/
31+
export const sentNoncesKey = (chainId: number, walletAddress: Address) =>
32+
`nonce-sent:${chainId}:${normalizeAddress(walletAddress)}`;
33+
34+
export const splitSentNoncesKey = (key: string) => {
35+
const _splittedKeys = key.split(":");
36+
const walletAddress = normalizeAddress(_splittedKeys[2]);
37+
const chainId = parseInt(_splittedKeys[1]);
38+
return { walletAddress, chainId };
39+
};
40+
41+
/**
42+
* Adds a nonce to the sent nonces set (`nonce-sent:${chainId}:${walletAddress}`).
43+
*/
44+
export const addSentNonce = async (
45+
chainId: number,
46+
walletAddress: Address,
47+
nonce: number,
48+
) => {
49+
const key = sentNoncesKey(chainId, walletAddress);
50+
await redis.sadd(key, nonce.toString());
51+
};
52+
53+
/**
54+
* Removes a nonce from the sent nonces set (`nonce-sent:${chainId}:${walletAddress}`).
55+
*/
56+
export const removeSentNonce = async (
57+
chainId: number,
58+
walletAddress: Address,
59+
nonce: number,
60+
) => {
61+
const key = sentNoncesKey(chainId, walletAddress);
62+
const removed = await redis.srem(key, nonce.toString());
63+
return removed === 1;
64+
};
65+
66+
/**
67+
* Check if a nonce is in the sent nonces set.
68+
*/
69+
export const isSentNonce = async (
70+
chainId: number,
71+
walletAddress: Address,
72+
nonce: number,
73+
) => {
74+
const key = sentNoncesKey(chainId, walletAddress);
75+
return !!(await redis.sismember(key, nonce.toString()));
76+
};
77+
2178
/**
2279
* Acquire an unused nonce.
2380
* This should be used to send an EOA transaction with this nonce.
@@ -58,8 +115,16 @@ export const recycleNonce = async (
58115
walletAddress: Address,
59116
nonce: number,
60117
) => {
118+
if (isNaN(nonce)) {
119+
logger({
120+
level: "warn",
121+
message: `[recycleNonce] Invalid nonce: ${nonce}`,
122+
service: "worker",
123+
});
124+
return;
125+
}
61126
const key = recycledNoncesKey(chainId, walletAddress);
62-
await redis.rpush(key, nonce);
127+
await redis.sadd(key, nonce.toString());
63128
};
64129

65130
/**
@@ -73,7 +138,7 @@ const _acquireRecycledNonce = async (
73138
walletAddress: Address,
74139
) => {
75140
const key = recycledNoncesKey(chainId, walletAddress);
76-
const res = await redis.lpop(key);
141+
const res = await redis.spop(key);
77142
return res ? parseInt(res) : null;
78143
};
79144

@@ -139,7 +204,7 @@ export const rebaseNonce = async (chainId: number, walletAddress: Address) => {
139204
address: walletAddress,
140205
});
141206

142-
// Lua script to set nonce as max(transactionCount, redis.get(lastUsedNonceKey))
207+
// Lua script to set nonce as max
143208
const script = `
144209
local transactionCount = tonumber(ARGV[1])
145210
local lastUsedNonce = tonumber(redis.call('get', KEYS[1]))

src/server/middleware/adminRoutes.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import { env } from "../../utils/env";
1010
import { CancelRecycledNoncesQueue } from "../../worker/queues/cancelRecycledNoncesQueue";
1111
import { MigratePostgresTransactionsQueue } from "../../worker/queues/migratePostgresTransactionsQueue";
1212
import { MineTransactionQueue } from "../../worker/queues/mineTransactionQueue";
13+
import { NonceResyncQueue } from "../../worker/queues/nonceResyncQueue";
1314
import { ProcessEventsLogQueue } from "../../worker/queues/processEventLogsQueue";
1415
import { ProcessTransactionReceiptsQueue } from "../../worker/queues/processTransactionReceiptsQueue";
1516
import { PruneTransactionsQueue } from "../../worker/queues/pruneTransactionsQueue";
@@ -28,6 +29,7 @@ const QUEUES: Queue[] = [
2829
CancelRecycledNoncesQueue.q,
2930
PruneTransactionsQueue.q,
3031
MigratePostgresTransactionsQueue.q,
32+
NonceResyncQueue.q,
3133
];
3234

3335
export const withAdminRoutes = async (fastify: FastifyInstance) => {

src/worker/index.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import {
1010
import { initCancelRecycledNoncesWorker } from "./tasks/cancelRecycledNoncesWorker";
1111
import { initMigratePostgresTransactionsWorker } from "./tasks/migratePostgresTransactionsWorker";
1212
import { initMineTransactionWorker } from "./tasks/mineTransactionWorker";
13+
import { initNonceResyncWorker } from "./tasks/nonceResyncWorker";
1314
import { initProcessEventLogsWorker } from "./tasks/processEventLogsWorker";
1415
import { initProcessTransactionReceiptsWorker } from "./tasks/processTransactionReceiptsWorker";
1516
import { initPruneTransactionsWorker } from "./tasks/pruneTransactionsWorker";
@@ -26,6 +27,8 @@ export const initWorker = async () => {
2627
initSendWebhookWorker();
2728
await initMigratePostgresTransactionsWorker();
2829

30+
await initNonceResyncWorker();
31+
2932
// Listen for new & updated configuration data.
3033
await newConfigurationListener();
3134
await updatedConfigurationListener();

src/worker/queues/nonceResyncQueue.ts

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
import { Queue } from "bullmq";
2+
import { redis } from "../../utils/redis/redis";
3+
import { defaultJobOptions } from "./queues";
4+
5+
export class NonceResyncQueue {
6+
static q = new Queue<string>("nonce-resync-cron", {
7+
connection: redis,
8+
defaultJobOptions,
9+
});
10+
11+
constructor() {
12+
NonceResyncQueue.q.setGlobalConcurrency(1);
13+
14+
// The cron job is defined in `initNonceResyncWorker`
15+
// because it requires an async call to query configuration.
16+
}
17+
}

src/worker/tasks/cancelRecycledNoncesWorker.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ const getAndDeleteUnusedNonces = async (key: string) => {
7777
// Returns all unused nonces for this key and deletes the key.
7878
const script = `
7979
local key = ARGV[1]
80-
local members = redis.call('LRANGE', key, 0, -1)
80+
local members = redis.call('SMEMBERS', key)
8181
redis.call('DEL', key)
8282
return members
8383
`;

src/worker/tasks/mineTransactionWorker.ts

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import {
1010
import { stringify } from "thirdweb/utils";
1111
import { getUserOpReceiptRaw } from "thirdweb/wallets/smart";
1212
import { TransactionDB } from "../../db/transactions/db";
13-
import { recycleNonce } from "../../db/wallets/walletNonce";
13+
import { recycleNonce, removeSentNonce } from "../../db/wallets/walletNonce";
1414
import { getBlockNumberish } from "../../utils/block";
1515
import { getConfig } from "../../utils/cache/getConfig";
1616
import { getChain } from "../../utils/chain";
@@ -128,6 +128,19 @@ const _mineTransaction = async (
128128
if (result.status === "fulfilled") {
129129
const receipt = result.value;
130130
job.log(`Found receipt on block ${receipt.blockNumber}.`);
131+
132+
const removed = await removeSentNonce(
133+
sentTransaction.chainId,
134+
sentTransaction.from,
135+
sentTransaction.nonce,
136+
);
137+
138+
logger({
139+
level: "debug",
140+
message: `[mineTransactionWorker] Removed nonce ${sentTransaction.nonce} from nonce-sent set: ${removed}`,
141+
service: "worker",
142+
});
143+
131144
return {
132145
...sentTransaction,
133146
status: "mined",
@@ -142,7 +155,6 @@ const _mineTransaction = async (
142155
};
143156
}
144157
}
145-
146158
// Else the transaction is not mined yet.
147159

148160
// Retry the transaction (after some initial delay).
@@ -180,6 +192,7 @@ const _mineUserOp = async (
180192
chain,
181193
userOpHash,
182194
});
195+
183196
if (!userOpReceiptRaw) {
184197
return null;
185198
}
@@ -243,12 +256,20 @@ export const initMineTransactionWorker = () => {
243256

244257
if (!sentTransaction.isUserOp) {
245258
// Release the nonce to allow it to be reused or cancelled.
246-
job.log(`Recycling nonce: ${sentTransaction.nonce}`);
259+
job.log(
260+
`Recycling nonce and removing from nonce-sent: ${sentTransaction.nonce}`,
261+
);
247262
await recycleNonce(
248263
sentTransaction.chainId,
249264
sentTransaction.from,
250265
sentTransaction.nonce,
251266
);
267+
268+
await removeSentNonce(
269+
sentTransaction.chainId,
270+
sentTransaction.from,
271+
sentTransaction.nonce,
272+
);
252273
}
253274
}
254275
});

src/worker/tasks/nonceResyncWorker.ts

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
import { Job, Processor, Worker } from "bullmq";
2+
import { eth_getTransactionCount, getRpcClient } from "thirdweb";
3+
import {
4+
inspectNonce,
5+
isSentNonce,
6+
recycleNonce,
7+
splitSentNoncesKey,
8+
} from "../../db/wallets/walletNonce";
9+
import { getConfig } from "../../utils/cache/getConfig";
10+
import { getChain } from "../../utils/chain";
11+
import { logger } from "../../utils/logger";
12+
import { redis } from "../../utils/redis/redis";
13+
import { thirdwebClient } from "../../utils/sdk";
14+
import { NonceResyncQueue } from "../queues/nonceResyncQueue";
15+
import { logWorkerExceptions } from "../queues/queues";
16+
17+
// Must be explicitly called for the worker to run on this host.
18+
export const initNonceResyncWorker = async () => {
19+
const config = await getConfig();
20+
if (config.minedTxListenerCronSchedule) {
21+
NonceResyncQueue.q.add("cron", "", {
22+
repeat: { pattern: config.minedTxListenerCronSchedule },
23+
jobId: "nonce-resync-cron",
24+
});
25+
}
26+
27+
const _worker = new Worker(NonceResyncQueue.q.name, handler, {
28+
connection: redis,
29+
concurrency: 1,
30+
});
31+
logWorkerExceptions(_worker);
32+
};
33+
34+
/**
35+
* Resyncs nonces for all wallets.
36+
* This worker should be run periodically to ensure that nonces are not skipped.
37+
* It checks the onchain nonce for each wallet and recycles any missing nonces.
38+
*
39+
* This is to unblock a wallet that has been stuck due to one or more skipped nonces.
40+
*/
41+
const handler: Processor<any, void, string> = async (job: Job<string>) => {
42+
const sentNoncesKeys = await redis.keys("nonce-sent*");
43+
job.log(`Found ${sentNoncesKeys.length} nonce-sent* keys`);
44+
45+
for (const sentNonceKey of sentNoncesKeys) {
46+
const { chainId, walletAddress } = splitSentNoncesKey(sentNonceKey);
47+
48+
const rpcRequest = getRpcClient({
49+
client: thirdwebClient,
50+
chain: await getChain(chainId),
51+
});
52+
53+
const [transactionCount, lastUsedNonceDb] = await Promise.all([
54+
eth_getTransactionCount(rpcRequest, {
55+
address: walletAddress,
56+
}),
57+
inspectNonce(chainId, walletAddress),
58+
]);
59+
60+
if (isNaN(transactionCount)) {
61+
job.log(
62+
`Received invalid onchain transaction count for ${walletAddress}: ${transactionCount}`,
63+
);
64+
65+
logger({
66+
level: "error",
67+
message: `[nonceResyncWorker] Received invalid onchain transaction count for ${walletAddress}: ${transactionCount}`,
68+
service: "worker",
69+
});
70+
71+
return;
72+
}
73+
74+
const lastUsedNonceOnchain = transactionCount - 1;
75+
76+
job.log(
77+
`${walletAddress} last used onchain nonce: ${lastUsedNonceOnchain} and last used db nonce: ${lastUsedNonceDb}`,
78+
);
79+
logger({
80+
level: "debug",
81+
message: `[nonceResyncWorker] last used onchain nonce: ${transactionCount} and last used db nonce: ${lastUsedNonceDb}`,
82+
service: "worker",
83+
});
84+
85+
// If the last used nonce onchain is the same as or ahead of the last used nonce in the db,
86+
// There is no need to resync the nonce.
87+
if (lastUsedNonceOnchain >= lastUsedNonceDb) {
88+
job.log(`No need to resync nonce for ${walletAddress}`);
89+
logger({
90+
level: "debug",
91+
message: `[nonceResyncWorker] No need to resync nonce for ${walletAddress}`,
92+
service: "worker",
93+
});
94+
return;
95+
}
96+
97+
// for each nonce between last used db nonce and last used onchain nonce
98+
// check if nonce exists in nonce-sent set
99+
// if it does not exist, recycle it
100+
for (
101+
let _nonce = lastUsedNonceOnchain + 1;
102+
_nonce < lastUsedNonceDb;
103+
_nonce++
104+
) {
105+
const exists = await isSentNonce(chainId, walletAddress, _nonce);
106+
logger({
107+
level: "debug",
108+
message: `[nonceResyncWorker] nonce ${_nonce} exists in nonce-sent set: ${exists}`,
109+
service: "worker",
110+
});
111+
112+
// If nonce does not exist in nonce-sent set, recycle it
113+
if (!exists) {
114+
await recycleNonce(chainId, walletAddress, _nonce);
115+
}
116+
}
117+
}
118+
};

src/worker/tasks/sendTransactionWorker.ts

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import { getContractAddress } from "viem";
99
import { TransactionDB } from "../../db/transactions/db";
1010
import {
1111
acquireNonce,
12+
addSentNonce,
1213
rebaseNonce,
1314
recycleNonce,
1415
} from "../../db/wallets/walletNonce";
@@ -171,8 +172,9 @@ const _sendTransaction = async (
171172
transactionHash = sendTransactionResult.transactionHash;
172173
job.log(`Sent transaction: ${transactionHash}`);
173174
} catch (error: unknown) {
174-
// If NonceAlreadyUsedError, rebase the nonce and retry.
175-
if (isNonceAlreadyUsedError(error)) {
175+
// If NonceAlreadyUsedError, which can also manifest as a ReplacementGasFeeTooLowError,
176+
// recycle the nonce and retry the transaction.
177+
if (isNonceAlreadyUsedError(error) || isReplacementGasFeeTooLow(error)) {
176178
const resyncNonce = await rebaseNonce(chainId, from);
177179
job.log(`Resynced nonce to ${resyncNonce}.`);
178180
} else {
@@ -182,6 +184,8 @@ const _sendTransaction = async (
182184
throw error;
183185
}
184186

187+
await addSentNonce(chainId, from, nonce);
188+
185189
return {
186190
...queuedTransaction,
187191
status: "sent",

0 commit comments

Comments
 (0)