Skip to content

Commit 66b152c

Browse files
authored
Optimize transaction throughput using batch requests (#278)
* Send transactions in one batch * Update yarn test:load
1 parent 24fdb47 commit 66b152c

File tree

3 files changed

+139
-85
lines changed

3 files changed

+139
-85
lines changed

src/worker/tasks/processTx.ts

Lines changed: 117 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,12 @@
11
import { Static } from "@sinclair/typebox";
2-
import { getDefaultGasOverrides } from "@thirdweb-dev/sdk";
2+
import {
3+
StaticJsonRpcBatchProvider,
4+
getDefaultGasOverrides,
5+
} from "@thirdweb-dev/sdk";
36
import { ERC4337EthersSigner } from "@thirdweb-dev/wallets/dist/declarations/src/evm/connectors/smart-wallet/lib/erc4337-signer";
47
import { ethers } from "ethers";
58
import { BigNumber } from "ethers/lib/ethers";
9+
import { RpcResponse } from "viem/_types/utils/rpc";
610
import { prisma } from "../../db/client";
711
import { getConfiguration } from "../../db/configuration/getConfiguration";
812
import { getQueuedTxs } from "../../db/transactions/getQueuedTxs";
@@ -100,6 +104,15 @@ export const processTx = async () => {
100104
walletAddress,
101105
});
102106

107+
const [signer, provider] = await Promise.all([
108+
sdk.getSigner(),
109+
sdk.getProvider() as StaticJsonRpcBatchProvider,
110+
]);
111+
112+
if (!signer || !provider) {
113+
return;
114+
}
115+
103116
// - For each wallet address, check the nonce in database and the mempool
104117
const [walletBalance, mempoolNonceData, dbNonceData, gasOverrides] =
105118
await Promise.all([
@@ -110,7 +123,7 @@ export const processTx = async () => {
110123
chainId,
111124
address: walletAddress,
112125
}),
113-
getDefaultGasOverrides(sdk.getProvider()),
126+
getDefaultGasOverrides(provider),
114127
]);
115128

116129
// Wallet Balance Webhook
@@ -159,93 +172,115 @@ export const processTx = async () => {
159172
startNonce = dbNonce;
160173
}
161174

162-
let incrementNonce = 0;
163-
164-
// - Wait for transactions to be sent successfully
165-
const txStatuses: SentTxStatus[] = [];
175+
// Group all transactions into a single batch rpc request
176+
const rpcRequests = [];
166177
for (const i in txsToSend) {
167178
const tx = txsToSend[i];
168179
const nonce = startNonce.add(i);
169180

170-
try {
171-
logger.worker.info(
172-
`[Transaction] [${tx.queueId}] Sending with nonce '${nonce}'`,
173-
);
174-
const res = await sdk.getSigner()!.sendTransaction({
175-
to: tx.toAddress!,
176-
from: tx.fromAddress!,
177-
data: tx.data!,
178-
value: tx.value!,
179-
nonce,
180-
...gasOverrides,
181-
});
182-
183-
logger.worker.info(
184-
`[Transaction] [${tx.queueId}] Submitted with nonce '${nonce}' & hash '${res.hash}'`,
185-
);
186-
187-
// - Keep track of the number of transactions that went through successfully
188-
incrementNonce++;
189-
txStatuses.push({
190-
status: TransactionStatusEnum.Submitted,
191-
queueId: tx.queueId!,
192-
res,
193-
sentAtBlockNumber: await sdk.getProvider().getBlockNumber(),
194-
});
195-
} catch (err: any) {
196-
logger.worker.warn(
197-
`[Transaction] [${tx.queueId}] [Nonce: ${nonce}] Failed to send with error - ${err}`,
198-
);
199-
200-
txStatuses.push({
201-
status: TransactionStatusEnum.Errored,
202-
queueId: tx.queueId!,
203-
errorMessage:
204-
err?.message ||
205-
err?.toString() ||
206-
`Failed to handle transaction`,
207-
});
208-
}
209-
210-
// - After sending transactions, update database for each transaction
211-
await Promise.all(
212-
txStatuses.map(async (tx) => {
213-
switch (tx.status) {
214-
case TransactionStatusEnum.Submitted:
215-
await updateTx({
216-
pgtx,
217-
queueId: tx.queueId,
218-
data: {
219-
status: TransactionStatusEnum.Submitted,
220-
res: tx.res,
221-
sentAtBlockNumber: await sdk
222-
.getProvider()
223-
.getBlockNumber(),
224-
},
225-
});
226-
break;
227-
case TransactionStatusEnum.Errored:
228-
await updateTx({
229-
pgtx,
230-
queueId: tx.queueId,
231-
data: {
232-
status: TransactionStatusEnum.Errored,
233-
errorMessage: tx.errorMessage,
234-
},
235-
});
236-
break;
237-
}
238-
}),
239-
);
181+
const txRequest = await signer.populateTransaction({
182+
to: tx.toAddress!,
183+
from: tx.fromAddress!,
184+
data: tx.data!,
185+
value: tx.value!,
186+
nonce,
187+
...gasOverrides,
188+
});
189+
const signature = await signer.signTransaction(txRequest);
240190

241-
// - And finally update the nonce with the number of successful transactions
242-
await updateWalletNonce({
243-
pgtx,
244-
address: walletAddress,
245-
chainId,
246-
nonce: startNonce.toNumber() + incrementNonce,
191+
rpcRequests.push({
192+
id: i,
193+
jsonrpc: "2.0",
194+
method: "eth_sendRawTransaction",
195+
params: [signature],
247196
});
248197
}
198+
199+
// Send all the transactions as one batch request
200+
const res = await fetch(provider.connection.url, {
201+
method: "POST",
202+
headers: {
203+
"Content-Type": "application/json",
204+
},
205+
body: JSON.stringify(rpcRequests),
206+
});
207+
const rpcResponses: RpcResponse[] = await res.json();
208+
209+
// Check how many transactions succeeded and increment nonce
210+
const incrementNonce = rpcResponses.reduce((acc, curr) => {
211+
return curr.result && !curr.error ? acc + 1 : acc;
212+
}, 0);
213+
214+
await updateWalletNonce({
215+
pgtx,
216+
address: walletAddress,
217+
chainId,
218+
nonce: startNonce.toNumber() + incrementNonce,
219+
});
220+
221+
// Update transaction records with updated data
222+
const txStatuses: SentTxStatus[] = await Promise.all(
223+
rpcResponses.map(async (rpcRes, i) => {
224+
const tx = txsToSend[i];
225+
if (rpcRes.result) {
226+
const txHash = rpcRes.result;
227+
const txRes = await provider.getTransaction(txHash);
228+
229+
return {
230+
status: TransactionStatusEnum.Submitted,
231+
queueId: tx.queueId!,
232+
res: txRes,
233+
sentAtBlockNumber: await provider.getBlockNumber(),
234+
};
235+
} else {
236+
logger.worker.warn(
237+
`[Transaction] [${
238+
tx.queueId
239+
}] Failed to send with error - ${JSON.stringify(
240+
rpcRes.error,
241+
)}`,
242+
);
243+
244+
return {
245+
status: TransactionStatusEnum.Errored,
246+
queueId: tx.queueId!,
247+
errorMessage:
248+
rpcRes.error?.message ||
249+
rpcRes.error?.toString() ||
250+
`Failed to handle transaction`,
251+
};
252+
}
253+
}),
254+
);
255+
256+
// - After sending transactions, update database for each transaction
257+
await Promise.all(
258+
txStatuses.map(async (tx) => {
259+
switch (tx.status) {
260+
case TransactionStatusEnum.Submitted:
261+
await updateTx({
262+
pgtx,
263+
queueId: tx.queueId,
264+
data: {
265+
status: TransactionStatusEnum.Submitted,
266+
res: tx.res,
267+
sentAtBlockNumber: await provider.getBlockNumber(),
268+
},
269+
});
270+
break;
271+
case TransactionStatusEnum.Errored:
272+
await updateTx({
273+
pgtx,
274+
queueId: tx.queueId,
275+
data: {
276+
status: TransactionStatusEnum.Errored,
277+
errorMessage: tx.errorMessage,
278+
},
279+
});
280+
break;
281+
}
282+
}),
283+
);
249284
});
250285

251286
// 5. Send all user operations in parallel with multi-dimensional nonce

test/load/index.ts

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ type OptionsInput = z.input<typeof OptionsSchema>;
2929
type Result = {
3030
requestTime: number;
3131
status?: "mined" | "errored" | "pending";
32+
sendTime?: number;
3233
mineTime?: number;
3334
};
3435

@@ -162,7 +163,7 @@ program
162163

163164
const {
164165
res: {
165-
result: { status },
166+
result: { status, sentAt, minedAt },
166167
},
167168
} = await fetchEngine({
168169
host: options.host as string,
@@ -173,8 +174,10 @@ program
173174

174175
if (status === "mined") {
175176
result.status = "mined";
176-
result.mineTime = timer.ellapsed();
177+
result.sendTime = timer.ellapsed(new Date(sentAt));
178+
result.mineTime = timer.ellapsed(new Date(minedAt));
177179
break;
180+
} else if (res.result?.status === "sent") {
178181
} else if (res.result?.status === "errored") {
179182
result.status = "errored";
180183
break;
@@ -214,6 +217,16 @@ program
214217
return;
215218
}
216219

220+
console.log("\n\nsent times (seconds)");
221+
console.table({
222+
average:
223+
results.reduce((acc, curr) => acc + curr.sendTime!, 0) / results.length,
224+
minimum: results.sort((a, b) => a.sendTime! - b.sendTime!)[0].sendTime,
225+
maximum: results.sort((a, b) => a.sendTime! - b.sendTime!)[
226+
results.length - 1
227+
].sendTime,
228+
});
229+
217230
console.log("\n\nmine times (seconds)");
218231
console.table({
219232
average:

test/load/utils/time.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,13 @@ export const time = async <TReturnType extends { [key: string]: any }>(
1414
export const createTimer = () => {
1515
const start = Date.now();
1616
return {
17-
ellapsed: () => (Date.now() - start) / 1000,
17+
ellapsed: (end?: Date) => {
18+
if (end) {
19+
return (end.getTime() - start) / 1000;
20+
}
21+
22+
return (Date.now() - start) / 1000;
23+
},
1824
};
1925
};
2026

0 commit comments

Comments
 (0)