Skip to content

Commit 7f11d20

Browse files
committed
wip
1 parent 9acff55 commit 7f11d20

File tree

9 files changed

+160
-84
lines changed

9 files changed

+160
-84
lines changed

src/db/contractSubscriptions/createContractSubscription.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ interface CreateContractSubscriptionParams {
77
processEventLogs: boolean;
88
filterEvents: string[];
99
processTransactionReceipts: boolean;
10+
filterFunctions: string[];
1011
}
1112

1213
export const createContractSubscription = async ({
@@ -16,6 +17,7 @@ export const createContractSubscription = async ({
1617
processEventLogs,
1718
filterEvents,
1819
processTransactionReceipts,
20+
filterFunctions,
1921
}: CreateContractSubscriptionParams) => {
2022
return prisma.contractSubscriptions.create({
2123
data: {
@@ -25,6 +27,7 @@ export const createContractSubscription = async ({
2527
processEventLogs,
2628
filterEvents,
2729
processTransactionReceipts,
30+
filterFunctions,
2831
},
2932
include: {
3033
webhook: true,
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
-- AlterTable
2+
ALTER TABLE "contract_subscriptions" ADD COLUMN "filterFunctions" TEXT[] DEFAULT ARRAY[]::TEXT[];

src/prisma/schema.prisma

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,7 @@ model ContractSubscriptions {
195195
processEventLogs Boolean @default(true)
196196
filterEvents String[] @default([]) // empty array = no filter
197197
processTransactionReceipts Boolean @default(true)
198+
filterFunctions String[] @default([]) // empty array = no filter
198199
199200
createdAt DateTime @default(now())
200201
updatedAt DateTime @updatedAt

src/server/routes/contract/subscriptions/addContractSubscription.ts

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,12 +35,18 @@ const bodySchema = Type.Object({
3535
filterEvents: Type.Optional(
3636
Type.Array(Type.String(), {
3737
description:
38-
"A case-sensitive list of event log names to parse. If empty, parse all event logs.",
38+
"A case-sensitive list of event names to filter event logs. Parses all event logs by default.",
3939
}),
4040
),
4141
processTransactionReceipts: Type.Boolean({
4242
description: "If true, parse transaction receipts for this contract.",
4343
}),
44+
filterFunctions: Type.Optional(
45+
Type.Array(Type.String(), {
46+
description:
47+
"A case-sensitive list of function names to filter transaction receipts. Parses all transaction receipts by default.",
48+
}),
49+
),
4450
});
4551

4652
const responseSchema = Type.Object({
@@ -82,6 +88,7 @@ export async function addContractSubscription(fastify: FastifyInstance) {
8288
processEventLogs,
8389
filterEvents = [],
8490
processTransactionReceipts,
91+
filterFunctions = [],
8592
} = request.body;
8693

8794
const chainId = await getChainIdFromChain(chain);
@@ -136,6 +143,7 @@ export async function addContractSubscription(fastify: FastifyInstance) {
136143
processEventLogs,
137144
filterEvents,
138145
processTransactionReceipts,
146+
filterFunctions,
139147
});
140148

141149
reply.status(StatusCodes.OK).send({

src/server/schemas/contractSubscription.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ export const contractSubscriptionSchema = Type.Object({
1010
processEventLogs: Type.Boolean(),
1111
filterEvents: Type.Array(Type.String()),
1212
processTransactionReceipts: Type.Boolean(),
13+
filterFunctions: Type.Array(Type.String()),
1314
createdAt: Type.Unsafe<Date>({
1415
type: "string",
1516
format: "date",
@@ -28,5 +29,6 @@ export const toContractSubscriptionSchema = (
2829
processEventLogs: contractSubscription.processEventLogs,
2930
filterEvents: contractSubscription.filterEvents,
3031
processTransactionReceipts: contractSubscription.processTransactionReceipts,
32+
filterFunctions: contractSubscription.filterFunctions,
3133
createdAt: contractSubscription.createdAt,
3234
});

src/worker/queues/processTransactionReceiptsQueue.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,10 @@ const _queue = redis
2020

2121
export type EnqueueProcessTransactionReceiptsData = {
2222
chainId: number;
23-
filters: { address: string }[];
23+
filters: {
24+
address: string;
25+
functions: string[];
26+
}[];
2427
fromBlock: number; // inclusive
2528
toBlock: number; // inclusive
2629
};

src/worker/tasks/chainIndexer.ts

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -81,12 +81,13 @@ export const createChainIndexerTask = async (args: {
8181
});
8282
}
8383

84-
// Identify contract addresses to parse transaction receipts, if any.
84+
// Identify addresses + function names to parse transaction receipts, if any.
8585
const transactionReceiptFilters: { address: string }[] =
8686
contractSubscriptions
87-
.filter((cs) => cs.processTransactionReceipts)
88-
.map((cs) => ({
89-
address: cs.contractAddress.toLowerCase(),
87+
.filter((c) => c.processTransactionReceipts)
88+
.map((c) => ({
89+
address: c.contractAddress.toLowerCase(),
90+
functions: c.filterFunctions,
9091
}));
9192
if (transactionReceiptFilters.length > 0) {
9293
await enqueueProcessTransactionReceipts({

src/worker/tasks/processEventLogsWorker.ts

Lines changed: 24 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -103,13 +103,20 @@ export const getWebhooksByContractAddresses = async (
103103

104104
type GetLogsParams = EnqueueProcessEventLogsData;
105105

106-
const getLogs = async (
107-
params: GetLogsParams,
108-
): Promise<Prisma.ContractEventLogsCreateInput[]> => {
109-
const chain = defineChain(params.chainId);
106+
const getLogs = async ({
107+
chainId,
108+
fromBlock,
109+
toBlock,
110+
filters,
111+
}: GetLogsParams): Promise<Prisma.ContractEventLogsCreateInput[]> => {
112+
if (filters.length === 0) {
113+
return [];
114+
}
115+
116+
const chain = defineChain(chainId);
110117

111118
// Get events for each contract address. Apply any filters.
112-
const promises = params.filters.map(async (f) => {
119+
const promises = filters.map(async (f) => {
113120
const contract = getContract({
114121
client: thirdwebClient,
115122
chain,
@@ -128,12 +135,10 @@ const getLogs = async (
128135
}
129136
}
130137

131-
// TODO: add filter for function name.
132-
133138
return await getContractEvents({
134139
contract,
135-
fromBlock: BigInt(params.fromBlock),
136-
toBlock: BigInt(params.toBlock),
140+
fromBlock: BigInt(fromBlock),
141+
toBlock: BigInt(toBlock),
137142
events,
138143
});
139144
});
@@ -146,7 +151,7 @@ const getLogs = async (
146151
// Transform logs into the DB schema.
147152
return allEvents.map(
148153
(event): Prisma.ContractEventLogsCreateInput => ({
149-
chainId: params.chainId,
154+
chainId,
150155
blockNumber: Number(event.blockNumber),
151156
contractAddress: event.address.toLowerCase(),
152157
transactionHash: event.transactionHash,
@@ -156,15 +161,21 @@ const getLogs = async (
156161
topic3: event.topics[3],
157162
data: event.data,
158163
eventName: event.eventName,
159-
decodedLog: event.args,
164+
decodedLog: event.args ?? {},
160165
timestamp: blockTimestamps[event.blockHash],
161166
transactionIndex: event.transactionIndex,
162167
logIndex: event.logIndex,
163168
}),
164169
);
165170
};
166171

167-
export const getBlockTimestamps = async (
172+
/**
173+
* Gets the timestamps for a list of block hashes. Falls back to the current time.
174+
* @param chain
175+
* @param blockHashes
176+
* @returns Record<Hash, Date>
177+
*/
178+
const getBlockTimestamps = async (
168179
chain: Chain,
169180
blockHashes: Hash[],
170181
): Promise<Record<Hash, Date>> => {
@@ -180,7 +191,7 @@ export const getBlockTimestamps = async (
180191
dedupe.map(async (blockHash) => {
181192
try {
182193
const block = await eth_getBlockByHash(rpcRequest, { blockHash });
183-
return new Date(Number(block.timestamp * 1000n));
194+
return new Date(Number(block.timestamp) * 1000);
184195
} catch (e) {
185196
return now;
186197
}

0 commit comments

Comments
 (0)