Skip to content

Commit 1809ea6

Browse files
arcoravenfarhanW3
andauthored
feat: Add webhooks to contract subscriptions Redis workers (#521)
* feat: Add webhooks to contract subscriptions Redis workers * updates for logs & worker * CRUD endpoints * make Redis backward compatible * add processEventLogs worker * Added contract-subscriptions config to DB * Contract Subs Event Missing updates * updated endpoint check * assert valid numbers * move webhooks query above db call --------- Co-authored-by: farhanW3 <farhan@thirdweb.com>
1 parent f22d72d commit 1809ea6

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

56 files changed

+1667
-960
lines changed

docker-compose-infra.yml

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,5 +20,29 @@ services:
2020
cpus: "2"
2121
memory: 2G
2222

23+
redis:
24+
container_name: redis
25+
image: redis:latest
26+
restart: always
27+
ports:
28+
- 6379:6379
29+
volumes:
30+
- redis_data:/data
31+
32+
bullboard:
33+
container_name: bullboard
34+
image: deadly0/bull-board:latest
35+
restart: always
36+
ports:
37+
- 3000:3000
38+
environment:
39+
REDIS_HOST: redis
40+
REDIS_PORT: 6379
41+
REDIS_USE_TLS: "false"
42+
BULL_PREFIX: bull
43+
depends_on:
44+
- redis
45+
2346
volumes:
2447
psql_db:
48+
redis_data:

package.json

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
"@fastify/type-provider-typebox": "^3.2.0",
3737
"@fastify/websocket": "^8.2.0",
3838
"@google-cloud/kms": "^4.0.0",
39-
"@prisma/client": "5.2.0",
39+
"@prisma/client": "^5.14.0-dev.65",
4040
"@sinclair/typebox": "^0.31.28",
4141
"@t3-oss/env-core": "^0.6.0",
4242
"@thirdweb-dev/auth": "^4.1.55",
@@ -47,6 +47,7 @@
4747
"@types/base-64": "^1.0.2",
4848
"base-64": "^1.0.0",
4949
"body-parser": "^1.20.2",
50+
"bullmq": "^5.7.8",
5051
"cookie": "^0.5.0",
5152
"cookie-parser": "^1.4.6",
5253
"copyfiles": "^2.4.1",
@@ -59,14 +60,16 @@
5960
"fastify": "^4.15.0",
6061
"fastify-plugin": "^4.5.0",
6162
"http-status-codes": "^2.2.0",
63+
"ioredis": "^5.4.1",
6264
"jsonwebtoken": "^9.0.2",
6365
"knex": "^3.1.0",
6466
"mnemonist": "^0.39.8",
6567
"node-cron": "^3.0.2",
6668
"pg": "^8.11.3",
6769
"pino": "^8.15.1",
6870
"pino-pretty": "^10.0.0",
69-
"prisma": "^5.2.0",
71+
"prisma": "5.14.0-dev.61",
72+
"superjson": "^2.2.1",
7073
"thirdweb": "^5.1.0",
7174
"uuid": "^9.0.1",
7275
"viem": "^1.14.0",

src/db/contractEventLogs/createContractEventLogs.ts

Lines changed: 4 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,34 +1,18 @@
1+
import { ContractEventLogs, Prisma } from "@prisma/client";
12
import { PrismaTransaction } from "../../schema/prisma";
23
import { getPrismaWithPostgresTx } from "../client";
34

4-
interface ContractEventLogEntry {
5-
chainId: number;
6-
blockNumber: number;
7-
contractAddress: string;
8-
transactionHash: string;
9-
topic0?: string;
10-
topic1?: string;
11-
topic2?: string;
12-
topic3?: string;
13-
data: string;
14-
decodedLog?: any; // Assuming JSON object for decodedLog
15-
eventName?: string;
16-
timestamp: Date;
17-
transactionIndex: number;
18-
logIndex: number;
19-
}
20-
215
export interface BulkInsertContractLogsParams {
226
pgtx?: PrismaTransaction;
23-
logs: ContractEventLogEntry[];
7+
logs: Prisma.ContractEventLogsCreateInput[];
248
}
259

2610
export const bulkInsertContractEventLogs = async ({
2711
pgtx,
2812
logs,
29-
}: BulkInsertContractLogsParams) => {
13+
}: BulkInsertContractLogsParams): Promise<ContractEventLogs[]> => {
3014
const prisma = getPrismaWithPostgresTx(pgtx);
31-
return await prisma.contractEventLogs.createMany({
15+
return await prisma.contractEventLogs.createManyAndReturn({
3216
data: logs,
3317
skipDuplicates: true,
3418
});

src/db/contractSubscriptions/createContractSubscription.ts

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,22 @@ import { prisma } from "../client";
33
interface CreateContractSubscriptionParams {
44
chainId: number;
55
contractAddress: string;
6+
webhookId?: number;
67
}
78

89
export const createContractSubscription = async ({
910
chainId,
1011
contractAddress,
12+
webhookId,
1113
}: CreateContractSubscriptionParams) => {
1214
return prisma.contractSubscriptions.create({
13-
data: { chainId, contractAddress },
15+
data: {
16+
chainId,
17+
contractAddress,
18+
webhookId,
19+
},
20+
include: {
21+
webhook: true,
22+
},
1423
});
1524
};

src/db/contractSubscriptions/deleteContractSubscription.ts

Lines changed: 3 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,8 @@
11
import { prisma } from "../client";
22

3-
interface RemoveContractSubscriptionParams {
4-
chainId: number;
5-
contractAddress: string;
6-
}
7-
8-
export const deleteContractSubscription = async ({
9-
chainId,
10-
contractAddress,
11-
}: RemoveContractSubscriptionParams) => {
12-
return prisma.contractSubscriptions.updateMany({
13-
where: {
14-
chainId,
15-
contractAddress,
16-
deletedAt: null,
17-
},
3+
export const deleteContractSubscription = async (id: string) => {
4+
return prisma.contractSubscriptions.update({
5+
where: { id },
186
data: {
197
deletedAt: new Date(),
208
},

src/db/contractSubscriptions/getContractSubscriptions.ts

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,24 +9,28 @@ export const isContractSubscribed = async ({
99
chainId,
1010
contractAddress,
1111
}: GetContractSubscriptionsParams) => {
12-
const subscribedContract = await prisma.contractSubscriptions.findMany({
12+
const contractSubscription = await prisma.contractSubscriptions.findFirst({
1313
where: {
1414
chainId,
1515
contractAddress,
1616
deletedAt: null,
1717
},
1818
});
19-
20-
if (subscribedContract.length === 0) return false;
21-
else return true;
19+
return contractSubscription !== null;
2220
};
2321

24-
export const getContractSubscriptionsByChainId = async (chainId: number) => {
22+
export const getContractSubscriptionsByChainId = async (
23+
chainId: number,
24+
includeWebhook = false,
25+
) => {
2526
return await prisma.contractSubscriptions.findMany({
2627
where: {
2728
chainId,
2829
deletedAt: null,
2930
},
31+
include: {
32+
webhook: includeWebhook,
33+
},
3034
});
3135
};
3236

@@ -35,6 +39,9 @@ export const getAllContractSubscriptions = async () => {
3539
where: {
3640
deletedAt: null,
3741
},
42+
include: {
43+
webhook: true,
44+
},
3845
});
3946
};
4047

Lines changed: 6 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,36 +1,19 @@
1+
import { ContractTransactionReceipts, Prisma } from "@prisma/client";
12
import { PrismaTransaction } from "../../schema/prisma";
23
import { getPrismaWithPostgresTx } from "../client";
34

4-
interface ContractTransactionReceiptEntry {
5-
chainId: number;
6-
blockNumber: number;
7-
contractId: string;
8-
contractAddress: string;
9-
transactionHash: string;
10-
blockHash: string;
11-
timestamp: Date;
12-
to: string;
13-
from: string;
14-
transactionIndex: number;
15-
value: string;
16-
data: string;
17-
gasUsed: string;
18-
effectiveGasPrice: string;
19-
status: number;
20-
}
21-
225
export interface BulkInsertContractLogsParams {
236
pgtx?: PrismaTransaction;
24-
txReceipts: ContractTransactionReceiptEntry[];
7+
receipts: Prisma.ContractTransactionReceiptsCreateInput[];
258
}
269

2710
export const bulkInsertContractTransactionReceipts = async ({
2811
pgtx,
29-
txReceipts,
30-
}: BulkInsertContractLogsParams) => {
12+
receipts,
13+
}: BulkInsertContractLogsParams): Promise<ContractTransactionReceipts[]> => {
3114
const prisma = getPrismaWithPostgresTx(pgtx);
32-
return await prisma.contractTransactionReceipts.createMany({
33-
data: txReceipts,
15+
return await prisma.contractTransactionReceipts.createManyAndReturn({
16+
data: receipts,
3417
skipDuplicates: true,
3518
});
3619
};

src/db/webhooks/createWebhook.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import { Webhooks } from "@prisma/client";
12
import { createHash, randomBytes } from "crypto";
23
import { WebhooksEventTypes } from "../../schema/webhooks";
34
import { prisma } from "../client";
@@ -12,7 +13,7 @@ export const insertWebhook = async ({
1213
url,
1314
name,
1415
eventType,
15-
}: CreateWebhooksParams) => {
16+
}: CreateWebhooksParams): Promise<Webhooks> => {
1617
// generate random bytes
1718
const bytes = randomBytes(4096);
1819
// hash the bytes to create the secret (this will not be stored by itself)

src/db/webhooks/getAllWebhooks.ts

Lines changed: 2 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,10 @@
11
import { Webhooks } from "@prisma/client";
2-
import { SanitizedWebHooksSchema } from "../../schema/webhooks";
32
import { prisma } from "../client";
43

5-
export const getAllWebhooks = async (): Promise<SanitizedWebHooksSchema[]> => {
6-
let webhooks = await prisma.webhooks.findMany({
4+
export const getAllWebhooks = async (): Promise<Webhooks[]> => {
5+
return await prisma.webhooks.findMany({
76
orderBy: {
87
id: "asc",
98
},
109
});
11-
12-
return sanitizeData(webhooks);
13-
};
14-
15-
const sanitizeData = (data: Webhooks[]): SanitizedWebHooksSchema[] => {
16-
return data.map((webhook) => {
17-
return {
18-
url: webhook.url,
19-
name: webhook.name,
20-
eventType: webhook.eventType,
21-
secret: webhook.secret ? webhook.secret : undefined,
22-
createdAt: webhook.createdAt.toISOString(),
23-
active: webhook.revokedAt ? false : true,
24-
id: webhook.id,
25-
};
26-
});
2710
};

src/db/webhooks/getWebhook.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
import { prisma } from "../client";
2+
3+
export const getWebhook = async (id: number) => {
4+
return await prisma.webhooks.findUnique({
5+
where: { id },
6+
});
7+
};

src/db/webhooks/revokeWebhook.ts

Lines changed: 5 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,34 +1,13 @@
1-
import { StatusCodes } from "http-status-codes";
2-
import { createCustomError } from "../../server/middleware/error";
31
import { prisma } from "../client";
42

5-
interface RevokeWebhooksParams {
6-
id: number;
7-
}
8-
9-
export const markWebhookAsRevoked = async ({ id }: RevokeWebhooksParams) => {
10-
const currentTimestamp = new Date();
11-
12-
const exists = await prisma.webhooks.findUnique({
13-
where: {
14-
id,
15-
},
16-
});
17-
18-
if (!exists)
19-
throw createCustomError(
20-
`Webhook with id ${id} does not exist`,
21-
StatusCodes.BAD_REQUEST,
22-
"BAD_REQUEST",
23-
);
3+
export const deleteWebhook = async (id: number) => {
4+
const now = new Date();
245

256
return prisma.webhooks.update({
26-
where: {
27-
id,
28-
},
7+
where: { id },
298
data: {
30-
revokedAt: currentTimestamp,
31-
updatedAt: currentTimestamp,
9+
revokedAt: now,
10+
updatedAt: now,
3211
},
3312
});
3413
};
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
-- AlterTable
2+
ALTER TABLE "contract_subscriptions" ADD COLUMN "webhookId" INTEGER;
3+
4+
-- AddForeignKey
5+
ALTER TABLE "contract_subscriptions" ADD CONSTRAINT "contract_subscriptions_webhookId_fkey" FOREIGN KEY ("webhookId") REFERENCES "webhooks"("id") ON DELETE CASCADE ON UPDATE CASCADE;
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
-- DropForeignKey
2+
ALTER TABLE "contract_subscriptions" DROP CONSTRAINT "contract_subscriptions_webhookId_fkey";
3+
4+
-- AddForeignKey
5+
ALTER TABLE "contract_subscriptions" ADD CONSTRAINT "contract_subscriptions_webhookId_fkey" FOREIGN KEY ("webhookId") REFERENCES "webhooks"("id") ON DELETE SET NULL ON UPDATE CASCADE;
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
-- AlterTable
2+
ALTER TABLE "configuration" ADD COLUMN "contractSubscriptionsRetryDelaySeconds" TEXT NOT NULL DEFAULT '10';

src/prisma/schema.prisma

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ model Configuration {
2727
indexerListenerCronSchedule String? @map("indexerListenerCronSchedule")
2828
maxBlocksToIndex Int @default(25) @map("maxBlocksToIndex")
2929
cursorDelaySeconds Int @default(2) @map("cursorDelaySeconds")
30+
contractSubscriptionsRetryDelaySeconds String @default("10") @map("contractSubscriptionsRetryDelaySeconds")
3031
3132
// AWS
3233
awsAccessKeyId String? @map("awsAccessKeyId")
@@ -162,14 +163,15 @@ model Transactions {
162163
}
163164

164165
model Webhooks {
165-
id Int @id @default(autoincrement()) @map("id")
166-
name String? @map("name")
167-
url String @map("url")
168-
secret String @map("secret")
169-
eventType String @map("evenType")
170-
createdAt DateTime @default(now()) @map("createdAt")
171-
updatedAt DateTime @updatedAt @map("updatedAt")
172-
revokedAt DateTime? @map("revokedAt")
166+
id Int @id @default(autoincrement()) @map("id")
167+
name String? @map("name")
168+
url String @map("url")
169+
secret String @map("secret")
170+
eventType String @map("evenType")
171+
createdAt DateTime @default(now()) @map("createdAt")
172+
updatedAt DateTime @updatedAt @map("updatedAt")
173+
revokedAt DateTime? @map("revokedAt")
174+
ContractSubscriptions ContractSubscriptions[]
173175
174176
@@map("webhooks")
175177
}
@@ -189,11 +191,14 @@ model ContractSubscriptions {
189191
id String @id @default(uuid()) @map("id")
190192
chainId Int
191193
contractAddress String
194+
webhookId Int?
192195
193196
createdAt DateTime @default(now())
194197
updatedAt DateTime @updatedAt
195198
deletedAt DateTime?
196199
200+
webhook Webhooks? @relation(fields: [webhookId], references: [id], onDelete: SetNull)
201+
197202
// optimize distinct lookups
198203
@@index([chainId])
199204
@@map("contract_subscriptions")

0 commit comments

Comments
 (0)