Skip to content

Commit 9b00006

Browse files
committed
add function name, fix all typing
1 parent e35c097 commit 9b00006

File tree

8 files changed

+248
-154
lines changed

8 files changed

+248
-154
lines changed
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
-- AlterTable
2+
ALTER TABLE "contract_transaction_receipts" ADD COLUMN "functionName" TEXT;

src/prisma/schema.prisma

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -246,6 +246,7 @@ model ContractTransactionReceipts {
246246
blockHash String
247247
timestamp DateTime
248248
data String
249+
functionName String?
249250
250251
to String
251252
from String

src/server/routes/contract/subscriptions/addContractSubscription.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ const bodySchema = Type.Object({
3636
Type.Array(Type.String(), {
3737
description:
3838
"A case-sensitive list of event names to filter event logs. Parses all event logs by default.",
39+
examples: ["Transfer"],
3940
}),
4041
),
4142
processTransactionReceipts: Type.Boolean({
@@ -45,6 +46,7 @@ const bodySchema = Type.Object({
4546
Type.Array(Type.String(), {
4647
description:
4748
"A case-sensitive list of function names to filter transaction receipts. Parses all transaction receipts by default.",
49+
examples: ["mintTo"],
4850
}),
4951
),
5052
});
@@ -92,7 +94,6 @@ export async function addContractSubscription(fastify: FastifyInstance) {
9294
} = request.body;
9395

9496
const chainId = await getChainIdFromChain(chain);
95-
const standardizedContractAddress = contractAddress.toLowerCase();
9697

9798
// Must parse logs or receipts.
9899
if (!processEventLogs && !processTransactionReceipts) {
@@ -138,7 +139,7 @@ export async function addContractSubscription(fastify: FastifyInstance) {
138139
// Create the contract subscription.
139140
const contractSubscription = await createContractSubscription({
140141
chainId,
141-
contractAddress: standardizedContractAddress,
142+
contractAddress: contractAddress.toLowerCase(),
142143
webhookId,
143144
processEventLogs,
144145
filterEvents,

src/worker/queues/processEventLogsQueue.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { Queue } from "bullmq";
22
import SuperJSON from "superjson";
3+
import { Address } from "thirdweb";
34
import { getConfig } from "../../utils/cache/getConfig";
45
import { redis } from "../../utils/redis/redis";
56
import { defaultJobOptions } from "./queues";
@@ -17,10 +18,11 @@ const _queue = redis
1718
})
1819
: null;
1920

21+
// Each job handles a block range for a given chain, filtered by addresses + events.
2022
export type EnqueueProcessEventLogsData = {
2123
chainId: number;
2224
filters: {
23-
address: string;
25+
address: Address;
2426
events: string[];
2527
}[];
2628
fromBlock: number; // inclusive

src/worker/queues/processTransactionReceiptsQueue.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { Queue } from "bullmq";
22
import SuperJSON from "superjson";
3+
import { Address } from "thirdweb";
34
import { getConfig } from "../../utils/cache/getConfig";
45
import { redis } from "../../utils/redis/redis";
56
import { defaultJobOptions } from "./queues";
@@ -18,10 +19,11 @@ const _queue = redis
1819
})
1920
: null;
2021

22+
// Each job handles a block range for a given chain, filtered by addresses + events.
2123
export type EnqueueProcessTransactionReceiptsData = {
2224
chainId: number;
2325
filters: {
24-
address: string;
26+
address: Address;
2527
functions: string[];
2628
}[];
2729
fromBlock: number; // inclusive

src/worker/tasks/chainIndexer.ts

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import { StaticJsonRpcBatchProvider } from "@thirdweb-dev/sdk";
2+
import { Address } from "thirdweb";
23
import { getBlockForIndexing } from "../../db/chainIndexers/getChainIndexer";
34
import { upsertChainIndexer } from "../../db/chainIndexers/upsertChainIndexer";
45
import { prisma } from "../../db/client";
@@ -64,12 +65,12 @@ export const createChainIndexerTask = async (args: {
6465

6566
// Identify contract addresses + event names to parse event logs, if any.
6667
const eventLogFilters: {
67-
address: string;
68+
address: Address;
6869
events: string[];
6970
}[] = contractSubscriptions
7071
.filter((c) => c.processEventLogs)
7172
.map((c) => ({
72-
address: c.contractAddress,
73+
address: c.contractAddress as Address,
7374
events: c.filterEvents,
7475
}));
7576
if (eventLogFilters.length > 0) {
@@ -82,13 +83,15 @@ export const createChainIndexerTask = async (args: {
8283
}
8384

8485
// Identify addresses + function names to parse transaction receipts, if any.
85-
const transactionReceiptFilters: { address: string }[] =
86-
contractSubscriptions
87-
.filter((c) => c.processTransactionReceipts)
88-
.map((c) => ({
89-
address: c.contractAddress.toLowerCase(),
90-
functions: c.filterFunctions,
91-
}));
86+
const transactionReceiptFilters: {
87+
address: Address;
88+
functions: string[];
89+
}[] = contractSubscriptions
90+
.filter((c) => c.processTransactionReceipts)
91+
.map((c) => ({
92+
address: c.contractAddress as Address,
93+
functions: c.filterFunctions,
94+
}));
9295
if (transactionReceiptFilters.length > 0) {
9396
await enqueueProcessTransactionReceipts({
9497
chainId,

src/worker/tasks/processEventLogsWorker.ts

Lines changed: 99 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,10 @@ import { AbiEvent } from "abitype";
33
import { Job, Processor, Worker } from "bullmq";
44
import superjson from "superjson";
55
import {
6+
Address,
67
Chain,
78
PreparedEvent,
9+
ThirdwebContract,
810
defineChain,
911
eth_getBlockByHash,
1012
getContract,
@@ -48,7 +50,6 @@ const handler: Processor<any, void, string> = async (job: Job<string>) => {
4850

4951
// Store logs to DB.
5052
const insertedLogs = await bulkInsertContractEventLogs({ logs });
51-
5253
if (insertedLogs.length === 0) {
5354
return;
5455
}
@@ -103,6 +104,10 @@ export const getWebhooksByContractAddresses = async (
103104

104105
type GetLogsParams = EnqueueProcessEventLogsData;
105106

107+
/**
108+
* Gets all event logs for the subscribed addresses and filters.
109+
* @returns A list of logs to insert to the ContractEventLogs table.
110+
*/
106111
const getLogs = async ({
107112
chainId,
108113
fromBlock,
@@ -114,22 +119,32 @@ const getLogs = async ({
114119
}
115120

116121
const chain = defineChain(chainId);
122+
const addressConfig: Record<
123+
Address,
124+
{
125+
contract: ThirdwebContract;
126+
}
127+
> = {};
128+
for (const filter of filters) {
129+
addressConfig[filter.address] = {
130+
contract: getContract({
131+
client: thirdwebClient,
132+
chain,
133+
address: filter.address,
134+
}),
135+
};
136+
}
117137

118138
// Get events for each contract address. Apply any filters.
119139
const promises = filters.map(async (f) => {
120-
const contract = getContract({
121-
client: thirdwebClient,
122-
chain,
123-
address: f.address,
124-
});
140+
const { contract } = addressConfig[f.address];
125141

126142
// Get events to filter by, if any.
127143
const events: PreparedEvent<AbiEvent>[] = [];
128-
if (f.events.length) {
144+
if (f.events.length > 0) {
129145
const abi = await resolveContractAbi<AbiEvent[]>(contract);
130-
for (const event of f.events) {
131-
const signature = abi.find((a) => a.name === event);
132-
if (signature) {
146+
for (const signature of abi) {
147+
if (f.events.includes(signature.name)) {
133148
events.push(prepareEvent({ signature }));
134149
}
135150
}
@@ -142,33 +157,88 @@ const getLogs = async ({
142157
events,
143158
});
144159
});
160+
145161
// Query and flatten all events.
146-
const allEvents = (await Promise.all(promises)).flat();
162+
const allLogs = (await Promise.all(promises)).flat();
147163

148-
const blockHashes = allEvents.map((e) => e.blockHash);
164+
// Get timestamps for blocks.
165+
const blockHashes = allLogs.map((e) => e.blockHash);
149166
const blockTimestamps = await getBlockTimestamps(chain, blockHashes);
150167

151168
// Transform logs into the DB schema.
152-
return allEvents.map(
153-
(event): Prisma.ContractEventLogsCreateInput => ({
154-
chainId,
155-
blockNumber: Number(event.blockNumber),
156-
contractAddress: event.address.toLowerCase(),
157-
transactionHash: event.transactionHash,
158-
topic0: event.topics[0],
159-
topic1: event.topics[1],
160-
topic2: event.topics[2],
161-
topic3: event.topics[3],
162-
data: event.data,
163-
eventName: event.eventName,
164-
decodedLog: event.args ?? {},
165-
timestamp: blockTimestamps[event.blockHash],
166-
transactionIndex: event.transactionIndex,
167-
logIndex: event.logIndex,
168-
}),
169+
return await Promise.all(
170+
allLogs.map(
171+
async (log): Promise<Prisma.ContractEventLogsCreateInput> => ({
172+
chainId,
173+
blockNumber: Number(log.blockNumber),
174+
contractAddress: log.address.toLowerCase(),
175+
transactionHash: log.transactionHash,
176+
topic0: log.topics[0],
177+
topic1: log.topics[1],
178+
topic2: log.topics[2],
179+
topic3: log.topics[3],
180+
data: log.data,
181+
eventName: log.eventName,
182+
decodedLog: await formatDecodedLog({
183+
contract:
184+
addressConfig[log.address.toLowerCase() as Address].contract,
185+
eventName: log.eventName,
186+
logArgs: log.args as Record<string, unknown>,
187+
}),
188+
timestamp: blockTimestamps[log.blockHash],
189+
transactionIndex: log.transactionIndex,
190+
logIndex: log.logIndex,
191+
}),
192+
),
169193
);
170194
};
171195

196+
/**
197+
* Transform v5 SDK to v4 log format.
198+
*
199+
* Example input:
200+
* {
201+
* "to": "0x123...",
202+
* "quantity": 2n
203+
* }
204+
*
205+
* Example output:
206+
* {
207+
* "to": {
208+
* "type:" "address",
209+
* "value": "0x123..."
210+
* },
211+
* "quantity": {
212+
* "type:" "uint256",
213+
* "value": "2"
214+
* },
215+
* }
216+
*/
217+
const formatDecodedLog = async (args: {
218+
contract: ThirdwebContract;
219+
eventName: string;
220+
logArgs: Record<string, unknown>;
221+
}): Promise<Record<string, Prisma.InputJsonObject> | undefined> => {
222+
const { contract, eventName, logArgs } = args;
223+
224+
const abi = await resolveContractAbi<AbiEvent[]>(contract);
225+
const eventSignature = abi.find((a) => a.name === eventName);
226+
if (!eventSignature) {
227+
return;
228+
}
229+
230+
const res: Record<string, Prisma.InputJsonObject> = {};
231+
for (const { name, type } of eventSignature.inputs) {
232+
if (name && name in logArgs) {
233+
res[name] = {
234+
type,
235+
value: (logArgs[name] as any).toString(),
236+
};
237+
}
238+
}
239+
return res;
240+
};
241+
172242
/**
173243
* Gets the timestamps for a list of block hashes. Falls back to the current time.
174244
* @param chain

0 commit comments

Comments
 (0)