Skip to content

Commit 224bd8b

Browse files
arcoravend4mr
andauthored
fix: Simplify Contract Subscriptions block range (#744)
* fix: Simplify Contract Subscriptions block range * Fix build * logs * undo try/catch --------- Co-authored-by: Prithvish Baidya <deformercoding@gmail.com>
1 parent ba59e74 commit 224bd8b

File tree

10 files changed

+195
-252
lines changed

10 files changed

+195
-252
lines changed

src/db/chainIndexers/getChainIndexer.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { PrismaTransaction } from "../../schema/prisma";
1+
import type { PrismaTransaction } from "../../schema/prisma";
22
import { getPrismaWithPostgresTx } from "../client";
33

44
interface GetLastIndexedBlockParams {
@@ -45,5 +45,5 @@ export const getBlockForIndexing = async ({
4545
"chainId"=${chainId}
4646
FOR UPDATE NOWAIT
4747
`;
48-
return lastIndexedBlock[0]["lastIndexedBlock"];
48+
return lastIndexedBlock[0].lastIndexedBlock;
4949
};

src/utils/env.ts

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -69,10 +69,6 @@ export const env = createEnv({
6969
SDK_BATCH_TIME_LIMIT: z.coerce.number().default(0),
7070
SDK_BATCH_SIZE_LIMIT: z.coerce.number().default(100),
7171
ENABLE_KEYPAIR_AUTH: boolEnvSchema(false),
72-
CONTRACT_SUBSCRIPTIONS_DELAY_SECONDS: z.coerce
73-
.number()
74-
.nonnegative()
75-
.default(0),
7672
REDIS_URL: z.string(),
7773
SEND_TRANSACTION_QUEUE_CONCURRENCY: z.coerce.number().default(200),
7874
CONFIRM_TRANSACTION_QUEUE_CONCURRENCY: z.coerce.number().default(200),
@@ -125,8 +121,6 @@ export const env = createEnv({
125121
SDK_BATCH_TIME_LIMIT: process.env.SDK_BATCH_TIME_LIMIT,
126122
SDK_BATCH_SIZE_LIMIT: process.env.SDK_BATCH_SIZE_LIMIT,
127123
ENABLE_KEYPAIR_AUTH: process.env.ENABLE_KEYPAIR_AUTH,
128-
CONTRACT_SUBSCRIPTIONS_DELAY_SECONDS:
129-
process.env.CONTRACT_SUBSCRIPTIONS_DELAY_SECONDS,
130124
REDIS_URL: process.env.REDIS_URL,
131125
SEND_TRANSACTION_QUEUE_CONCURRENCY:
132126
process.env.SEND_TRANSACTION_QUEUE_CONCURRENCY,

src/utils/indexer/getBlockTime.ts

Lines changed: 22 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -1,49 +1,26 @@
1-
import { getSdk } from "../cache/getSdk";
2-
import { logger } from "../logger";
1+
import { eth_getBlockByNumber, getRpcClient } from "thirdweb";
2+
import { getChain } from "../chain";
3+
import { thirdwebClient } from "../sdk";
34

4-
const KNOWN_BLOCKTIME_SECONDS = {
5-
1: 12,
6-
137: 2,
7-
} as Record<number, number>;
5+
export const getBlockTimeSeconds = async (
6+
chainId: number,
7+
blocksToEstimate: number,
8+
) => {
9+
const chain = await getChain(chainId);
10+
const rpcRequest = getRpcClient({
11+
client: thirdwebClient,
12+
chain,
13+
});
814

9-
const DEFAULT_BLOCKTIME_SECONDS = 10;
10-
const BLOCKS_TO_ESTIMATE = 100;
15+
const latestBlock = await eth_getBlockByNumber(rpcRequest, {
16+
blockTag: "latest",
17+
includeTransactions: false,
18+
});
19+
const referenceBlock = await eth_getBlockByNumber(rpcRequest, {
20+
blockNumber: latestBlock.number - BigInt(blocksToEstimate),
21+
includeTransactions: false,
22+
});
1123

12-
export const getBlockTimeSeconds = async (chainId: number) => {
13-
if (KNOWN_BLOCKTIME_SECONDS[chainId]) {
14-
return KNOWN_BLOCKTIME_SECONDS[chainId];
15-
}
16-
17-
const sdk = await getSdk({ chainId });
18-
const provider = sdk.getProvider();
19-
try {
20-
const latestBlockNumber = await provider.getBlockNumber();
21-
const blockNumbers = Array.from(
22-
{ length: BLOCKS_TO_ESTIMATE },
23-
(_, i) => latestBlockNumber - i - 1,
24-
);
25-
26-
const blocks = await Promise.all(
27-
blockNumbers.map(async (blockNumber) => {
28-
const block = await provider.getBlock(blockNumber);
29-
return block;
30-
}),
31-
);
32-
33-
let totalTimeDiff = 0;
34-
for (let i = 0; i < blocks.length - 1; i++) {
35-
totalTimeDiff += blocks[i].timestamp - blocks[i + 1].timestamp;
36-
}
37-
38-
const averageBlockTime = totalTimeDiff / (blocks.length - 1);
39-
return averageBlockTime;
40-
} catch (error) {
41-
logger({
42-
service: "worker",
43-
level: "error",
44-
message: `Error estimating block time for chainId ${chainId}:`,
45-
error,
46-
});
47-
return DEFAULT_BLOCKTIME_SECONDS;
48-
}
24+
const diffSeconds = latestBlock.timestamp - referenceBlock.timestamp;
25+
return Number(diffSeconds) / (blocksToEstimate + 1);
4926
};

src/worker/indexers/chainIndexerRegistry.ts

Lines changed: 32 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -1,67 +1,57 @@
11
import cron from "node-cron";
2-
import { getConfig } from "../../utils/cache/getConfig";
3-
import { env } from "../../utils/env";
42
import { getBlockTimeSeconds } from "../../utils/indexer/getBlockTime";
53
import { logger } from "../../utils/logger";
6-
import { createChainIndexerTask } from "../tasks/chainIndexer";
4+
import { handleContractSubscriptions } from "../tasks/chainIndexer";
75

6+
// @TODO: Move all worker logic to Bullmq to better handle multiple hosts.
87
export const INDEXER_REGISTRY = {} as Record<number, cron.ScheduledTask>;
98

109
export const addChainIndexer = async (chainId: number) => {
1110
if (INDEXER_REGISTRY[chainId]) {
11+
return;
12+
}
13+
14+
// Estimate the block time in the last 100 blocks. Default to 2 second block times.
15+
let blockTimeSeconds: number;
16+
try {
17+
blockTimeSeconds = await getBlockTimeSeconds(chainId, 100);
18+
} catch (error) {
1219
logger({
1320
service: "worker",
14-
level: "warn",
15-
message: `Chain Indexer already exists: ${chainId}`,
21+
level: "error",
22+
message: `Could not estimate block time for chain ${chainId}`,
23+
error,
1624
});
17-
return;
25+
blockTimeSeconds = 2;
1826
}
19-
20-
let processStarted = false;
21-
const config = await getConfig();
22-
23-
// Estimate block time.
24-
const blockTimeSeconds = await getBlockTimeSeconds(chainId);
25-
26-
const blocksIn5Seconds = Math.round((1 / blockTimeSeconds) * 5);
27-
const maxBlocksToIndex = Math.max(
28-
config.maxBlocksToIndex,
29-
blocksIn5Seconds * 4,
30-
);
31-
32-
// Compute block offset based on delay.
33-
// Example: 10s delay with a 3s block time = 4 blocks offset
34-
const toBlockOffset = env.CONTRACT_SUBSCRIPTIONS_DELAY_SECONDS
35-
? Math.ceil(env.CONTRACT_SUBSCRIPTIONS_DELAY_SECONDS / blockTimeSeconds)
36-
: 0;
37-
38-
const handler = await createChainIndexerTask({
39-
chainId,
40-
maxBlocksToIndex,
41-
toBlockOffset,
42-
});
43-
4427
const cronSchedule = createScheduleSeconds(
4528
Math.max(Math.round(blockTimeSeconds), 1),
4629
);
47-
4830
logger({
4931
service: "worker",
5032
level: "info",
51-
message: `Indexing contracts on chainId: ${chainId} with schedule: ${cronSchedule}, max blocks to index: ${maxBlocksToIndex}`,
33+
message: `Indexing contracts on chain ${chainId} with schedule: ${cronSchedule}`,
5234
});
5335

36+
let inProgress = false;
37+
5438
const task = cron.schedule(cronSchedule, async () => {
55-
if (!processStarted) {
56-
processStarted = true;
39+
if (inProgress) {
40+
return;
41+
}
5742

58-
try {
59-
await handler();
60-
} catch (error) {
61-
// do nothing
62-
} finally {
63-
processStarted = false;
64-
}
43+
inProgress = true;
44+
try {
45+
await handleContractSubscriptions(chainId);
46+
} catch (error) {
47+
logger({
48+
service: "worker",
49+
level: "error",
50+
message: `Failed to index on chain ${chainId}`,
51+
error,
52+
});
53+
} finally {
54+
inProgress = false;
6555
}
6656
});
6757

@@ -70,13 +60,7 @@ export const addChainIndexer = async (chainId: number) => {
7060

7161
export const removeChainIndexer = async (chainId: number) => {
7262
const task = INDEXER_REGISTRY[chainId];
73-
7463
if (!task) {
75-
logger({
76-
service: "worker",
77-
level: "warn",
78-
message: `Chain Indexer doesn't exist: ${chainId}`,
79-
});
8064
return;
8165
}
8266

src/worker/listeners/chainIndexerListener.ts

Lines changed: 1 addition & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,13 @@
11
import cron from "node-cron";
22
import { getConfig } from "../../utils/cache/getConfig";
33
import { logger } from "../../utils/logger";
4-
import { redis } from "../../utils/redis/redis";
54
import { manageChainIndexers } from "../tasks/manageChainIndexers";
65

76
let processChainIndexerStarted = false;
87
let task: cron.ScheduledTask;
98

109
export const chainIndexerListener = async (): Promise<void> => {
11-
if (!redis) {
12-
logger({
13-
service: "worker",
14-
level: "warn",
15-
message: `Chain Indexer Listener not started, Redis not available`,
16-
});
17-
return;
18-
}
19-
20-
logger({
21-
service: "worker",
22-
level: "info",
23-
message: `Listening for indexed contracts`,
24-
});
25-
2610
const config = await getConfig();
27-
2811
if (!config.indexerListenerCronSchedule) {
2912
return;
3013
}
@@ -41,7 +24,7 @@ export const chainIndexerListener = async (): Promise<void> => {
4124
logger({
4225
service: "worker",
4326
level: "warn",
44-
message: `manageChainIndexers already running, skipping`,
27+
message: "manageChainIndexers already running, skipping",
4528
});
4629
}
4730
});

0 commit comments

Comments
 (0)