Skip to content

Commit dbf0645

Browse files
authored
feat: Add filters to Contract Subscriptions (#543)
* feat: Add filters to Contract Subscriptions * rename * require process* bools * wip * wip * add function name, fix all typing * simplify logic a bit
1 parent f03b5ea commit dbf0645

File tree

14 files changed

+498
-270
lines changed

14 files changed

+498
-270
lines changed

src/db/contractSubscriptions/createContractSubscription.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,18 +4,30 @@ interface CreateContractSubscriptionParams {
44
chainId: number;
55
contractAddress: string;
66
webhookId?: number;
7+
processEventLogs: boolean;
8+
filterEvents: string[];
9+
processTransactionReceipts: boolean;
10+
filterFunctions: string[];
711
}
812

913
export const createContractSubscription = async ({
1014
chainId,
1115
contractAddress,
1216
webhookId,
17+
processEventLogs,
18+
filterEvents,
19+
processTransactionReceipts,
20+
filterFunctions,
1321
}: CreateContractSubscriptionParams) => {
1422
return prisma.contractSubscriptions.create({
1523
data: {
1624
chainId,
1725
contractAddress,
1826
webhookId,
27+
processEventLogs,
28+
filterEvents,
29+
processTransactionReceipts,
30+
filterFunctions,
1931
},
2032
include: {
2133
webhook: true,
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
-- AlterTable
2+
ALTER TABLE "contract_subscriptions" ADD COLUMN "filterEventLogs" TEXT[] DEFAULT ARRAY[]::TEXT[],
3+
ADD COLUMN "parseEventLogs" BOOLEAN NOT NULL DEFAULT true,
4+
ADD COLUMN "parseTransactionReceipts" BOOLEAN NOT NULL DEFAULT true;
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
/*
2+
Warnings:
3+
4+
- You are about to drop the column `filterEventLogs` on the `contract_subscriptions` table. All the data in the column will be lost.
5+
- You are about to drop the column `parseEventLogs` on the `contract_subscriptions` table. All the data in the column will be lost.
6+
- You are about to drop the column `parseTransactionReceipts` on the `contract_subscriptions` table. All the data in the column will be lost.
7+
8+
*/
9+
-- AlterTable
10+
ALTER TABLE "contract_subscriptions" DROP COLUMN "filterEventLogs",
11+
DROP COLUMN "parseEventLogs",
12+
DROP COLUMN "parseTransactionReceipts",
13+
ADD COLUMN "filterEvents" TEXT[] DEFAULT ARRAY[]::TEXT[],
14+
ADD COLUMN "processEventLogs" BOOLEAN NOT NULL DEFAULT true,
15+
ADD COLUMN "processTransactionReceipts" BOOLEAN NOT NULL DEFAULT true;
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
-- AlterTable
2+
ALTER TABLE "contract_subscriptions" ADD COLUMN "filterFunctions" TEXT[] DEFAULT ARRAY[]::TEXT[];
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
-- AlterTable
2+
ALTER TABLE "contract_transaction_receipts" ADD COLUMN "functionName" TEXT;

src/prisma/schema.prisma

Lines changed: 24 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -8,26 +8,26 @@ generator client {
88
}
99

1010
model Configuration {
11-
id String @id @default("default") @map("id")
11+
id String @id @default("default") @map("id")
1212
// Chains
13-
chainOverrides String? @map("chainOverrides")
13+
chainOverrides String? @map("chainOverrides")
1414
// Tx Processing
15-
minTxsToProcess Int @map("minTxsToProcess")
16-
maxTxsToProcess Int @map("maxTxsToProcess")
15+
minTxsToProcess Int @map("minTxsToProcess")
16+
maxTxsToProcess Int @map("maxTxsToProcess")
1717
// Tx Updates
18-
minedTxListenerCronSchedule String? @map("minedTxsCronSchedule")
19-
maxTxsToUpdate Int @map("maxTxsToUpdate")
18+
minedTxListenerCronSchedule String? @map("minedTxsCronSchedule")
19+
maxTxsToUpdate Int @map("maxTxsToUpdate")
2020
// Tx Retries
21-
retryTxListenerCronSchedule String? @map("retryTxsCronSchedule")
22-
minEllapsedBlocksBeforeRetry Int @map("minEllapsedBlocksBeforeRetry")
23-
maxFeePerGasForRetries String @map("maxFeePerGasForRetries")
24-
maxPriorityFeePerGasForRetries String @map("maxPriorityFeePerGasForRetries")
25-
maxRetriesPerTx Int @map("maxRetriesPerTx")
21+
retryTxListenerCronSchedule String? @map("retryTxsCronSchedule")
22+
minEllapsedBlocksBeforeRetry Int @map("minEllapsedBlocksBeforeRetry")
23+
maxFeePerGasForRetries String @map("maxFeePerGasForRetries")
24+
maxPriorityFeePerGasForRetries String @map("maxPriorityFeePerGasForRetries")
25+
maxRetriesPerTx Int @map("maxRetriesPerTx")
2626
// Contract Indexer Updates
27-
indexerListenerCronSchedule String? @map("indexerListenerCronSchedule")
28-
maxBlocksToIndex Int @default(25) @map("maxBlocksToIndex")
29-
cursorDelaySeconds Int @default(2) @map("cursorDelaySeconds")
30-
contractSubscriptionsRetryDelaySeconds String @default("10") @map("contractSubscriptionsRetryDelaySeconds")
27+
indexerListenerCronSchedule String? @map("indexerListenerCronSchedule")
28+
maxBlocksToIndex Int @default(25) @map("maxBlocksToIndex")
29+
cursorDelaySeconds Int @default(2) @map("cursorDelaySeconds")
30+
contractSubscriptionsRetryDelaySeconds String @default("10") @map("contractSubscriptionsRetryDelaySeconds")
3131
3232
// AWS
3333
awsAccessKeyId String? @map("awsAccessKeyId")
@@ -188,10 +188,14 @@ model Relayers {
188188
}
189189

190190
model ContractSubscriptions {
191-
id String @id @default(uuid()) @map("id")
192-
chainId Int
193-
contractAddress String
194-
webhookId Int?
191+
id String @id @default(uuid()) @map("id")
192+
chainId Int
193+
contractAddress String
194+
webhookId Int?
195+
processEventLogs Boolean @default(true)
196+
filterEvents String[] @default([]) // empty array = no filter
197+
processTransactionReceipts Boolean @default(true)
198+
filterFunctions String[] @default([]) // empty array = no filter
195199
196200
createdAt DateTime @default(now())
197201
updatedAt DateTime @updatedAt
@@ -242,6 +246,7 @@ model ContractTransactionReceipts {
242246
blockHash String
243247
timestamp DateTime
244248
data String
249+
functionName String?
245250
246251
to String
247252
from String

src/server/routes/contract/subscriptions/addContractSubscription.ts

Lines changed: 43 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,26 @@ const bodySchema = Type.Object({
2929
examples: ["https://example.com/webhook"],
3030
}),
3131
),
32+
processEventLogs: Type.Boolean({
33+
description: "If true, parse event logs for this contract.",
34+
}),
35+
filterEvents: Type.Optional(
36+
Type.Array(Type.String(), {
37+
description:
38+
"A case-sensitive list of event names to filter event logs. Parses all event logs by default.",
39+
examples: ["Transfer"],
40+
}),
41+
),
42+
processTransactionReceipts: Type.Boolean({
43+
description: "If true, parse transaction receipts for this contract.",
44+
}),
45+
filterFunctions: Type.Optional(
46+
Type.Array(Type.String(), {
47+
description:
48+
"A case-sensitive list of function names to filter transaction receipts. Parses all transaction receipts by default.",
49+
examples: ["mintTo"],
50+
}),
51+
),
3252
});
3353

3454
const responseSchema = Type.Object({
@@ -63,10 +83,26 @@ export async function addContractSubscription(fastify: FastifyInstance) {
6383
},
6484
},
6585
handler: async (request, reply) => {
66-
const { chain, contractAddress, webhookUrl } = request.body;
86+
const {
87+
chain,
88+
contractAddress,
89+
webhookUrl,
90+
processEventLogs,
91+
filterEvents = [],
92+
processTransactionReceipts,
93+
filterFunctions = [],
94+
} = request.body;
6795

6896
const chainId = await getChainIdFromChain(chain);
69-
const standardizedContractAddress = contractAddress.toLowerCase();
97+
98+
// Must parse logs or receipts.
99+
if (!processEventLogs && !processTransactionReceipts) {
100+
throw createCustomError(
101+
"Contract Subscriptions must parse event logs and/or receipts.",
102+
StatusCodes.BAD_REQUEST,
103+
"BAD_REQUEST",
104+
);
105+
}
70106

71107
// If not currently indexed, upsert the latest block number.
72108
const subscribedChainIds = await getContractSubscriptionsUniqueChainIds();
@@ -103,8 +139,12 @@ export async function addContractSubscription(fastify: FastifyInstance) {
103139
// Create the contract subscription.
104140
const contractSubscription = await createContractSubscription({
105141
chainId,
106-
contractAddress: standardizedContractAddress,
142+
contractAddress: contractAddress.toLowerCase(),
107143
webhookId,
144+
processEventLogs,
145+
filterEvents,
146+
processTransactionReceipts,
147+
filterFunctions,
108148
});
109149

110150
reply.status(StatusCodes.OK).send({

src/server/routes/system/health.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ const getFeatures = (): EngineFeature[] => {
6464
const features: EngineFeature[] = [];
6565

6666
if (env.ENABLE_KEYPAIR_AUTH) features.push("KEYPAIR_AUTH");
67-
// Contract subscriptions requires Redis.
67+
// Contract Subscriptions requires Redis.
6868
if (redis) features.push("CONTRACT_SUBSCRIPTIONS");
6969

7070
return features;

src/server/schemas/contractSubscription.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,10 @@ export const contractSubscriptionSchema = Type.Object({
77
chainId: Type.Number(),
88
contractAddress: Type.String(),
99
webhook: Type.Optional(WebhookSchema),
10+
processEventLogs: Type.Boolean(),
11+
filterEvents: Type.Array(Type.String()),
12+
processTransactionReceipts: Type.Boolean(),
13+
filterFunctions: Type.Array(Type.String()),
1014
createdAt: Type.Unsafe<Date>({
1115
type: "string",
1216
format: "date",
@@ -22,5 +26,9 @@ export const toContractSubscriptionSchema = (
2226
webhook: contractSubscription.webhook
2327
? toWebhookSchema(contractSubscription.webhook)
2428
: undefined,
29+
processEventLogs: contractSubscription.processEventLogs,
30+
filterEvents: contractSubscription.filterEvents,
31+
processTransactionReceipts: contractSubscription.processTransactionReceipts,
32+
filterFunctions: contractSubscription.filterFunctions,
2533
createdAt: contractSubscription.createdAt,
2634
});

src/worker/queues/processEventLogsQueue.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { Queue } from "bullmq";
22
import SuperJSON from "superjson";
3+
import { Address } from "thirdweb";
34
import { getConfig } from "../../utils/cache/getConfig";
45
import { redis } from "../../utils/redis/redis";
56
import { defaultJobOptions } from "./queues";
@@ -17,9 +18,13 @@ const _queue = redis
1718
})
1819
: null;
1920

21+
// Each job handles a block range for a given chain, filtered by addresses + events.
2022
export type EnqueueProcessEventLogsData = {
2123
chainId: number;
22-
contractAddresses: string[];
24+
filters: {
25+
address: Address;
26+
events: string[];
27+
}[];
2328
fromBlock: number; // inclusive
2429
toBlock: number; // inclusive
2530
};

0 commit comments

Comments
 (0)