Skip to content

Commit bb8b756

Browse files
authored
chore: do nonce health check in batches (#680)
1 parent ddabd6b commit bb8b756

File tree

2 files changed

+27
-42
lines changed

2 files changed

+27
-42
lines changed

src/worker/tasks/migratePostgresTransactionsWorker.ts

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,6 @@ export const initMigratePostgresTransactionsWorker = async () => {
3535
logWorkerExceptions(_worker);
3636
};
3737

38-
export const sleep = async (ms: number) =>
39-
new Promise((resolve) => setTimeout(resolve, ms, null));
40-
4138
const handler: Processor<any, void, string> = async (job: Job<string>) => {
4239
// Migrate sent transactions from PostgresDB -> Redis queue.
4340
await prisma.$transaction(async (pgtx) => {

src/worker/tasks/nonceHealthCheckWorker.ts

Lines changed: 27 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1-
import { Job, Processor, Worker } from "bullmq";
2-
import { Address, getAddress } from "thirdweb";
1+
import { Worker, type Job, type Processor } from "bullmq";
2+
import { getAddress, type Address } from "thirdweb";
33
import {
44
getUsedBackendWallets,
55
inspectNonce,
@@ -19,6 +19,9 @@ const CHECK_PERIODS = 3;
1919
// Frequency of the worker
2020
const RUN_FREQUENCY_SECONDS = 60; // Run every minute
2121

22+
// The number of wallets to check in parallel.
23+
const BATCH_SIZE = 500;
24+
2225
// Interfaces
2326
interface NonceState {
2427
onchainNonce: number;
@@ -51,26 +54,27 @@ export const initNonceHealthCheckWorker = () => {
5154
const handler: Processor<any, void, string> = async (job: Job<string>) => {
5255
const allWallets = await getUsedBackendWallets();
5356

54-
const walletHealthPromises = allWallets.map(
55-
async ({ chainId, walletAddress }) => {
56-
const [_, isStuck, currentState] = await Promise.all([
57-
updateNonceHistory(walletAddress, chainId),
58-
isQueueStuck(walletAddress, chainId),
59-
getCurrentNonceState(walletAddress, chainId),
60-
]);
61-
62-
return {
63-
walletAddress,
64-
chainId,
65-
isStuck,
66-
onchainNonce: currentState.onchainNonce,
67-
largestSentNonce: currentState.largestSentNonce,
68-
};
69-
},
70-
);
57+
for (let i = 0; i < allWallets.length; i += BATCH_SIZE) {
58+
const batch = allWallets.slice(i, i + BATCH_SIZE);
59+
60+
await Promise.all(
61+
batch.map(async ({ chainId, walletAddress }) => {
62+
const [_, isStuck, currentState] = await Promise.all([
63+
updateNonceHistory(walletAddress, chainId),
64+
isQueueStuck(walletAddress, chainId),
65+
getCurrentNonceState(walletAddress, chainId),
66+
]);
67+
68+
logger({
69+
service: "worker",
70+
level: isStuck ? "fatal" : "info",
71+
message: `[WALLET_HEALTH] ${walletAddress}:${chainId} isStuck:${isStuck} onchainNonce:${currentState.onchainNonce} largestSentNonce:${currentState.largestSentNonce}`,
72+
});
73+
}),
74+
);
7175

72-
const walletHealthResults = await Promise.all(walletHealthPromises);
73-
logWalletHealth(walletHealthResults, job);
76+
await sleep(500);
77+
}
7478
};
7579

7680
// Check if a queue is stuck
@@ -145,21 +149,5 @@ async function updateNonceHistory(walletAddress: Address, chainId: number) {
145149
.exec();
146150
}
147151

148-
// Log wallet health
149-
function logWalletHealth(healthResults: WalletHealth[], job: Job<string>) {
150-
healthResults.forEach((result) => {
151-
const message =
152-
`[WALLET_HEALTH] ${result.walletAddress}:${result.chainId} ` +
153-
`isStuck:${result.isStuck} ` +
154-
`onchainNonce:${result.onchainNonce} ` +
155-
`largestSentNonce:${result.largestSentNonce}`;
156-
157-
logger({
158-
service: "worker",
159-
level: result.isStuck ? "fatal" : "info",
160-
message,
161-
});
162-
});
163-
164-
job.log(JSON.stringify(healthResults));
165-
}
152+
const sleep = async (ms: number) =>
153+
new Promise((resolve) => setTimeout(resolve, ms, null));

0 commit comments

Comments
 (0)