Skip to content

Commit 102cd3b

Browse files
committed
Merge remote-tracking branch 'origin/ph/txWorkerRedis' into ph/txWorkerRedis
2 parents 74874dc + 6587eca commit 102cd3b

35 files changed

+1351
-22
lines changed

Dockerfile

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ COPY . .
4242
# Prune dev dependencies from the packages
4343
RUN yarn install --frozen-lockfile --production=false --network-timeout 1000000 && \
4444
yarn build && \
45+
yarn copy-files && \
4546
yarn install --frozen-lockfile --production=true --network-timeout 1000000
4647

4748
##############################
@@ -64,4 +65,7 @@ COPY --from=build /app/node_modules ./node_modules
6465
COPY --from=build /app/src/prisma/* ./src/prisma/
6566
COPY --from=build /app/dist ./dist
6667

68+
# Replace the schema path in the package.json file
69+
RUN sed -i 's_"schema": "./src/prisma/schema.prisma"_"schema": "./dist/prisma/schema.prisma"_g' package.json
70+
6771
ENTRYPOINT [ "yarn", "start"]

package.json

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@
2222
"start:docker-force-build": "docker compose --profile engine --env-file ./.env up --remove-orphans --build",
2323
"lint:fix": "eslint --fix 'src/**/*.ts'",
2424
"test:unit": "vitest",
25-
"test:coverage": "vitest run --coverage"
25+
"test:coverage": "vitest run --coverage",
26+
"copy-files": "copyfiles -u 2 ./src/prisma/* ./dist/prisma/ && copyfiles -u 3 ./src/prisma/migrations/**/*.sql ./dist/prisma/migrations/"
2627
},
2728
"dependencies": {
2829
"@aws-sdk/client-kms": "^3.398.0",
@@ -47,6 +48,7 @@
4748
"bullmq": "^5.11.0",
4849
"cookie": "^0.5.0",
4950
"cookie-parser": "^1.4.6",
51+
"copyfiles": "^2.4.1",
5052
"cron-parser": "^4.9.0",
5153
"crypto": "^1.0.1",
5254
"crypto-js": "^4.2.0",

src/db/wallets/walletNonce.ts

Lines changed: 103 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { Address, eth_getTransactionCount, getRpcClient } from "thirdweb";
22
import { getChain } from "../../utils/chain";
3+
import { logger } from "../../utils/logger";
34
import { normalizeAddress } from "../../utils/primitiveTypes";
45
import { redis } from "../../utils/redis/redis";
56
import { thirdwebClient } from "../../utils/sdk";
@@ -8,16 +9,72 @@ import { thirdwebClient } from "../../utils/sdk";
89
* The "last used nonce" stores the last nonce submitted onchain.
910
* Example: "25"
1011
*/
11-
const lastUsedNonceKey = (chainId: number, walletAddress: Address) =>
12+
export const lastUsedNonceKey = (chainId: number, walletAddress: Address) =>
1213
`nonce:${chainId}:${normalizeAddress(walletAddress)}`;
1314

1415
/**
15-
* The "recycled nonces" list stores unsorted nonces to be reused or cancelled.
16+
* The "recycled nonces" set stores unsorted nonces to be reused or cancelled.
1617
* Example: [ "25", "23", "24" ]
1718
*/
18-
const recycledNoncesKey = (chainId: number, walletAddress: Address) =>
19+
export const recycledNoncesKey = (chainId: number, walletAddress: Address) =>
1920
`nonce-recycled:${chainId}:${normalizeAddress(walletAddress)}`;
2021

22+
/**
23+
* The "sent nonces" set stores nonces that have been sent on chain but not yet mined.
24+
*
25+
* Example: [ "25", "23", "24" ]
26+
*
27+
* The `nonceResyncWorker` periodically fetches the onchain transaction count for each wallet (a),
28+
* compares it to the last nonce sent (b), and for every nonce between b and a,
29+
* it recycles the nonce if the nonce is not in this set.
30+
*/
31+
export const sentNoncesKey = (chainId: number, walletAddress: Address) =>
32+
`nonce-sent:${chainId}:${normalizeAddress(walletAddress)}`;
33+
34+
export const splitSentNoncesKey = (key: string) => {
35+
const _splittedKeys = key.split(":");
36+
const walletAddress = normalizeAddress(_splittedKeys[2]);
37+
const chainId = parseInt(_splittedKeys[1]);
38+
return { walletAddress, chainId };
39+
};
40+
41+
/**
42+
* Adds a nonce to the sent nonces set (`nonce-sent:${chainId}:${walletAddress}`).
43+
*/
44+
export const addSentNonce = async (
45+
chainId: number,
46+
walletAddress: Address,
47+
nonce: number,
48+
) => {
49+
const key = sentNoncesKey(chainId, walletAddress);
50+
await redis.sadd(key, nonce.toString());
51+
};
52+
53+
/**
54+
* Removes a nonce from the sent nonces set (`nonce-sent:${chainId}:${walletAddress}`).
55+
*/
56+
export const removeSentNonce = async (
57+
chainId: number,
58+
walletAddress: Address,
59+
nonce: number,
60+
) => {
61+
const key = sentNoncesKey(chainId, walletAddress);
62+
const removed = await redis.srem(key, nonce.toString());
63+
return removed === 1;
64+
};
65+
66+
/**
67+
* Check if a nonce is in the sent nonces set.
68+
*/
69+
export const isSentNonce = async (
70+
chainId: number,
71+
walletAddress: Address,
72+
nonce: number,
73+
) => {
74+
const key = sentNoncesKey(chainId, walletAddress);
75+
return !!(await redis.sismember(key, nonce.toString()));
76+
};
77+
2178
/**
2279
* Acquire an unused nonce.
2380
* This should be used to send an EOA transaction with this nonce.
@@ -58,8 +115,16 @@ export const recycleNonce = async (
58115
walletAddress: Address,
59116
nonce: number,
60117
) => {
118+
if (isNaN(nonce)) {
119+
logger({
120+
level: "warn",
121+
message: `[recycleNonce] Invalid nonce: ${nonce}`,
122+
service: "worker",
123+
});
124+
return;
125+
}
61126
const key = recycledNoncesKey(chainId, walletAddress);
62-
await redis.rpush(key, nonce);
127+
await redis.sadd(key, nonce.toString());
63128
};
64129

65130
/**
@@ -73,7 +138,7 @@ const _acquireRecycledNonce = async (
73138
walletAddress: Address,
74139
) => {
75140
const key = recycledNoncesKey(chainId, walletAddress);
76-
const res = await redis.lpop(key);
141+
const res = await redis.spop(key);
77142
return res ? parseInt(res) : null;
78143
};
79144

@@ -122,3 +187,36 @@ export const deleteAllNonces = async () => {
122187
await redis.del(keys);
123188
}
124189
};
190+
191+
/**
192+
* Resync the nonce i.e., max of transactionCount +1 and lastUsedNonce.
193+
* @param chainId
194+
* @param walletAddress
195+
*/
196+
export const rebaseNonce = async (chainId: number, walletAddress: Address) => {
197+
const rpcRequest = getRpcClient({
198+
client: thirdwebClient,
199+
chain: await getChain(chainId),
200+
});
201+
202+
// The next unused nonce = transactionCount.
203+
const transactionCount = await eth_getTransactionCount(rpcRequest, {
204+
address: walletAddress,
205+
});
206+
207+
// Lua script to set nonce as max
208+
const script = `
209+
local transactionCount = tonumber(ARGV[1])
210+
local lastUsedNonce = tonumber(redis.call('get', KEYS[1]))
211+
local nextNonce = math.max(transactionCount-1, lastUsedNonce)
212+
redis.call('set', KEYS[1], nextNonce)
213+
return nextNonce
214+
`;
215+
const nextNonce = await redis.eval(
216+
script,
217+
1,
218+
lastUsedNonceKey(chainId, normalizeAddress(walletAddress)),
219+
transactionCount.toString(),
220+
);
221+
return nextNonce;
222+
};
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
-- CreateIndex
2+
CREATE INDEX CONCURRENTLY "transactions_queuedAt_idx" ON "transactions"("queuedAt");

src/prisma/schema.prisma

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,7 @@ model Transactions {
163163
@@index([sentAt, minedAt, cancelledAt, errorMessage, queuedAt])
164164
@@index([sentAt, accountAddress, userOpHash, minedAt, errorMessage, retryCount])
165165
@@index([sentAt, transactionHash, accountAddress, minedAt, errorMessage, nonce])
166+
@@index([queuedAt])
166167
@@map("transactions")
167168
}
168169

src/scripts/setup-db.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,10 @@ const main = async () => {
1212
);
1313
`;
1414

15-
const schema = `./src/prisma/schema.prisma`;
15+
const schema =
16+
process.env.NODE_ENV === "production"
17+
? `./dist/prisma/schema.prisma`
18+
: `./src/prisma/schema.prisma`;
1619

1720
if (hasWalletsTable) {
1821
execSync(`yarn prisma migrate reset --force --schema ${schema}`, {

src/server/middleware/adminRoutes.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import { env } from "../../utils/env";
1010
import { CancelRecycledNoncesQueue } from "../../worker/queues/cancelRecycledNoncesQueue";
1111
import { MigratePostgresTransactionsQueue } from "../../worker/queues/migratePostgresTransactionsQueue";
1212
import { MineTransactionQueue } from "../../worker/queues/mineTransactionQueue";
13+
import { NonceResyncQueue } from "../../worker/queues/nonceResyncQueue";
1314
import { ProcessEventsLogQueue } from "../../worker/queues/processEventLogsQueue";
1415
import { ProcessTransactionReceiptsQueue } from "../../worker/queues/processTransactionReceiptsQueue";
1516
import { PruneTransactionsQueue } from "../../worker/queues/pruneTransactionsQueue";
@@ -28,6 +29,7 @@ const QUEUES: Queue[] = [
2829
CancelRecycledNoncesQueue.q,
2930
PruneTransactionsQueue.q,
3031
MigratePostgresTransactionsQueue.q,
32+
NonceResyncQueue.q,
3133
];
3234

3335
export const withAdminRoutes = async (fastify: FastifyInstance) => {

src/server/schemas/wallet/index.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,9 @@ export const walletHeaderSchema = Type.Object({
55
"x-backend-wallet-address": Type.String({
66
examples: ["0x..."],
77
description: "Backend wallet address",
8+
maxLength: 42,
9+
minLength: 42,
10+
pattern: "^0x[0-9a-zA-Z]+",
811
}),
912
"x-idempotency-key": Type.Optional(
1013
Type.String({
@@ -18,6 +21,9 @@ export const walletWithAAHeaderSchema = Type.Object({
1821
"x-account-address": Type.Optional(
1922
Type.String({
2023
description: "Smart account address",
24+
maxLength: 42,
25+
minLength: 42,
26+
pattern: "^0x[0-9a-zA-Z]+",
2127
}),
2228
),
2329
});
@@ -34,5 +40,8 @@ export const walletWithAddressParamSchema = Type.Object({
3440
walletAddress: Type.String({
3541
examples: ["0x..."],
3642
description: "Backend wallet address",
43+
maxLength: 42,
44+
minLength: 42,
45+
pattern: "^0x[0-9a-zA-Z]+",
3746
}),
3847
});

src/utils/account.ts

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { EVMWallet } from "@thirdweb-dev/wallets";
2-
import { Signer } from "ethers";
2+
import { Signer, providers } from "ethers";
3+
34
import { Address } from "thirdweb";
45
import { ethers5Adapter } from "thirdweb/adapters/ethers5";
56
import { Account } from "thirdweb/wallets";
@@ -9,6 +10,7 @@ import { getAwsKmsWallet } from "../server/utils/wallets/getAwsKmsWallet";
910
import { getGcpKmsWallet } from "../server/utils/wallets/getGcpKmsWallet";
1011
import { getLocalWallet } from "../server/utils/wallets/getLocalWallet";
1112
import { getSmartWallet } from "../server/utils/wallets/getSmartWallet";
13+
import { getChain } from "./chain";
1214

1315
export const _accountsCache = new Map<string, Account>();
1416

@@ -55,6 +57,7 @@ export const getAccount = async (args: {
5557

5658
// Get smart wallet if `accountAddress` is provided.
5759
let signer: Signer;
60+
5861
if (accountAddress) {
5962
const smartWallet = await getSmartWallet({
6063
chainId,
@@ -66,7 +69,17 @@ export const getAccount = async (args: {
6669
signer = await wallet.getSigner();
6770
}
6871

69-
const account = await ethers5Adapter.signer.fromEthers({ signer });
72+
if (walletDetails.type !== WalletType.local) {
73+
// Get chain rpc provider.
74+
const chain = await getChain(chainId);
75+
const provider = new providers.JsonRpcProvider(chain.rpc);
76+
77+
signer = signer.connect(provider);
78+
}
79+
80+
const account = await ethers5Adapter.signer.fromEthers({
81+
signer,
82+
});
7083

7184
// Set cache.
7285
_accountsCache.set(cacheKey, account);

src/worker/index.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import {
1010
import { initCancelRecycledNoncesWorker } from "./tasks/cancelRecycledNoncesWorker";
1111
import { initMigratePostgresTransactionsWorker } from "./tasks/migratePostgresTransactionsWorker";
1212
import { initMineTransactionWorker } from "./tasks/mineTransactionWorker";
13+
import { initNonceResyncWorker } from "./tasks/nonceResyncWorker";
1314
import { initProcessEventLogsWorker } from "./tasks/processEventLogsWorker";
1415
import { initProcessTransactionReceiptsWorker } from "./tasks/processTransactionReceiptsWorker";
1516
import { initPruneTransactionsWorker } from "./tasks/pruneTransactionsWorker";
@@ -26,6 +27,8 @@ export const initWorker = async () => {
2627
initSendWebhookWorker();
2728
await initMigratePostgresTransactionsWorker();
2829

30+
await initNonceResyncWorker();
31+
2932
// Listen for new & updated configuration data.
3033
await newConfigurationListener();
3134
await updatedConfigurationListener();

src/worker/queues/nonceResyncQueue.ts

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
import { Queue } from "bullmq";
2+
import { redis } from "../../utils/redis/redis";
3+
import { defaultJobOptions } from "./queues";
4+
5+
export class NonceResyncQueue {
6+
static q = new Queue<string>("nonce-resync-cron", {
7+
connection: redis,
8+
defaultJobOptions,
9+
});
10+
11+
constructor() {
12+
NonceResyncQueue.q.setGlobalConcurrency(1);
13+
14+
// The cron job is defined in `initNonceResyncWorker`
15+
// because it requires an async call to query configuration.
16+
}
17+
}

src/worker/tasks/cancelRecycledNoncesWorker.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ const getAndDeleteUnusedNonces = async (key: string) => {
7777
// Returns all unused nonces for this key and deletes the key.
7878
const script = `
7979
local key = ARGV[1]
80-
local members = redis.call('LRANGE', key, 0, -1)
80+
local members = redis.call('SMEMBERS', key)
8181
redis.call('DEL', key)
8282
return members
8383
`;

0 commit comments

Comments
 (0)