Skip to content

Commit 1c9aead

Browse files
committed
feat: Add filters to Contract Subscriptions
1 parent 3325fad commit 1c9aead

File tree

12 files changed

+223
-172
lines changed

12 files changed

+223
-172
lines changed

src/db/contractSubscriptions/createContractSubscription.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,18 +4,27 @@ interface CreateContractSubscriptionParams {
44
chainId: number;
55
contractAddress: string;
66
webhookId?: number;
7+
parseEventLogs: boolean;
8+
filterEventLogs: string[];
9+
parseTransactionReceipts: boolean;
710
}
811

912
export const createContractSubscription = async ({
1013
chainId,
1114
contractAddress,
1215
webhookId,
16+
parseEventLogs,
17+
filterEventLogs,
18+
parseTransactionReceipts,
1319
}: CreateContractSubscriptionParams) => {
1420
return prisma.contractSubscriptions.create({
1521
data: {
1622
chainId,
1723
contractAddress,
1824
webhookId,
25+
parseEventLogs,
26+
filterEventLogs,
27+
parseTransactionReceipts,
1928
},
2029
include: {
2130
webhook: true,
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
-- AlterTable
2+
ALTER TABLE "contract_subscriptions" ADD COLUMN "filterEventLogs" TEXT[] DEFAULT ARRAY[]::TEXT[],
3+
ADD COLUMN "parseEventLogs" BOOLEAN NOT NULL DEFAULT true,
4+
ADD COLUMN "parseTransactionReceipts" BOOLEAN NOT NULL DEFAULT true;
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
/*
2+
Warnings:
3+
4+
- You are about to drop the column `filterEventLogs` on the `contract_subscriptions` table. All the data in the column will be lost.
5+
- You are about to drop the column `parseEventLogs` on the `contract_subscriptions` table. All the data in the column will be lost.
6+
- You are about to drop the column `parseTransactionReceipts` on the `contract_subscriptions` table. All the data in the column will be lost.
7+
8+
*/
9+
-- AlterTable
10+
ALTER TABLE "contract_subscriptions" DROP COLUMN "filterEventLogs",
11+
DROP COLUMN "parseEventLogs",
12+
DROP COLUMN "parseTransactionReceipts",
13+
ADD COLUMN "filterEvents" TEXT[] DEFAULT ARRAY[]::TEXT[],
14+
ADD COLUMN "processEventLogs" BOOLEAN NOT NULL DEFAULT true,
15+
ADD COLUMN "processTransactionReceipts" BOOLEAN NOT NULL DEFAULT true;

src/prisma/schema.prisma

Lines changed: 22 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -8,26 +8,26 @@ generator client {
88
}
99

1010
model Configuration {
11-
id String @id @default("default") @map("id")
11+
id String @id @default("default") @map("id")
1212
// Chains
13-
chainOverrides String? @map("chainOverrides")
13+
chainOverrides String? @map("chainOverrides")
1414
// Tx Processing
15-
minTxsToProcess Int @map("minTxsToProcess")
16-
maxTxsToProcess Int @map("maxTxsToProcess")
15+
minTxsToProcess Int @map("minTxsToProcess")
16+
maxTxsToProcess Int @map("maxTxsToProcess")
1717
// Tx Updates
18-
minedTxListenerCronSchedule String? @map("minedTxsCronSchedule")
19-
maxTxsToUpdate Int @map("maxTxsToUpdate")
18+
minedTxListenerCronSchedule String? @map("minedTxsCronSchedule")
19+
maxTxsToUpdate Int @map("maxTxsToUpdate")
2020
// Tx Retries
21-
retryTxListenerCronSchedule String? @map("retryTxsCronSchedule")
22-
minEllapsedBlocksBeforeRetry Int @map("minEllapsedBlocksBeforeRetry")
23-
maxFeePerGasForRetries String @map("maxFeePerGasForRetries")
24-
maxPriorityFeePerGasForRetries String @map("maxPriorityFeePerGasForRetries")
25-
maxRetriesPerTx Int @map("maxRetriesPerTx")
21+
retryTxListenerCronSchedule String? @map("retryTxsCronSchedule")
22+
minEllapsedBlocksBeforeRetry Int @map("minEllapsedBlocksBeforeRetry")
23+
maxFeePerGasForRetries String @map("maxFeePerGasForRetries")
24+
maxPriorityFeePerGasForRetries String @map("maxPriorityFeePerGasForRetries")
25+
maxRetriesPerTx Int @map("maxRetriesPerTx")
2626
// Contract Indexer Updates
27-
indexerListenerCronSchedule String? @map("indexerListenerCronSchedule")
28-
maxBlocksToIndex Int @default(25) @map("maxBlocksToIndex")
29-
cursorDelaySeconds Int @default(2) @map("cursorDelaySeconds")
30-
contractSubscriptionsRetryDelaySeconds String @default("10") @map("contractSubscriptionsRetryDelaySeconds")
27+
indexerListenerCronSchedule String? @map("indexerListenerCronSchedule")
28+
maxBlocksToIndex Int @default(25) @map("maxBlocksToIndex")
29+
cursorDelaySeconds Int @default(2) @map("cursorDelaySeconds")
30+
contractSubscriptionsRetryDelaySeconds String @default("10") @map("contractSubscriptionsRetryDelaySeconds")
3131
3232
// AWS
3333
awsAccessKeyId String? @map("awsAccessKeyId")
@@ -188,10 +188,13 @@ model Relayers {
188188
}
189189

190190
model ContractSubscriptions {
191-
id String @id @default(uuid()) @map("id")
192-
chainId Int
193-
contractAddress String
194-
webhookId Int?
191+
id String @id @default(uuid()) @map("id")
192+
chainId Int
193+
contractAddress String
194+
webhookId Int?
195+
processEventLogs Boolean @default(true)
196+
filterEvents String[] @default([]) // empty array = no filter
197+
processTransactionReceipts Boolean @default(true)
195198
196199
createdAt DateTime @default(now())
197200
updatedAt DateTime @updatedAt

src/server/routes/contract/subscriptions/addContractSubscription.ts

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,22 @@ const bodySchema = Type.Object({
2929
examples: ["https://example.com/webhook"],
3030
}),
3131
),
32+
parseEventLogs: Type.Optional(
33+
Type.Boolean({
34+
description: "If true, parse event logs for this contract.",
35+
}),
36+
),
37+
filterEventLogs: Type.Optional(
38+
Type.Array(Type.String(), {
39+
description:
40+
"A case-sensitive list of event log names to parse. If empty, parse all event logs.",
41+
}),
42+
),
43+
parseTransactionReceipts: Type.Optional(
44+
Type.Boolean({
45+
description: "If true, parse transaction receipts for this contract.",
46+
}),
47+
),
3248
});
3349

3450
const responseSchema = Type.Object({
@@ -63,11 +79,27 @@ export async function addContractSubscription(fastify: FastifyInstance) {
6379
},
6480
},
6581
handler: async (request, reply) => {
66-
const { chain, contractAddress, webhookUrl } = request.body;
82+
const {
83+
chain,
84+
contractAddress,
85+
webhookUrl,
86+
parseEventLogs = true,
87+
filterEventLogs = [],
88+
parseTransactionReceipts = true,
89+
} = request.body;
6790

6891
const chainId = await getChainIdFromChain(chain);
6992
const standardizedContractAddress = contractAddress.toLowerCase();
7093

94+
// Must parse logs or receipts.
95+
if (!parseEventLogs && !parseTransactionReceipts) {
96+
throw createCustomError(
97+
"Contract Subscriptions must parse event logs and/or receipts.",
98+
StatusCodes.BAD_REQUEST,
99+
"BAD_REQUEST",
100+
);
101+
}
102+
71103
// If not currently indexed, upsert the latest block number.
72104
const subscribedChainIds = await getContractSubscriptionsUniqueChainIds();
73105
if (!subscribedChainIds.includes(chainId)) {
@@ -105,6 +137,9 @@ export async function addContractSubscription(fastify: FastifyInstance) {
105137
chainId,
106138
contractAddress: standardizedContractAddress,
107139
webhookId,
140+
parseEventLogs,
141+
filterEventLogs,
142+
parseTransactionReceipts,
108143
});
109144

110145
reply.status(StatusCodes.OK).send({

src/server/routes/system/health.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ const getFeatures = (): EngineFeature[] => {
6262
const features: EngineFeature[] = [];
6363

6464
if (env.ENABLE_KEYPAIR_AUTH) features.push("KEYPAIR_AUTH");
65-
// Contract subscriptions requires Redis.
65+
// Contract Subscriptions requires Redis.
6666
if (redis) features.push("CONTRACT_SUBSCRIPTIONS");
6767

6868
return features;

src/server/schemas/contractSubscription.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,9 @@ export const contractSubscriptionSchema = Type.Object({
77
chainId: Type.Number(),
88
contractAddress: Type.String(),
99
webhook: Type.Optional(WebhookSchema),
10+
parseEventLogs: Type.Boolean(),
11+
filterEventLogs: Type.Array(Type.String()),
12+
parseTransactionReceipts: Type.Boolean(),
1013
createdAt: Type.Unsafe<Date>({
1114
type: "string",
1215
format: "date",
@@ -22,5 +25,8 @@ export const toContractSubscriptionSchema = (
2225
webhook: contractSubscription.webhook
2326
? toWebhookSchema(contractSubscription.webhook)
2427
: undefined,
28+
parseEventLogs: contractSubscription.parseEventLogs,
29+
filterEventLogs: contractSubscription.filterEventLogs,
30+
parseTransactionReceipts: contractSubscription.parseTransactionReceipts,
2531
createdAt: contractSubscription.createdAt,
2632
});

src/worker/queues/processEventLogsQueue.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,11 @@ const _queue = redis
1919

2020
export type EnqueueProcessEventLogsData = {
2121
chainId: number;
22-
contractAddresses: string[];
22+
filters: {
23+
address: string;
24+
events: string[];
25+
functions: string[];
26+
}[];
2327
fromBlock: number; // inclusive
2428
toBlock: number; // inclusive
2529
};

src/worker/queues/processTransactionReceiptsQueue.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ const _queue = redis
2020

2121
export type EnqueueProcessTransactionReceiptsData = {
2222
chainId: number;
23-
contractAddresses: string[];
23+
filters: { address: string }[];
2424
fromBlock: number; // inclusive
2525
toBlock: number; // inclusive
2626
};

src/worker/tasks/chainIndexer.ts

Lines changed: 39 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ export const createChainIndexerTask = async (args: {
3232
const currentBlockNumber =
3333
(await provider.getBlockNumber()) - toBlockOffset;
3434

35-
// Limit toBlock to avoid hitting rate or file size limits when querying logs.
35+
// Limit toBlock to avoid hitting rate or block range limits when querying logs.
3636
const toBlock = Math.min(
3737
currentBlockNumber,
3838
fromBlock + maxBlocksToIndex,
@@ -43,11 +43,9 @@ export const createChainIndexerTask = async (args: {
4343
return;
4444
}
4545

46-
// Ensuring that the block data exists.
47-
// Sometimes the RPC providers nodes are aware of the latest block
48-
// but the block data is not available yet.
46+
// Ensure that the block data exists.
47+
// Sometimes the RPC nodes do not yet return data for the latest block.
4948
const block = await provider.getBlockWithTransactions(toBlock);
50-
5149
if (!block) {
5250
logger({
5351
service: "worker",
@@ -59,32 +57,47 @@ export const createChainIndexerTask = async (args: {
5957
return;
6058
}
6159

62-
// Get contract addresses to filter event logs and transaction receipts by.
6360
const contractSubscriptions = await getContractSubscriptionsByChainId(
6461
chainId,
6562
true,
6663
);
67-
const contractAddresses = [
68-
...new Set<string>(
69-
contractSubscriptions.map(
70-
(subscription) => subscription.contractAddress,
71-
),
72-
),
73-
];
7464

75-
await enqueueProcessEventLogs({
76-
chainId,
77-
fromBlock,
78-
toBlock,
79-
contractAddresses,
80-
});
65+
// Identify contract addresses + event names to parse event logs, if any.
66+
const eventLogFilters: {
67+
address: string;
68+
events: string[];
69+
functions: string[];
70+
}[] = contractSubscriptions
71+
.filter((c) => c.parseEventLogs)
72+
.map((c) => ({
73+
address: c.contractAddress,
74+
events: c.filterEventNames,
75+
functions: c.filterFunctionNames,
76+
}));
77+
if (eventLogFilters.length > 0) {
78+
await enqueueProcessEventLogs({
79+
chainId,
80+
fromBlock,
81+
toBlock,
82+
filters: eventLogFilters,
83+
});
84+
}
8185

82-
await enqueueProcessTransactionReceipts({
83-
chainId,
84-
fromBlock,
85-
toBlock,
86-
contractAddresses,
87-
});
86+
// Identify contract addresses to parse transaction receipts, if any.
87+
const transactionReceiptFilters: { address: string }[] =
88+
contractSubscriptions
89+
.filter((cs) => cs.parseTransactionReceipts)
90+
.map((cs) => ({
91+
address: cs.contractAddress.toLowerCase(),
92+
}));
93+
if (transactionReceiptFilters.length > 0) {
94+
await enqueueProcessTransactionReceipts({
95+
chainId,
96+
fromBlock,
97+
toBlock,
98+
filters: transactionReceiptFilters,
99+
});
100+
}
88101

89102
// Update the latest block number.
90103
try {
@@ -103,7 +116,7 @@ export const createChainIndexerTask = async (args: {
103116
}
104117
},
105118
{
106-
timeout: 5 * 60000, // 3 minutes timeout
119+
timeout: 60 * 1000, // 1 minute timeout
107120
},
108121
);
109122
} catch (err: any) {

0 commit comments

Comments
 (0)