Skip to content

Commit e0de832

Browse files
authored
feat: nonce stats endpoint + queue/nonce log cron (#628)
* feat: nonce stats endpoint + queue/nonce log cron * chore: Add LogQueue to bullboard and add queue initialisation * fix: add missed log line * fix: Update querystring schema for getNonceDetailsRoute * fix: use last used onchain nonce * ramped up nonce heuristics * feat: use redis pipelines * parse and sort * chore: rename LogQueue to NonceHealthCheckQueue * Simplify nonce health heuristic * feat: Improve nonce health check algorithm * chore: Update getOnchainNonce to getLastUsedOnchainNonce for consistency * chore: Refactor nonce health check worker for improved consistency and performance * fix logger: override default winston log levels to match env * fixes: consistent address key, run every 20s * feat: Filter used backend wallets by chainId and walletAddress This commit modifies the `getUsedBackendWallets` function in the `walletNonce.ts` file to add optional parameters `chainId` and `walletAddress`. These parameters allow filtering the used backend wallets based on the provided chainId and walletAddress. The function now uses these parameters to construct the Redis key pattern for fetching the used backend wallets. * refactor: Update Queue name in NonceHealthCheckQueue class * chore: Refactor getCurrentNonceState function to use BigInt for onchainNonce * chore: Update nonce health check worker frequency to run every minute
1 parent 1248acd commit e0de832

File tree

9 files changed

+462
-18
lines changed

9 files changed

+462
-18
lines changed

src/db/wallets/walletNonce.ts

Lines changed: 55 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,48 @@
1-
import { Address, eth_getTransactionCount, getRpcClient } from "thirdweb";
1+
import {
2+
Address,
3+
eth_getTransactionCount,
4+
getAddress,
5+
getRpcClient,
6+
} from "thirdweb";
27
import { getChain } from "../../utils/chain";
38
import { logger } from "../../utils/logger";
49
import { normalizeAddress } from "../../utils/primitiveTypes";
510
import { redis } from "../../utils/redis/redis";
611
import { thirdwebClient } from "../../utils/sdk";
712
import { updateNonceMap } from "./nonceMap";
813

14+
/**
15+
* Get all used backend wallets.
16+
* Filters by chainId and walletAddress if provided.
17+
* Reads all the keys in the format `nonce:${chainId}:${walletAddress}` in the Redis DB.
18+
*
19+
* @example
20+
* getUsedBackendWallets()
21+
* // [ { chainId: 80001, walletAddress: "0x1234...5678" } ]
22+
*/
23+
export const getUsedBackendWallets = async (
24+
chainId?: number,
25+
walletAddress?: Address,
26+
): Promise<
27+
{
28+
chainId: number;
29+
walletAddress: Address;
30+
}[]
31+
> => {
32+
const keys = await redis.keys(
33+
`nonce:${chainId ?? "*"}:${
34+
walletAddress ? normalizeAddress(walletAddress) : "*"
35+
}`,
36+
);
37+
return keys.map((key) => {
38+
const tokens = key.split(":");
39+
return {
40+
chainId: parseInt(tokens[1]),
41+
walletAddress: getAddress(tokens[2]),
42+
};
43+
});
44+
};
45+
946
/**
1047
* The "last used nonce" stores the last nonce submitted onchain.
1148
* Example: "25"
@@ -14,8 +51,23 @@ export const lastUsedNonceKey = (chainId: number, walletAddress: Address) =>
1451
`nonce:${chainId}:${normalizeAddress(walletAddress)}`;
1552

1653
/**
17-
* The "recycled nonces" sorted set stores nonces to be reused or cancelled, sorted by nonce value.
18-
* Example: [ "23", "24", "25" ]
54+
* Split the last used nonce key into chainId and walletAddress.
55+
* @param key
56+
* @returns { chainId: number, walletAddress: Address }
57+
* @example
58+
* splitLastUsedNonceKey("nonce:80001:0x1234...5678")
59+
* // { chainId: 80001, walletAddress: "0x1234...5678" }
60+
*/
61+
export const splitLastUsedNonceKey = (key: string) => {
62+
const _splittedKeys = key.split(":");
63+
const walletAddress = normalizeAddress(_splittedKeys[2]);
64+
const chainId = parseInt(_splittedKeys[1]);
65+
return { walletAddress, chainId };
66+
};
67+
68+
/**
69+
* The "recycled nonces" set stores unsorted nonces to be reused or cancelled.
70+
* Example: [ "25", "23", "24" ]
1971
*/
2072
export const recycledNoncesKey = (chainId: number, walletAddress: Address) =>
2173
`nonce-recycled:${chainId}:${normalizeAddress(walletAddress)}`;

src/server/middleware/adminRoutes.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import { env } from "../../utils/env";
1010
import { CancelRecycledNoncesQueue } from "../../worker/queues/cancelRecycledNoncesQueue";
1111
import { MigratePostgresTransactionsQueue } from "../../worker/queues/migratePostgresTransactionsQueue";
1212
import { MineTransactionQueue } from "../../worker/queues/mineTransactionQueue";
13+
import { NonceHealthCheckQueue } from "../../worker/queues/nonceHealthCheckQueue";
1314
import { NonceResyncQueue } from "../../worker/queues/nonceResyncQueue";
1415
import { ProcessEventsLogQueue } from "../../worker/queues/processEventLogsQueue";
1516
import { ProcessTransactionReceiptsQueue } from "../../worker/queues/processTransactionReceiptsQueue";
@@ -30,6 +31,7 @@ const QUEUES: Queue[] = [
3031
PruneTransactionsQueue.q,
3132
MigratePostgresTransactionsQueue.q,
3233
NonceResyncQueue.q,
34+
NonceHealthCheckQueue.q,
3335
];
3436

3537
export const withAdminRoutes = async (fastify: FastifyInstance) => {

src/server/routes/admin/nonces.ts

Lines changed: 180 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,180 @@
1+
import { Static, Type } from "@sinclair/typebox";
2+
import { FastifyInstance } from "fastify";
3+
import { StatusCodes } from "http-status-codes";
4+
import {
5+
Address,
6+
eth_getTransactionCount,
7+
getAddress,
8+
getRpcClient,
9+
} from "thirdweb";
10+
import {
11+
getUsedBackendWallets,
12+
lastUsedNonceKey,
13+
recycledNoncesKey,
14+
sentNoncesKey,
15+
} from "../../../db/wallets/walletNonce";
16+
import { getChain } from "../../../utils/chain";
17+
import { redis } from "../../../utils/redis/redis";
18+
import { thirdwebClient } from "../../../utils/sdk";
19+
import { standardResponseSchema } from "../../schemas/sharedApiSchemas";
20+
import { walletWithAddressParamSchema } from "../../schemas/wallet";
21+
22+
export const responseBodySchema = Type.Object({
23+
result: Type.Array(
24+
Type.Object({
25+
walletAddress: Type.String({
26+
description: "Backend Wallet Address",
27+
examples: ["0xcedf3b4d8f7f1f7e0f7f0f7f0f7f0f7f0f7f0f7f"],
28+
}),
29+
chainId: Type.Number({
30+
description: "Chain ID",
31+
examples: [80002],
32+
}),
33+
onchainNonce: Type.Integer({
34+
description: "Last mined nonce",
35+
examples: [0],
36+
}),
37+
lastUsedNonce: Type.Integer({
38+
description: "Last incremented nonce sent to the RPC",
39+
examples: [0],
40+
}),
41+
sentNonces: Type.Array(Type.Integer(), {
42+
description:
43+
"Nonces that were successfully sent to the RPC but not mined yet (in descending order)",
44+
examples: [[2, 1, 0]],
45+
}),
46+
recycledNonces: Type.Array(Type.Integer(), {
47+
examples: [[3, 2, 1]],
48+
description:
49+
"Nonces that were acquired but failed to be sent to the blockchain, waiting to be recycled or cancelled (in descending order)",
50+
}),
51+
}),
52+
),
53+
});
54+
55+
responseBodySchema.example = {
56+
result: [
57+
{
58+
walletAddress: "0xcedf3b4d8f7f1f7e0f7f0f7f0f7f0f7f0f7f0f7f",
59+
onchainNonce: 2,
60+
lastUsedNonce: 8,
61+
recycledNonces: [6, 7],
62+
chainId: 80002,
63+
},
64+
],
65+
};
66+
67+
const walletWithAddressQuerySchema = Type.Partial(walletWithAddressParamSchema);
68+
69+
export async function getNonceDetailsRoute(fastify: FastifyInstance) {
70+
fastify.route<{
71+
Querystring: Static<typeof walletWithAddressQuerySchema>;
72+
Reply: Static<typeof responseBodySchema>;
73+
}>({
74+
method: "GET",
75+
url: "/admin/nonces",
76+
schema: {
77+
summary: "Get nonce status details for wallets",
78+
description:
79+
"Admin route to get nonce status details for all wallets filtered by address and chain ",
80+
tags: ["Admin"],
81+
operationId: "nonceDetails",
82+
querystring: walletWithAddressQuerySchema,
83+
response: {
84+
...standardResponseSchema,
85+
[StatusCodes.OK]: responseBodySchema,
86+
},
87+
hide: true,
88+
},
89+
handler: async (request, reply) => {
90+
const { walletAddress, chain } = request.query;
91+
const result = await getNonceDetails({
92+
walletAddress: walletAddress ? getAddress(walletAddress) : undefined,
93+
chainId: chain ? parseInt(chain) : undefined,
94+
});
95+
96+
reply.status(StatusCodes.OK).send({
97+
result,
98+
});
99+
},
100+
});
101+
}
102+
103+
export const getNonceDetails = async ({
104+
walletAddress,
105+
chainId,
106+
}: {
107+
walletAddress?: Address;
108+
chainId?: number;
109+
} = {}) => {
110+
const usedBackendWallets = await getUsedBackendWallets(
111+
chainId,
112+
walletAddress,
113+
);
114+
115+
const pipeline = redis.pipeline();
116+
const onchainNoncePromises: Promise<number>[] = [];
117+
118+
const keyMap = usedBackendWallets.map(({ chainId, walletAddress }) => {
119+
pipeline.get(lastUsedNonceKey(chainId, walletAddress));
120+
pipeline.smembers(sentNoncesKey(chainId, walletAddress));
121+
pipeline.smembers(recycledNoncesKey(chainId, walletAddress));
122+
123+
onchainNoncePromises.push(getLastUsedOnchainNonce(chainId, walletAddress));
124+
125+
return { chainId, walletAddress };
126+
});
127+
128+
const [pipelineResults, onchainNonces] = await Promise.all([
129+
pipeline.exec(),
130+
Promise.all(onchainNoncePromises),
131+
]);
132+
133+
if (!pipelineResults) {
134+
throw new Error("Failed to execute Redis pipeline");
135+
}
136+
137+
return keyMap.map((key, index) => {
138+
const pipelineOffset = index * 3;
139+
const [lastUsedNonceResult, sentNoncesResult, recycledNoncesResult] =
140+
pipelineResults.slice(pipelineOffset, pipelineOffset + 3);
141+
142+
return {
143+
walletAddress: key.walletAddress,
144+
chainId: key.chainId,
145+
onchainNonce: onchainNonces[index],
146+
lastUsedNonce: parseInt(lastUsedNonceResult[1] as string) ?? 0,
147+
sentNonces: (sentNoncesResult[1] as string[])
148+
.map((nonce) => parseInt(nonce))
149+
.sort((a, b) => b - a),
150+
recycledNonces: (recycledNoncesResult[1] as string[])
151+
.map((nonce) => parseInt(nonce))
152+
.sort((a, b) => b - a),
153+
};
154+
});
155+
};
156+
157+
/*
158+
* Get the last used nonce onchain
159+
* @param chainId Chain ID
160+
* @param walletAddress Wallet address
161+
* @returns Next unused nonce
162+
*/
163+
export const getLastUsedOnchainNonce = async (
164+
chainId: number,
165+
walletAddress: string,
166+
) => {
167+
const rpcRequest = getRpcClient({
168+
client: thirdwebClient,
169+
chain: await getChain(chainId),
170+
});
171+
172+
// The next unused nonce = transactionCount.
173+
const transactionCount = await eth_getTransactionCount(rpcRequest, {
174+
address: walletAddress,
175+
blockTag: "latest",
176+
});
177+
178+
const onchainNonce = transactionCount - 1;
179+
return onchainNonce;
180+
};

src/server/routes/index.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import { FastifyInstance } from "fastify";
2+
import { getNonceDetailsRoute } from "./admin/nonces";
23
import { getTransactionDetails } from "./admin/transaction";
34
import { createAccessToken } from "./auth/access-tokens/create";
45
import { getAllAccessTokens } from "./auth/access-tokens/getAll";
@@ -261,4 +262,5 @@ export const withRoutes = async (fastify: FastifyInstance) => {
261262

262263
// Admin
263264
await fastify.register(getTransactionDetails);
265+
await fastify.register(getNonceDetailsRoute);
264266
};

src/utils/logger.ts

Lines changed: 40 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,31 +1,57 @@
11
import { createLogger, format, transports } from "winston";
22
import { env } from "./env";
33

4+
type LogLevels = typeof env.LOG_LEVEL;
5+
6+
// Define custom log levels that strictly match the log levels in the env file
7+
const customLevels: {
8+
levels: { [K in LogLevels]: number };
9+
colors: { [K in LogLevels]: string };
10+
} = {
11+
levels: {
12+
fatal: 0,
13+
error: 1,
14+
warn: 2,
15+
info: 3,
16+
debug: 4,
17+
trace: 5,
18+
},
19+
colors: {
20+
fatal: "red",
21+
error: "red",
22+
warn: "yellow",
23+
info: "green",
24+
debug: "blue",
25+
trace: "gray",
26+
},
27+
};
28+
429
// Custom filter for stdout transport
5-
const filterOnlyInfoAndWarn = format((info) => {
6-
if (info.level === "error") {
7-
return false; // Exclude 'error' level logs
30+
const filterNonErrors = format((info) => {
31+
if (info.level === "error" || info.level === "fatal") {
32+
return false; // only include non-error logs
833
}
934
return info;
1035
});
1136

1237
// Custom filter for stderr transport
13-
const filterOnlyErrors = format((info) => {
14-
if (info.level !== "error") {
15-
return false; // Exclude non-error level logs
38+
const filterErrorsAndFatal = format((info) => {
39+
if (info.level === "error" || info.level === "fatal") {
40+
return info; // only include error and fatal logs
1641
}
17-
return info;
42+
return false;
1843
});
1944

2045
const colorizeFormat = () => {
2146
if (env.NODE_ENV === "development") {
22-
return format.colorize();
47+
return format.colorize({ colors: customLevels.colors });
2348
} else {
2449
return format.uncolorize();
2550
}
2651
};
2752

2853
const winstonLogger = createLogger({
54+
levels: customLevels.levels,
2955
level: env.LOG_LEVEL,
3056
format: format.combine(
3157
format.timestamp(),
@@ -40,15 +66,14 @@ const winstonLogger = createLogger({
4066
}),
4167
),
4268
transports: [
43-
// Transport for stdout
69+
// Transport for stdout (non-error logs)
4470
new transports.Console({
45-
format: format.combine(filterOnlyInfoAndWarn()),
46-
stderrLevels: [], // Don't log "error" to stdout
71+
format: format.combine(filterNonErrors()),
4772
}),
48-
// Transport for stderr
73+
// Transport for stderr (error and fatal logs)
4974
new transports.Console({
50-
format: format.combine(filterOnlyErrors()),
51-
stderrLevels: ["error"], // Ensure errors go to stderr
75+
format: format.combine(filterErrorsAndFatal()),
76+
stderrLevels: ["error", "fatal"],
5277
}),
5378
],
5479
});
@@ -85,7 +110,7 @@ export const logger = ({
85110
}
86111

87112
if (error) {
88-
winstonLogger.error(`${prefix}${message}${suffix}`, { error });
113+
winstonLogger.error(level, `${prefix}${message}${suffix}`, { error });
89114
} else {
90115
winstonLogger.log(level, `${prefix}${message}${suffix}`);
91116
}

src/worker/index.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import {
1010
import { initCancelRecycledNoncesWorker } from "./tasks/cancelRecycledNoncesWorker";
1111
import { initMigratePostgresTransactionsWorker } from "./tasks/migratePostgresTransactionsWorker";
1212
import { initMineTransactionWorker } from "./tasks/mineTransactionWorker";
13+
import { initNonceHealthCheckWorker } from "./tasks/nonceHealthCheckWorker";
1314
import { initNonceResyncWorker } from "./tasks/nonceResyncWorker";
1415
import { initProcessEventLogsWorker } from "./tasks/processEventLogsWorker";
1516
import { initProcessTransactionReceiptsWorker } from "./tasks/processTransactionReceiptsWorker";
@@ -25,6 +26,9 @@ export const initWorker = async () => {
2526
initSendTransactionWorker();
2627
initMineTransactionWorker();
2728
initSendWebhookWorker();
29+
30+
initNonceHealthCheckWorker();
31+
2832
await initMigratePostgresTransactionsWorker();
2933

3034
await initNonceResyncWorker();

0 commit comments

Comments
 (0)