Skip to content

Commit 9acff55

Browse files
committed
wip
1 parent 93da8d0 commit 9acff55

File tree

4 files changed

+43
-32
lines changed

4 files changed

+43
-32
lines changed

src/worker/queues/processEventLogsQueue.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ export type EnqueueProcessEventLogsData = {
2222
filters: {
2323
address: string;
2424
events: string[];
25-
functions: string[];
2625
}[];
2726
fromBlock: number; // inclusive
2827
toBlock: number; // inclusive

src/worker/tasks/chainIndexer.ts

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,13 +66,11 @@ export const createChainIndexerTask = async (args: {
6666
const eventLogFilters: {
6767
address: string;
6868
events: string[];
69-
functions: string[];
7069
}[] = contractSubscriptions
7170
.filter((c) => c.processEventLogs)
7271
.map((c) => ({
7372
address: c.contractAddress,
7473
events: c.filterEvents,
75-
functions: [],
7674
}));
7775
if (eventLogFilters.length > 0) {
7876
await enqueueProcessEventLogs({

src/worker/tasks/processEventLogsWorker.ts

Lines changed: 41 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
import { Prisma, Webhooks } from "@prisma/client";
2+
import { AbiEvent } from "abitype";
23
import { Job, Processor, Worker } from "bullmq";
34
import superjson from "superjson";
45
import {
6+
Chain,
57
PreparedEvent,
68
defineChain,
79
eth_getBlockByHash,
@@ -11,6 +13,7 @@ import {
1113
prepareEvent,
1214
} from "thirdweb";
1315
import { resolveContractAbi } from "thirdweb/contract";
16+
import { Hash } from "viem";
1417
import { bulkInsertContractEventLogs } from "../../db/contractEventLogs/createContractEventLogs";
1518
import { getContractSubscriptionsByChainId } from "../../db/contractSubscriptions/getContractSubscriptions";
1619
import { WebhooksEventTypes } from "../../schema/webhooks";
@@ -104,23 +107,19 @@ const getLogs = async (
104107
params: GetLogsParams,
105108
): Promise<Prisma.ContractEventLogsCreateInput[]> => {
106109
const chain = defineChain(params.chainId);
107-
const rpcRequest = getRpcClient({
108-
client: thirdwebClient,
109-
chain,
110-
});
111110

112111
// Get events for each contract address. Apply any filters.
113112
const promises = params.filters.map(async (f) => {
114-
const contract = await getContract({
113+
const contract = getContract({
115114
client: thirdwebClient,
116115
chain,
117116
address: f.address,
118117
});
119118

120119
// Get events to filter by, if any.
121-
const events: PreparedEvent[] = [];
120+
const events: PreparedEvent<AbiEvent>[] = [];
122121
if (f.events.length) {
123-
const abi = await resolveContractAbi(contract);
122+
const abi = await resolveContractAbi<AbiEvent[]>(contract);
124123
for (const event of f.events) {
125124
const signature = abi.find((a) => a.name === event);
126125
if (signature) {
@@ -141,13 +140,12 @@ const getLogs = async (
141140
// Query and flatten all events.
142141
const allEvents = (await Promise.all(promises)).flat();
143142

144-
// Transform logs into the DB schema.
145-
const formattedLogs: Prisma.ContractEventLogsCreateInput[] = [];
146-
for (const event of allEvents) {
147-
// This makes an RPC call, but it should be cached in SDK when querying the same block hash.
148-
const timestamp = await getBlockTimestamp(rpcRequest, event.blockHash);
143+
const blockHashes = allEvents.map((e) => e.blockHash);
144+
const blockTimestamps = await getBlockTimestamps(chain, blockHashes);
149145

150-
formattedLogs.push({
146+
// Transform logs into the DB schema.
147+
return allEvents.map(
148+
(event): Prisma.ContractEventLogsCreateInput => ({
151149
chainId: params.chainId,
152150
blockNumber: Number(event.blockNumber),
153151
contractAddress: event.address.toLowerCase(),
@@ -159,25 +157,41 @@ const getLogs = async (
159157
data: event.data,
160158
eventName: event.eventName,
161159
decodedLog: event.args,
162-
timestamp,
160+
timestamp: blockTimestamps[event.blockHash],
163161
transactionIndex: event.transactionIndex,
164162
logIndex: event.logIndex,
165-
});
166-
}
167-
168-
return formattedLogs;
163+
}),
164+
);
169165
};
170166

171-
const getBlockTimestamp = async (
172-
rpcRequest: ReturnType<typeof getRpcClient>,
173-
blockHash: `0x${string}`,
174-
): Promise<Date> => {
175-
try {
176-
const block = await eth_getBlockByHash(rpcRequest, { blockHash });
177-
return new Date(block.timestamp * 1000n);
178-
} catch (e) {
179-
return new Date();
167+
export const getBlockTimestamps = async (
168+
chain: Chain,
169+
blockHashes: Hash[],
170+
): Promise<Record<Hash, Date>> => {
171+
const rpcRequest = getRpcClient({
172+
client: thirdwebClient,
173+
chain,
174+
});
175+
176+
const now = new Date();
177+
const dedupe = Array.from(new Set(blockHashes));
178+
179+
const blocks = await Promise.all(
180+
dedupe.map(async (blockHash) => {
181+
try {
182+
const block = await eth_getBlockByHash(rpcRequest, { blockHash });
183+
return new Date(Number(block.timestamp * 1000n));
184+
} catch (e) {
185+
return now;
186+
}
187+
}),
188+
);
189+
190+
const res: Record<Hash, Date> = {};
191+
for (let i = 0; i < dedupe.length; i++) {
192+
res[dedupe[i]] = blocks[i];
180193
}
194+
return res;
181195
};
182196

183197
// Worker

src/worker/tasks/processTransactionReceiptsWorker.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,14 +69,14 @@ const getBlocksAndTransactions = async ({
6969
};
7070

7171
const handler: Processor<any, void, string> = async (job: Job<string>) => {
72-
const { chainId, contractAddresses, fromBlock, toBlock } =
72+
const { chainId, filters, fromBlock, toBlock } =
7373
superjson.parse<EnqueueProcessTransactionReceiptsData>(job.data);
7474

7575
const { blocks, transactionsWithReceipt } = await getBlocksAndTransactions({
7676
chainId,
7777
fromBlock,
7878
toBlock,
79-
contractAddresses,
79+
filters,
8080
});
8181

8282
const blockLookup = blocks.reduce((acc, curr) => {

0 commit comments

Comments
 (0)