Skip to content

Commit 5479cdf

Browse files
authored
Update webhook status notifications & prevent duplicates (#369)
* Add try/catch to webhook request * Add manual webhook status * Fix repeat webhooks * Update
1 parent d2a989e commit 5479cdf

File tree

6 files changed

+117
-78
lines changed

6 files changed

+117
-78
lines changed

.vscode/settings.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
{
22
"editor.codeActionsOnSave": {
3-
"source.organizeImports": true,
4-
"source.eslint.fixAll": true
3+
"source.organizeImports": "explicit",
4+
"source.eslint.fixAll": "explicit"
55
},
66
"editor.defaultFormatter": "esbenp.prettier-vscode",
77
"editor.formatOnSave": true,

src/server/utils/webhook.ts

Lines changed: 58 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -82,66 +82,67 @@ export const sendWebhookRequest = async (
8282
}
8383
};
8484

85-
export const sendTxWebhook = async (queueIds: string[]): Promise<void> => {
86-
try {
87-
const txDataByIds = await getTxByIds({ queueIds });
88-
if (!txDataByIds || txDataByIds.length === 0) {
89-
return;
90-
}
91-
for (const txData of txDataByIds!) {
92-
if (!txData) {
93-
return;
94-
} else {
95-
let webhookConfig: SanitizedWebHooksSchema[] | undefined =
96-
await getWebhook(WebhooksEventTypes.ALL_TX);
97-
98-
if (!webhookConfig) {
99-
switch (txData.status) {
100-
case TransactionStatusEnum.Queued:
101-
webhookConfig = await getWebhook(WebhooksEventTypes.QUEUED_TX);
102-
break;
103-
case TransactionStatusEnum.Submitted:
104-
webhookConfig = await getWebhook(WebhooksEventTypes.SENT_TX);
105-
break;
106-
case TransactionStatusEnum.Retried:
107-
webhookConfig = await getWebhook(WebhooksEventTypes.RETRIED_TX);
108-
break;
109-
case TransactionStatusEnum.Mined:
110-
webhookConfig = await getWebhook(WebhooksEventTypes.MINED_TX);
111-
break;
112-
case TransactionStatusEnum.Errored:
113-
webhookConfig = await getWebhook(WebhooksEventTypes.ERRORED_TX);
114-
break;
115-
case TransactionStatusEnum.Cancelled:
116-
webhookConfig = await getWebhook(WebhooksEventTypes.ERRORED_TX);
117-
break;
118-
}
119-
}
120-
121-
await Promise.all(
122-
webhookConfig?.map(async (config) => {
123-
if (!config || !config?.active) {
124-
logger({
125-
service: "server",
126-
level: "debug",
127-
message: "No webhook set or active, skipping webhook send",
128-
});
85+
export interface WebhookData {
86+
queueId: string;
87+
status: TransactionStatusEnum;
88+
}
89+
90+
export const sendWebhooks = async (webhooks: WebhookData[]) => {
91+
const queueIds = webhooks.map((webhook) => webhook.queueId);
92+
const txs = await getTxByIds({ queueIds });
93+
if (!txs || txs.length === 0) {
94+
return;
95+
}
12996

130-
return;
131-
}
97+
const webhooksWithTxs = webhooks
98+
.map((webhook) => {
99+
const tx = txs.find((tx) => tx.queueId === webhook.queueId);
100+
return {
101+
...webhook,
102+
tx,
103+
};
104+
})
105+
.filter((webhook) => !!webhook.tx);
106+
107+
for (const webhook of webhooksWithTxs) {
108+
const webhookStatus =
109+
webhook.status === TransactionStatusEnum.Queued
110+
? WebhooksEventTypes.QUEUED_TX
111+
: webhook.status === TransactionStatusEnum.Submitted
112+
? WebhooksEventTypes.SENT_TX
113+
: webhook.status === TransactionStatusEnum.Retried
114+
? WebhooksEventTypes.RETRIED_TX
115+
: webhook.status === TransactionStatusEnum.Mined
116+
? WebhooksEventTypes.MINED_TX
117+
: webhook.status === TransactionStatusEnum.Errored
118+
? WebhooksEventTypes.ERRORED_TX
119+
: webhook.status === TransactionStatusEnum.Cancelled
120+
? WebhooksEventTypes.CANCELLED_TX
121+
: undefined;
122+
123+
const webhookConfigs = await Promise.all([
124+
...((await getWebhook(WebhooksEventTypes.ALL_TX)) || []),
125+
...(webhookStatus ? (await getWebhook(webhookStatus)) || [] : []),
126+
]);
127+
128+
await Promise.all(
129+
webhookConfigs.map(async (webhookConfig) => {
130+
if (!webhookConfig.active) {
131+
logger({
132+
service: "server",
133+
level: "debug",
134+
message: "No webhook set or active, skipping webhook send",
135+
});
136+
137+
return;
138+
}
132139

133-
await sendWebhookRequest(config, txData);
134-
}) || [],
140+
await sendWebhookRequest(
141+
webhookConfig,
142+
webhook.tx as Record<string, any>,
135143
);
136-
}
137-
}
138-
} catch (error) {
139-
logger({
140-
service: "server",
141-
level: "error",
142-
message: `Failed to send webhook`,
143-
error,
144-
});
144+
}),
145+
);
145146
}
146147
};
147148

src/worker/index.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import { WebhooksEventTypes } from "../schema/webhooks";
2+
import { getWebhook } from "../utils/cache/getWebhook";
13
import {
24
newConfigurationListener,
35
updatedConfigurationListener,
@@ -35,6 +37,10 @@ const worker = async () => {
3537
// Listen for new & updated webhooks data
3638
await newWebhooksListener();
3739
await updatedWebhooksListener();
40+
41+
for (const eventType of Object.values(WebhooksEventTypes)) {
42+
await getWebhook(eventType, false);
43+
}
3844
};
3945

4046
worker();

src/worker/tasks/processTx.ts

Lines changed: 33 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,11 @@ import {
1818
TransactionStatusEnum,
1919
transactionResponseSchema,
2020
} from "../../server/schemas/transaction";
21-
import { sendBalanceWebhook, sendTxWebhook } from "../../server/utils/webhook";
21+
import {
22+
WebhookData,
23+
sendBalanceWebhook,
24+
sendWebhooks,
25+
} from "../../server/utils/webhook";
2226
import { getConfig } from "../../utils/cache/getConfig";
2327
import { getSdk } from "../../utils/cache/getSdk";
2428
import { env } from "../../utils/env";
@@ -51,7 +55,7 @@ type RpcResponseData = {
5155
export const processTx = async () => {
5256
try {
5357
// 0. Initialize queueIds to send webhook
54-
const sendWebhookForQueueIds: string[] = [];
58+
const sendWebhookForQueueIds: WebhookData[] = [];
5559
await prisma.$transaction(
5660
async (pgtx) => {
5761
// 1. Select a batch of transactions and lock the rows so no other workers pick them up
@@ -68,7 +72,12 @@ export const processTx = async () => {
6872
return;
6973
}
7074
// Send Queued Webhook
71-
await sendTxWebhook(txs.map((tx) => tx.queueId!));
75+
await sendWebhooks(
76+
txs.map((tx) => ({
77+
queueId: tx.queueId!,
78+
status: TransactionStatusEnum.Queued,
79+
})),
80+
);
7281

7382
// 2. Iterate through all filtering cancelled trandsactions, and sorting transactions and user operations
7483
const txsToSend = [];
@@ -314,7 +323,10 @@ export const processTx = async () => {
314323
res: rpcResponse,
315324
sentAt: new Date(),
316325
});
317-
sendWebhookForQueueIds.push(tx.queueId!);
326+
sendWebhookForQueueIds.push({
327+
queueId: tx.queueId!,
328+
status: TransactionStatusEnum.Submitted,
329+
});
318330
} else if (
319331
typeof rpcResponse.error?.message === "string" &&
320332
(rpcResponse.error.message as string)
@@ -333,12 +345,18 @@ export const processTx = async () => {
333345
res: rpcResponse,
334346
sentAt: new Date(),
335347
});
336-
sendWebhookForQueueIds.push(tx.queueId!);
348+
sendWebhookForQueueIds.push({
349+
queueId: tx.queueId!,
350+
status: TransactionStatusEnum.Errored,
351+
});
337352
}
338353
} catch (err: any) {
339354
// Error (continue to next transaction)
340355
txIndex++;
341-
sendWebhookForQueueIds.push(tx.queueId!);
356+
sendWebhookForQueueIds.push({
357+
queueId: tx.queueId!,
358+
status: TransactionStatusEnum.Errored,
359+
});
342360

343361
logger({
344362
service: "worker",
@@ -442,7 +460,6 @@ export const processTx = async () => {
442460
});
443461
break;
444462
}
445-
sendWebhookForQueueIds.push(tx.queueId!);
446463
}),
447464
);
448465
} catch (err: any) {
@@ -506,7 +523,10 @@ export const processTx = async () => {
506523
userOpHash,
507524
},
508525
});
509-
sendWebhookForQueueIds.push(tx.queueId!);
526+
sendWebhookForQueueIds.push({
527+
queueId: tx.queueId!,
528+
status: TransactionStatusEnum.UserOpSent,
529+
});
510530
} catch (err: any) {
511531
logger({
512532
service: "worker",
@@ -527,7 +547,10 @@ export const processTx = async () => {
527547
`Failed to handle transaction`,
528548
},
529549
});
530-
sendWebhookForQueueIds.push(tx.queueId!);
550+
sendWebhookForQueueIds.push({
551+
queueId: tx.queueId!,
552+
status: TransactionStatusEnum.Errored,
553+
});
531554
}
532555
});
533556

@@ -540,7 +563,7 @@ export const processTx = async () => {
540563
},
541564
);
542565

543-
await sendTxWebhook(sendWebhookForQueueIds);
566+
await sendWebhooks(sendWebhookForQueueIds);
544567
} catch (err: any) {
545568
logger({
546569
service: "worker",

src/worker/tasks/updateMinedTx.ts

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,13 @@ import { prisma } from "../../db/client";
55
import { getSentTxs } from "../../db/transactions/getSentTxs";
66
import { updateTx } from "../../db/transactions/updateTx";
77
import { TransactionStatusEnum } from "../../server/schemas/transaction";
8-
import { sendTxWebhook } from "../../server/utils/webhook";
8+
import { WebhookData, sendWebhooks } from "../../server/utils/webhook";
99
import { getSdk } from "../../utils/cache/getSdk";
1010
import { logger } from "../../utils/logger";
1111

1212
export const updateMinedTx = async () => {
1313
try {
14-
const sendWebhookForQueueIds: string[] = [];
14+
const sendWebhookForQueueIds: WebhookData[] = [];
1515
await prisma.$transaction(
1616
async (pgtx) => {
1717
const txs = await getSentTxs({ pgtx });
@@ -107,7 +107,10 @@ export const updateMinedTx = async () => {
107107
message: "Updated mined tx.",
108108
});
109109

110-
sendWebhookForQueueIds.push(txWithReceipt.tx.id);
110+
sendWebhookForQueueIds.push({
111+
queueId: txWithReceipt.tx.id,
112+
status: TransactionStatusEnum.Mined,
113+
});
111114
}),
112115
);
113116

@@ -130,7 +133,10 @@ export const updateMinedTx = async () => {
130133
message: "Update dropped tx.",
131134
});
132135

133-
sendWebhookForQueueIds.push(tx.id);
136+
sendWebhookForQueueIds.push({
137+
queueId: tx.id,
138+
status: TransactionStatusEnum.Errored,
139+
});
134140
}),
135141
);
136142
},
@@ -139,7 +145,7 @@ export const updateMinedTx = async () => {
139145
},
140146
);
141147

142-
await sendTxWebhook(sendWebhookForQueueIds);
148+
await sendWebhooks(sendWebhookForQueueIds);
143149
} catch (err) {
144150
logger({
145151
service: "worker",

src/worker/tasks/updateMinedUserOps.ts

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,13 @@ import { prisma } from "../../db/client";
44
import { getSentUserOps } from "../../db/transactions/getSentUserOps";
55
import { updateTx } from "../../db/transactions/updateTx";
66
import { TransactionStatusEnum } from "../../server/schemas/transaction";
7-
import { sendTxWebhook } from "../../server/utils/webhook";
7+
import { WebhookData, sendWebhooks } from "../../server/utils/webhook";
88
import { getSdk } from "../../utils/cache/getSdk";
99
import { logger } from "../../utils/logger";
1010

1111
export const updateMinedUserOps = async () => {
1212
try {
13-
const sendWebhookForQueueIds: string[] = [];
13+
const sendWebhookForQueueIds: WebhookData[] = [];
1414
await prisma.$transaction(
1515
async (pgtx) => {
1616
const userOps = await getSentUserOps({ pgtx });
@@ -97,7 +97,10 @@ export const updateMinedUserOps = async () => {
9797
queueId: userOp!.id,
9898
message: `Updated with receipt`,
9999
});
100-
sendWebhookForQueueIds.push(userOp!.id);
100+
sendWebhookForQueueIds.push({
101+
queueId: userOp!.id,
102+
status: TransactionStatusEnum.Mined,
103+
});
101104
}),
102105
);
103106
},
@@ -106,7 +109,7 @@ export const updateMinedUserOps = async () => {
106109
},
107110
);
108111

109-
await sendTxWebhook(sendWebhookForQueueIds);
112+
await sendWebhooks(sendWebhookForQueueIds);
110113
} catch (err) {
111114
logger({
112115
service: "worker",

0 commit comments

Comments
 (0)