@@ -45,35 +45,23 @@ type RpcResponseData = {
45
45
} ;
46
46
47
47
export const processTx = async ( ) => {
48
+ logger ( { service : "worker" , level : "info" , message : "[processTx] Start" } ) ;
49
+
48
50
try {
49
51
const sendWebhookForQueueIds : WebhookData [ ] = [ ] ;
50
52
const reportUsageForQueueIds : ReportUsageParams [ ] = [ ] ;
51
53
await prisma . $transaction (
52
54
async ( pgtx ) => {
53
55
// 1. Select a batch of transactions and lock the rows so no other workers pick them up
54
56
const txs = await getQueuedTxs ( { pgtx } ) ;
55
-
56
57
if ( txs . length === 0 ) {
57
58
return ;
58
59
}
59
60
60
- logger ( {
61
- service : "worker" ,
62
- level : "info" ,
63
- message : `Received ${ txs . length } transactions to process` ,
64
- } ) ;
65
-
66
61
// 2. Sort transactions and user operations.
67
62
const txsToSend : Transactions [ ] = [ ] ;
68
63
const userOpsToSend : Transactions [ ] = [ ] ;
69
64
for ( const tx of txs ) {
70
- logger ( {
71
- service : "worker" ,
72
- level : "info" ,
73
- queueId : tx . id ,
74
- message : `Processing` ,
75
- } ) ;
76
-
77
65
if ( tx . accountAddress && tx . signerAddress ) {
78
66
userOpsToSend . push ( tx ) ;
79
67
} else {
@@ -92,6 +80,14 @@ export const processTx = async () => {
92
80
}
93
81
} ) ;
94
82
83
+ logger ( {
84
+ service : "worker" ,
85
+ level : "info" ,
86
+ message : `[processTx] Found ${ txsToSend . length } transactions and ${
87
+ userOpsToSend . length
88
+ } userOps across ${ Object . keys ( txsByWallet ) . length } .`,
89
+ } ) ;
90
+
95
91
// 4. Send transaction batches in parallel.
96
92
const sentTxs = Object . keys ( txsByWallet ) . map ( async ( key ) => {
97
93
const txsToSend = txsByWallet [ key ] ;
@@ -118,6 +114,11 @@ export const processTx = async () => {
118
114
chainId,
119
115
address : walletAddress ,
120
116
} ) ;
117
+ logger ( {
118
+ service : "worker" ,
119
+ level : "info" ,
120
+ message : `[processTx] Got DB nonce ${ dbNonceData ?. nonce } for ${ walletAddress } .` ,
121
+ } ) ;
121
122
122
123
// For each wallet address, check the nonce in database and the mempool
123
124
const [ mempoolNonceData , gasOverrides , sentAtBlockNumber ] =
@@ -126,6 +127,11 @@ export const processTx = async () => {
126
127
getDefaultGasOverrides ( provider ) ,
127
128
provider . getBlockNumber ( ) ,
128
129
] ) ;
130
+ logger ( {
131
+ service : "worker" ,
132
+ level : "info" ,
133
+ message : `[processTx] Got onchain nonce ${ mempoolNonceData } for ${ walletAddress } .` ,
134
+ } ) ;
129
135
130
136
// - Take the larger of the nonces, and update database nonce to mempool value if mempool is greater
131
137
let startNonce : BigNumber ;
@@ -144,10 +150,15 @@ export const processTx = async () => {
144
150
startNonce = dbNonce ;
145
151
}
146
152
153
+ logger ( {
154
+ service : "worker" ,
155
+ level : "info" ,
156
+ message : `[processTx] Sending ${ txsToSend . length } transactions from wallet ${ walletAddress } .` ,
157
+ } ) ;
158
+
147
159
const rpcResponses : RpcResponseData [ ] = [ ] ;
148
160
let txIndex = 0 ;
149
161
let nonceIncrement = 0 ;
150
-
151
162
while ( txIndex < txsToSend . length ) {
152
163
const nonce = startNonce . add ( nonceIncrement ) ;
153
164
const tx = txsToSend [ txIndex ] ;
@@ -293,11 +304,18 @@ export const processTx = async () => {
293
304
}
294
305
}
295
306
307
+ const nextUnusedNonce = startNonce . add ( nonceIncrement ) . toNumber ( ) ;
296
308
await updateWalletNonce ( {
297
309
pgtx,
298
310
address : walletAddress ,
299
311
chainId,
300
- nonce : startNonce . add ( nonceIncrement ) . toNumber ( ) ,
312
+ nonce : nextUnusedNonce ,
313
+ } ) ;
314
+
315
+ logger ( {
316
+ service : "worker" ,
317
+ level : "info" ,
318
+ message : `[processTx] Updated nonce to ${ nextUnusedNonce } for ${ walletAddress } .` ,
301
319
} ) ;
302
320
303
321
// Update DB state in parallel.
@@ -516,7 +534,17 @@ export const processTx = async () => {
516
534
}
517
535
} ) ;
518
536
537
+ logger ( {
538
+ service : "worker" ,
539
+ level : "info" ,
540
+ message : `[processTx] Awaiting transactions/userOps promises...` ,
541
+ } ) ;
519
542
await Promise . all ( [ ...sentTxs , ...sentUserOps ] ) ;
543
+ logger ( {
544
+ service : "worker" ,
545
+ level : "info" ,
546
+ message : `[processTx] Transactions/userOps completed.` ,
547
+ } ) ;
520
548
} ,
521
549
{
522
550
// TODO: Should be dynamic with the batch size.
@@ -534,6 +562,8 @@ export const processTx = async () => {
534
562
error : err ,
535
563
} ) ;
536
564
}
565
+
566
+ logger ( { service : "worker" , level : "info" , message : "[processTx] Done" } ) ;
537
567
} ;
538
568
539
569
const alertOnBackendWalletLowBalance = async ( wallet : UserWallet ) => {
0 commit comments