Skip to content

Commit bac0a7e

Browse files
authored
feat: Move transaction webhooks to Redis (#549)
* feat: Move transaction webhooks to Redis * increase concurrency * optional resp * handle missing resp
1 parent dbf0645 commit bac0a7e

File tree

8 files changed

+160
-105
lines changed

8 files changed

+160
-105
lines changed

src/db/transactions/cleanTxs.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import { Static } from "@sinclair/typebox";
33
import { transactionResponseSchema } from "../../server/schemas/transaction";
44

55
// TODO: This shouldn't need to exist with zod
6+
// @deprecated - use toTransactionSchema
67
export const cleanTxs = (
78
txs: Transactions[],
89
): Static<typeof transactionResponseSchema>[] => {

src/db/transactions/getTxByIds.ts

Lines changed: 6 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,12 @@
1-
import { Static } from "@sinclair/typebox";
2-
import { PrismaTransaction } from "../../schema/prisma";
3-
import { transactionResponseSchema } from "../../server/schemas/transaction";
1+
import { Transactions } from "@prisma/client";
42
import { prisma } from "../client";
5-
import { cleanTxs } from "./cleanTxs";
6-
interface GetTxByIdsParams {
7-
queueIds: string[];
8-
pgtx?: PrismaTransaction;
9-
}
103

11-
export const getTxByIds = async ({
12-
queueIds,
13-
}: GetTxByIdsParams): Promise<
14-
Static<typeof transactionResponseSchema>[] | null
15-
> => {
16-
const tx = await prisma.transactions.findMany({
4+
export const getTransactionsByQueueIds = async (
5+
queueIds: string[],
6+
): Promise<Transactions[]> => {
7+
return await prisma.transactions.findMany({
178
where: {
18-
id: {
19-
in: queueIds,
20-
},
9+
id: { in: queueIds },
2110
},
2211
});
23-
24-
if (!tx || tx.length === 0) {
25-
return null;
26-
}
27-
28-
const cleanedTx = cleanTxs(tx);
29-
return cleanedTx;
3012
};

src/db/webhooks/getAllWebhooks.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,9 @@ import { prisma } from "../client";
33

44
export const getAllWebhooks = async (): Promise<Webhooks[]> => {
55
return await prisma.webhooks.findMany({
6+
where: {
7+
revokedAt: null,
8+
},
69
orderBy: {
710
id: "asc",
811
},

src/server/schemas/transaction/index.ts

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
1-
import { Type } from "@sinclair/typebox";
1+
import { Transactions } from "@prisma/client";
2+
import { Static, Type } from "@sinclair/typebox";
23

4+
// @TODO: rename to TransactionSchema
35
export const transactionResponseSchema = Type.Object({
46
queueId: Type.Union([
57
Type.String({
@@ -198,3 +200,23 @@ export enum TransactionStatus {
198200
// Tx was cancelled and will not be re-attempted.
199201
Cancelled = "cancelled",
200202
}
203+
204+
export const toTransactionSchema = (
205+
transaction: Transactions,
206+
): Static<typeof transactionResponseSchema> => ({
207+
...transaction,
208+
queueId: transaction.id,
209+
queuedAt: transaction.queuedAt.toISOString(),
210+
sentAt: transaction.sentAt?.toISOString() || null,
211+
minedAt: transaction.minedAt?.toISOString() || null,
212+
cancelledAt: transaction.cancelledAt?.toISOString() || null,
213+
status: transaction.errorMessage
214+
? TransactionStatus.Errored
215+
: transaction.minedAt
216+
? TransactionStatus.Mined
217+
: transaction.cancelledAt
218+
? TransactionStatus.Cancelled
219+
: transaction.sentAt
220+
? TransactionStatus.Sent
221+
: TransactionStatus.Queued,
222+
});

src/utils/cache/getWebhook.ts

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,10 @@ export const getWebhooksByEventType = async (
1414
return webhookCache.get(cacheKey) as Webhooks[];
1515
}
1616

17-
const webhookConfig = await getAllWebhooks();
17+
const filteredWebhooks = (await getAllWebhooks()).filter(
18+
(webhook) => webhook.eventType === eventType,
19+
);
1820

19-
const eventTypeWebhookDetails = webhookConfig.filter((webhook) => {
20-
if (!webhook.revokedAt && webhook.eventType === eventType) {
21-
return webhook;
22-
}
23-
});
24-
25-
webhookCache.set(cacheKey, eventTypeWebhookDetails);
26-
return eventTypeWebhookDetails;
21+
webhookCache.set(cacheKey, filteredWebhooks);
22+
return filteredWebhooks;
2723
};

src/utils/webhook.ts

Lines changed: 20 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,15 @@
11
import { Webhooks } from "@prisma/client";
22
import crypto from "crypto";
3-
import { getTxByIds } from "../db/transactions/getTxByIds";
3+
import { getTransactionsByQueueIds } from "../db/transactions/getTxByIds";
44
import {
55
WalletBalanceWebhookSchema,
66
WebhooksEventTypes,
77
} from "../schema/webhooks";
8-
import { TransactionStatus } from "../server/schemas/transaction";
8+
import {
9+
TransactionStatus,
10+
toTransactionSchema,
11+
} from "../server/schemas/transaction";
12+
import { enqueueWebhook } from "../worker/queues/sendWebhookQueue";
913
import { getWebhooksByEventType } from "./cache/getWebhook";
1014
import { logger } from "./logger";
1115

@@ -85,59 +89,28 @@ export interface WebhookData {
8589
status: TransactionStatus;
8690
}
8791

88-
export const sendWebhooks = async (webhooks: WebhookData[]) => {
89-
const queueIds = webhooks.map((webhook) => webhook.queueId);
90-
const txs = await getTxByIds({ queueIds });
91-
if (!txs || txs.length === 0) {
92-
return;
93-
}
92+
export const sendWebhooks = async (data: WebhookData[]) => {
93+
const queueIds = data.map((d) => d.queueId);
94+
const transactions = await getTransactionsByQueueIds(queueIds);
9495

95-
const webhooksWithTxs = webhooks
96-
.map((webhook) => {
97-
const tx = txs.find((tx) => tx.queueId === webhook.queueId);
98-
return {
99-
...webhook,
100-
tx,
101-
};
102-
})
103-
.filter((webhook) => !!webhook.tx);
104-
105-
for (const webhook of webhooksWithTxs) {
106-
const webhookStatus =
107-
webhook.status === TransactionStatus.Queued
96+
for (const transaction of transactions) {
97+
const transactionResponse = toTransactionSchema(transaction);
98+
const type =
99+
transactionResponse.status === TransactionStatus.Queued
108100
? WebhooksEventTypes.QUEUED_TX
109-
: webhook.status === TransactionStatus.Sent
101+
: transactionResponse.status === TransactionStatus.Sent
110102
? WebhooksEventTypes.SENT_TX
111-
: webhook.status === TransactionStatus.Mined
103+
: transactionResponse.status === TransactionStatus.Mined
112104
? WebhooksEventTypes.MINED_TX
113-
: webhook.status === TransactionStatus.Errored
105+
: transactionResponse.status === TransactionStatus.Errored
114106
? WebhooksEventTypes.ERRORED_TX
115-
: webhook.status === TransactionStatus.Cancelled
107+
: transactionResponse.status === TransactionStatus.Cancelled
116108
? WebhooksEventTypes.CANCELLED_TX
117109
: undefined;
118110

119-
const webhookConfigs = await Promise.all([
120-
...((await getWebhooksByEventType(WebhooksEventTypes.ALL_TX)) || []),
121-
...(webhookStatus ? await getWebhooksByEventType(webhookStatus) : []),
122-
]);
123-
124-
await Promise.all(
125-
webhookConfigs.map(async (webhookConfig) => {
126-
if (webhookConfig.revokedAt) {
127-
logger({
128-
service: "server",
129-
level: "debug",
130-
message: "No webhook set or active, skipping webhook send",
131-
});
132-
return;
133-
}
134-
135-
await sendWebhookRequest(
136-
webhookConfig,
137-
webhook.tx as Record<string, any>,
138-
);
139-
}),
140-
);
111+
if (type) {
112+
await enqueueWebhook({ type, transaction });
113+
}
141114
}
142115
};
143116

src/worker/queues/sendWebhookQueue.ts

Lines changed: 62 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
import {
22
ContractEventLogs,
33
ContractTransactionReceipts,
4+
Transactions,
45
Webhooks,
56
} from "@prisma/client";
67
import { Queue } from "bullmq";
78
import SuperJSON from "superjson";
89
import { WebhooksEventTypes } from "../../schema/webhooks";
10+
import { getWebhooksByEventType } from "../../utils/cache/getWebhook";
911
import { logger } from "../../utils/logger";
1012
import { redis } from "../../utils/redis/redis";
1113
import { defaultJobOptions } from "./queues";
@@ -26,8 +28,22 @@ export type EnqueueContractSubscriptionWebhookData = {
2628
eventLog?: ContractEventLogs;
2729
transactionReceipt?: ContractTransactionReceipts;
2830
};
31+
32+
export type EnqueueTransactionWebhookData = {
33+
type:
34+
| WebhooksEventTypes.ALL_TX
35+
| WebhooksEventTypes.QUEUED_TX
36+
| WebhooksEventTypes.SENT_TX
37+
| WebhooksEventTypes.MINED_TX
38+
| WebhooksEventTypes.ERRORED_TX
39+
| WebhooksEventTypes.CANCELLED_TX;
40+
transaction: Transactions;
41+
};
42+
2943
// TODO: Add other webhook event types here.
30-
type EnqueueWebhookData = EnqueueContractSubscriptionWebhookData;
44+
type EnqueueWebhookData =
45+
| EnqueueContractSubscriptionWebhookData
46+
| EnqueueTransactionWebhookData;
3147

3248
export interface WebhookJob {
3349
data: EnqueueWebhookData;
@@ -38,15 +54,26 @@ export const enqueueWebhook = async (data: EnqueueWebhookData) => {
3854
switch (data.type) {
3955
case WebhooksEventTypes.CONTRACT_SUBSCRIPTION:
4056
return enqueueContractSubscriptionWebhook(data);
57+
case WebhooksEventTypes.ALL_TX:
58+
case WebhooksEventTypes.QUEUED_TX:
59+
case WebhooksEventTypes.SENT_TX:
60+
case WebhooksEventTypes.MINED_TX:
61+
case WebhooksEventTypes.ERRORED_TX:
62+
case WebhooksEventTypes.CANCELLED_TX:
63+
return enqueueTransactionWebhook(data);
4164
default:
4265
logger({
4366
service: "worker",
4467
level: "warn",
45-
message: `Unexpected webhook type: ${data.type}`,
68+
message: `Unexpected webhook type: ${(data as any).type}`,
4669
});
4770
}
4871
};
4972

73+
/**
74+
* Contract Subscriptions webhooks
75+
*/
76+
5077
const enqueueContractSubscriptionWebhook = async (
5178
data: EnqueueContractSubscriptionWebhookData,
5279
) => {
@@ -88,3 +115,36 @@ const getContractSubscriptionWebhookIdempotencyKey = (args: {
88115
}
89116
throw 'Must provide "eventLog" or "transactionReceipt".';
90117
};
118+
119+
/**
120+
* Transaction webhooks
121+
*/
122+
123+
const enqueueTransactionWebhook = async (
124+
data: EnqueueTransactionWebhookData,
125+
) => {
126+
if (!_queue) return;
127+
128+
const webhooks = [
129+
...(await getWebhooksByEventType(WebhooksEventTypes.ALL_TX)),
130+
...(await getWebhooksByEventType(data.type)),
131+
];
132+
133+
for (const webhook of webhooks) {
134+
const job: WebhookJob = { data, webhook };
135+
const serialized = SuperJSON.stringify(job);
136+
await _queue.add(`${data.type}:${webhook.id}`, serialized, {
137+
jobId: getTransactionWebhookIdempotencyKey({
138+
webhook,
139+
eventType: data.type,
140+
queueId: data.transaction.id,
141+
}),
142+
});
143+
}
144+
};
145+
146+
const getTransactionWebhookIdempotencyKey = (args: {
147+
webhook: Webhooks;
148+
eventType: WebhooksEventTypes;
149+
queueId: string;
150+
}) => `${args.webhook.url}:${args.eventType}:${args.queueId}`;

src/worker/tasks/sendWebhookWorker.ts

Lines changed: 40 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,12 @@
1+
import { Static } from "@sinclair/typebox";
12
import { Job, Processor, Worker } from "bullmq";
23
import superjson from "superjson";
34
import { WebhooksEventTypes } from "../../schema/webhooks";
45
import { toEventLogSchema } from "../../server/schemas/eventLog";
6+
import {
7+
toTransactionSchema,
8+
transactionResponseSchema,
9+
} from "../../server/schemas/transaction";
510
import { toTransactionReceiptSchema } from "../../server/schemas/transactionReceipt";
611
import { redis } from "../../utils/redis/redis";
712
import { WebhookResponse, sendWebhookRequest } from "../../utils/webhook";
@@ -11,33 +16,46 @@ import {
1116
WebhookJob,
1217
} from "../queues/sendWebhookQueue";
1318

14-
interface WebhookBody {
15-
type: "event-log" | "transaction-receipt";
16-
data: any;
17-
}
18-
1919
const handler: Processor<any, void, string> = async (job: Job<string>) => {
2020
const { data, webhook } = superjson.parse<WebhookJob>(job.data);
2121

2222
let resp: WebhookResponse | undefined;
23-
if (data.type === WebhooksEventTypes.CONTRACT_SUBSCRIPTION) {
24-
let webhookBody: WebhookBody;
25-
if (data.eventLog) {
26-
webhookBody = {
27-
type: "event-log",
28-
data: toEventLogSchema(data.eventLog),
23+
switch (data.type) {
24+
case WebhooksEventTypes.CONTRACT_SUBSCRIPTION: {
25+
let webhookBody: {
26+
type: "event-log" | "transaction-receipt";
27+
data: any;
2928
};
30-
} else if (data.transactionReceipt) {
31-
webhookBody = {
32-
type: "transaction-receipt",
33-
data: toTransactionReceiptSchema(data.transactionReceipt),
34-
};
35-
} else {
36-
throw new Error(
37-
'Missing "eventLog" or "transactionReceipt" for CONTRACT_SUBSCRIPTION webhook.',
38-
);
29+
if (data.eventLog) {
30+
webhookBody = {
31+
type: "event-log",
32+
data: toEventLogSchema(data.eventLog),
33+
};
34+
} else if (data.transactionReceipt) {
35+
webhookBody = {
36+
type: "transaction-receipt",
37+
data: toTransactionReceiptSchema(data.transactionReceipt),
38+
};
39+
} else {
40+
throw new Error(
41+
'Missing "eventLog" or "transactionReceipt" for CONTRACT_SUBSCRIPTION webhook.',
42+
);
43+
}
44+
resp = await sendWebhookRequest(webhook, webhookBody);
45+
break;
46+
}
47+
48+
case WebhooksEventTypes.ALL_TX:
49+
case WebhooksEventTypes.QUEUED_TX:
50+
case WebhooksEventTypes.SENT_TX:
51+
case WebhooksEventTypes.MINED_TX:
52+
case WebhooksEventTypes.ERRORED_TX:
53+
case WebhooksEventTypes.CANCELLED_TX: {
54+
const webhookBody: Static<typeof transactionResponseSchema> =
55+
toTransactionSchema(data.transaction);
56+
resp = await sendWebhookRequest(webhook, webhookBody);
57+
break;
3958
}
40-
resp = await sendWebhookRequest(webhook, webhookBody);
4159
}
4260

4361
if (resp && !resp.ok) {
@@ -52,7 +70,7 @@ const handler: Processor<any, void, string> = async (job: Job<string>) => {
5270
let _worker: Worker | null = null;
5371
if (redis) {
5472
_worker = new Worker(SEND_WEBHOOK_QUEUE_NAME, handler, {
55-
concurrency: 1,
73+
concurrency: 10,
5674
connection: redis,
5775
});
5876
logWorkerEvents(_worker);

0 commit comments

Comments
 (0)