Skip to content

Commit 67e0908

Browse files
authored
Handling Edge Cases for RPC inconsistencies (#528)
* Added checks to make sure RPC is good * updated throw to log & return * added comments delayed requerying * udptes * updated user facing intefaces for contractSubscriptionsRetryDelaySeconds * updates * updates to solve build issue
1 parent 1dbbfb1 commit 67e0908

File tree

10 files changed

+137
-101
lines changed

10 files changed

+137
-101
lines changed

src/db/configuration/getConfiguration.ts

Lines changed: 11 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { Configuration } from "@prisma/client";
22
import { LocalWallet } from "@thirdweb-dev/wallets";
33
import { ethers } from "ethers";
4+
import { Config } from "../../schema/config";
45
import { WalletType } from "../../schema/wallet";
56
import { mandatoryAllowedCorsUrls } from "../../server/utils/cors-urls";
67
import { decrypt } from "../../utils/crypto";
@@ -9,38 +10,6 @@ import { logger } from "../../utils/logger";
910
import { prisma } from "../client";
1011
import { updateConfiguration } from "./updateConfiguration";
1112

12-
interface Config
13-
extends Omit<
14-
Configuration,
15-
| "awsAccessKeyId"
16-
| "awsSecretAccessKey"
17-
| "awsRegion"
18-
| "gcpApplicationProjectId"
19-
| "gcpKmsLocationId"
20-
| "gcpKmsKeyRingId"
21-
| "gcpApplicationCredentialEmail"
22-
| "gcpApplicationCredentialPrivateKey"
23-
> {
24-
walletConfiguration:
25-
| {
26-
type: WalletType.local;
27-
}
28-
| {
29-
type: WalletType.awsKms;
30-
awsAccessKeyId: string;
31-
awsSecretAccessKey: string;
32-
awsRegion: string;
33-
}
34-
| {
35-
type: WalletType.gcpKms;
36-
gcpApplicationProjectId: string;
37-
gcpKmsLocationId: string;
38-
gcpKmsKeyRingId: string;
39-
gcpApplicationCredentialEmail: string;
40-
gcpApplicationCredentialPrivateKey: string;
41-
};
42-
}
43-
4413
const withWalletConfig = async (config: Configuration): Promise<Config> => {
4514
// We destructure the config to omit wallet related fields to prevent direct access
4615
const {
@@ -52,6 +21,7 @@ const withWalletConfig = async (config: Configuration): Promise<Config> => {
5221
gcpKmsKeyRingId,
5322
gcpApplicationCredentialEmail,
5423
gcpApplicationCredentialPrivateKey,
24+
contractSubscriptionsRetryDelaySeconds,
5525
...restConfig
5626
} = config;
5727

@@ -84,8 +54,13 @@ const withWalletConfig = async (config: Configuration): Promise<Config> => {
8454
}
8555
}
8656

57+
// Renaming contractSubscriptionsRetryDelaySeconds
58+
// to contractSubscriptionsRequeryDelaySeconds to reflect its purpose
59+
// as we are requerying (& not retrying) with different delays
8760
return {
8861
...restConfig,
62+
contractSubscriptionsRequeryDelaySeconds:
63+
contractSubscriptionsRetryDelaySeconds,
8964
walletConfiguration: {
9065
type: WalletType.awsKms,
9166
awsRegion,
@@ -132,6 +107,8 @@ const withWalletConfig = async (config: Configuration): Promise<Config> => {
132107

133108
return {
134109
...restConfig,
110+
contractSubscriptionsRequeryDelaySeconds:
111+
contractSubscriptionsRetryDelaySeconds,
135112
walletConfiguration: {
136113
type: WalletType.gcpKms,
137114
gcpApplicationProjectId,
@@ -145,6 +122,8 @@ const withWalletConfig = async (config: Configuration): Promise<Config> => {
145122

146123
return {
147124
...restConfig,
125+
contractSubscriptionsRequeryDelaySeconds:
126+
contractSubscriptionsRetryDelaySeconds,
148127
walletConfiguration: {
149128
type: WalletType.local,
150129
},

src/schema/config.ts

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
import { Configuration } from "@prisma/client";
2+
import { WalletType } from "./wallet";
3+
4+
export interface Config
5+
extends Omit<
6+
Configuration,
7+
| "awsAccessKeyId"
8+
| "awsSecretAccessKey"
9+
| "awsRegion"
10+
| "gcpApplicationProjectId"
11+
| "gcpKmsLocationId"
12+
| "gcpKmsKeyRingId"
13+
| "gcpApplicationCredentialEmail"
14+
| "gcpApplicationCredentialPrivateKey"
15+
| "contractSubscriptionsRetryDelaySeconds"
16+
> {
17+
walletConfiguration:
18+
| {
19+
type: WalletType.local;
20+
}
21+
| {
22+
type: WalletType.awsKms;
23+
awsAccessKeyId: string;
24+
awsSecretAccessKey: string;
25+
awsRegion: string;
26+
}
27+
| {
28+
type: WalletType.gcpKms;
29+
gcpApplicationProjectId: string;
30+
gcpKmsLocationId: string;
31+
gcpKmsKeyRingId: string;
32+
gcpApplicationCredentialEmail: string;
33+
gcpApplicationCredentialPrivateKey: string;
34+
};
35+
contractSubscriptionsRequeryDelaySeconds: string;
36+
}

src/server/routes/configuration/contract-subscriptions/get.ts

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,20 +2,20 @@ import { Static, Type } from "@sinclair/typebox";
22
import { FastifyInstance } from "fastify";
33
import { StatusCodes } from "http-status-codes";
44
import { getConfig } from "../../../../utils/cache/getConfig";
5-
import { standardResponseSchema } from "../../../schemas/sharedApiSchemas";
5+
import {
6+
contractSubscriptionConfigurationSchema,
7+
standardResponseSchema,
8+
} from "../../../schemas/sharedApiSchemas";
69

7-
export const ReplySchema = Type.Object({
8-
result: Type.Object({
9-
maxBlocksToIndex: Type.Number(),
10-
contractSubscriptionsRetryDelaySeconds: Type.String(),
11-
}),
10+
const responseSchema = Type.Object({
11+
result: contractSubscriptionConfigurationSchema,
1212
});
1313

1414
export async function getContractSubscriptionsConfiguration(
1515
fastify: FastifyInstance,
1616
) {
1717
fastify.route<{
18-
Reply: Static<typeof ReplySchema>;
18+
Reply: Static<typeof responseSchema>;
1919
}>({
2020
method: "GET",
2121
url: "/configuration/contract-subscriptions",
@@ -26,16 +26,16 @@ export async function getContractSubscriptionsConfiguration(
2626
operationId: "getContractSubscriptionsConfiguration",
2727
response: {
2828
...standardResponseSchema,
29-
[StatusCodes.OK]: ReplySchema,
29+
[StatusCodes.OK]: responseSchema,
3030
},
3131
},
3232
handler: async (req, res) => {
3333
const config = await getConfig();
3434
res.status(200).send({
3535
result: {
3636
maxBlocksToIndex: config.maxBlocksToIndex,
37-
contractSubscriptionsRetryDelaySeconds:
38-
config.contractSubscriptionsRetryDelaySeconds,
37+
contractSubscriptionsRequeryDelaySeconds:
38+
config.contractSubscriptionsRequeryDelaySeconds,
3939
},
4040
});
4141
},

src/server/routes/configuration/contract-subscriptions/update.ts

Lines changed: 22 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -4,19 +4,25 @@ import { StatusCodes } from "http-status-codes";
44
import { updateConfiguration } from "../../../../db/configuration/updateConfiguration";
55
import { getConfig } from "../../../../utils/cache/getConfig";
66
import { createCustomError } from "../../../middleware/error";
7-
import { standardResponseSchema } from "../../../schemas/sharedApiSchemas";
8-
import { ReplySchema } from "./get";
7+
import {
8+
contractSubscriptionConfigurationSchema,
9+
standardResponseSchema,
10+
} from "../../../schemas/sharedApiSchemas";
911

10-
const BodySchema = Type.Object({
12+
const requestBodySchema = Type.Object({
1113
maxBlocksToIndex: Type.Optional(Type.Number({ minimum: 1, maximum: 25 })),
12-
contractSubscriptionsRetryDelaySeconds: Type.Optional(Type.String()),
14+
contractSubscriptionsRequeryDelaySeconds: Type.Optional(Type.String()),
15+
});
16+
17+
const responseSchema = Type.Object({
18+
result: contractSubscriptionConfigurationSchema,
1319
});
1420

1521
export async function updateContractSubscriptionsConfiguration(
1622
fastify: FastifyInstance,
1723
) {
1824
fastify.route<{
19-
Body: Static<typeof BodySchema>;
25+
Body: Static<typeof requestBodySchema>;
2026
}>({
2127
method: "POST",
2228
url: "/configuration/contract-subscriptions",
@@ -25,34 +31,34 @@ export async function updateContractSubscriptionsConfiguration(
2531
description: "Update the configuration for Contract Subscriptions",
2632
tags: ["Configuration"],
2733
operationId: "updateContractSubscriptionsConfiguration",
28-
body: BodySchema,
34+
body: requestBodySchema,
2935
response: {
3036
...standardResponseSchema,
31-
[StatusCodes.OK]: ReplySchema,
37+
[StatusCodes.OK]: responseSchema,
3238
},
3339
},
3440
handler: async (req, res) => {
35-
const { maxBlocksToIndex, contractSubscriptionsRetryDelaySeconds } =
41+
const { maxBlocksToIndex, contractSubscriptionsRequeryDelaySeconds } =
3642
req.body;
3743

38-
if (!maxBlocksToIndex && !contractSubscriptionsRetryDelaySeconds) {
44+
if (!maxBlocksToIndex && !contractSubscriptionsRequeryDelaySeconds) {
3945
throw createCustomError(
4046
"At least one parameter is required",
4147
StatusCodes.BAD_REQUEST,
4248
"BAD_REQUEST",
4349
);
4450
}
4551

46-
if (contractSubscriptionsRetryDelaySeconds) {
52+
if (contractSubscriptionsRequeryDelaySeconds) {
4753
try {
48-
contractSubscriptionsRetryDelaySeconds.split(",").forEach((d) => {
54+
contractSubscriptionsRequeryDelaySeconds.split(",").forEach((d) => {
4955
if (Number.isNaN(parseInt(d))) {
5056
throw "Invalid number";
5157
}
5258
});
5359
} catch {
5460
throw createCustomError(
55-
'At least one integer "contractSubscriptionsRetryDelaySeconds" is required',
61+
'At least one integer "contractSubscriptionsRequeryDelaySeconds" is required',
5662
StatusCodes.BAD_REQUEST,
5763
"BAD_REQUEST",
5864
);
@@ -61,15 +67,16 @@ export async function updateContractSubscriptionsConfiguration(
6167

6268
await updateConfiguration({
6369
maxBlocksToIndex,
64-
contractSubscriptionsRetryDelaySeconds,
70+
contractSubscriptionsRetryDelaySeconds:
71+
contractSubscriptionsRequeryDelaySeconds,
6572
});
6673
const config = await getConfig(false);
6774

6875
res.status(200).send({
6976
result: {
7077
maxBlocksToIndex: config.maxBlocksToIndex,
71-
contractSubscriptionsRetryDelaySeconds:
72-
config.contractSubscriptionsRetryDelaySeconds,
78+
contractSubscriptionsRequeryDelaySeconds:
79+
config.contractSubscriptionsRequeryDelaySeconds,
7380
},
7481
});
7582
},

src/server/schemas/sharedApiSchemas.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -332,3 +332,8 @@ export const walletDetailsSchema = Type.Object({
332332
Type.Null(),
333333
]),
334334
});
335+
336+
export const contractSubscriptionConfigurationSchema = Type.Object({
337+
maxBlocksToIndex: Type.Number(),
338+
contractSubscriptionsRequeryDelaySeconds: Type.String(),
339+
});

src/utils/cache/getConfig.ts

Lines changed: 1 addition & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,40 +1,7 @@
1-
import { Configuration } from "@prisma/client";
21
import { getConfiguration } from "../../db/configuration/getConfiguration";
3-
import { WalletType } from "../../schema/wallet";
2+
import { Config } from "../../schema/config";
43

54
const cacheKey = "config";
6-
interface Config
7-
extends Omit<
8-
Configuration,
9-
| "awsAccessKeyId"
10-
| "awsSecretAccessKey"
11-
| "awsRegion"
12-
| "gcpApplicationProjectId"
13-
| "gcpKmsLocationId"
14-
| "gcpKmsKeyRingId"
15-
| "gcpApplicationCredentialEmail"
16-
| "gcpApplicationCredentialPrivateKey"
17-
> {
18-
walletConfiguration:
19-
| {
20-
type: WalletType.local;
21-
}
22-
| {
23-
type: WalletType.awsKms;
24-
awsAccessKeyId: string;
25-
awsSecretAccessKey: string;
26-
awsRegion: string;
27-
}
28-
| {
29-
type: WalletType.gcpKms;
30-
gcpApplicationProjectId: string;
31-
gcpKmsLocationId: string;
32-
gcpKmsKeyRingId: string;
33-
gcpApplicationCredentialEmail: string;
34-
gcpApplicationCredentialPrivateKey: string;
35-
};
36-
}
37-
385
export const configCache = new Map<string, Config>();
396

407
export const getConfig = async (retrieveFromCache = true): Promise<Config> => {
@@ -51,7 +18,6 @@ export const getConfig = async (retrieveFromCache = true): Promise<Config> => {
5118
}
5219

5320
const configData = await getConfiguration();
54-
5521
configCache.set(cacheKey, configData);
5622
return configData;
5723
};

src/worker/index.ts

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import { logger } from "../utils/logger";
2+
import { redis } from "../utils/redis/redis";
13
import { chainIndexerListener } from "./listeners/chainIndexerListener";
24
import {
35
newConfigurationListener,
@@ -36,5 +38,13 @@ export const initWorker = async () => {
3638
await newWebhooksListener();
3739
await updatedWebhooksListener();
3840

39-
await chainIndexerListener();
41+
if (redis) {
42+
await chainIndexerListener();
43+
} else {
44+
logger({
45+
service: "worker",
46+
level: "warn",
47+
message: `Chain Indexer Listener not started, Redis not available`,
48+
});
49+
}
4050
};

src/worker/queues/processEventLogsQueue.ts

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,14 +32,22 @@ export const enqueueProcessEventLogs = async (
3232
const serialized = SuperJSON.stringify(data);
3333
// e.g. 8453:14423125-14423685
3434
const jobName = `${data.chainId}:${data.fromBlock}-${data.toBlock}`;
35-
const { contractSubscriptionsRetryDelaySeconds } = await getConfig();
36-
const retryDelays = contractSubscriptionsRetryDelaySeconds.split(",");
35+
const { contractSubscriptionsRequeryDelaySeconds } = await getConfig();
36+
const requeryDelays = contractSubscriptionsRequeryDelaySeconds.split(",");
3737

3838
// Enqueue one job immediately and any delayed jobs.
3939
await _queue.add(jobName, serialized);
40-
for (const retryDelay of retryDelays) {
40+
41+
// The last attempt should attempt repeatedly to handle extended RPC issues.
42+
// This backoff attempts at intervals:
43+
// 30s, 1m, 2m, 4m, 8m, 16m, 32m, ~1h, ~2h, ~4h
44+
for (let i = 0; i < requeryDelays.length; i++) {
45+
const delay = parseInt(requeryDelays[i]) * 1000;
46+
const attempts = i === requeryDelays.length - 1 ? 10 : 0;
4147
await _queue.add(jobName, serialized, {
42-
delay: parseInt(retryDelay) * 1000,
48+
delay,
49+
attempts,
50+
backoff: { type: "exponential", delay: 30_000 },
4351
});
4452
}
4553
};

src/worker/queues/processTransactionReceiptsQueue.ts

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,14 +33,22 @@ export const enqueueProcessTransactionReceipts = async (
3333
const serialized = SuperJSON.stringify(data);
3434
// e.g. 8453:14423125-14423685
3535
const jobName = `${data.chainId}:${data.fromBlock}-${data.toBlock}`;
36-
const { contractSubscriptionsRetryDelaySeconds } = await getConfig();
37-
const retryDelays = contractSubscriptionsRetryDelaySeconds.split(",");
36+
const { contractSubscriptionsRequeryDelaySeconds } = await getConfig();
37+
const requeryDelays = contractSubscriptionsRequeryDelaySeconds.split(",");
3838

3939
// Enqueue one job immediately and any delayed jobs.
4040
await _queue.add(jobName, serialized);
41-
for (const retryDelay of retryDelays) {
41+
42+
// The last attempt should attempt repeatedly to handle extended RPC issues.
43+
// This backoff attempts at intervals:
44+
// 30s, 1m, 2m, 4m, 8m, 16m, 32m, ~1h, ~2h, ~4h
45+
for (let i = 0; i < requeryDelays.length; i++) {
46+
const delay = parseInt(requeryDelays[i]) * 1000;
47+
const attempts = i === requeryDelays.length - 1 ? 10 : 0;
4248
await _queue.add(jobName, serialized, {
43-
delay: parseInt(retryDelay) * 1000,
49+
delay,
50+
attempts,
51+
backoff: { type: "exponential", delay: 30_000 },
4452
});
4553
}
4654
};

0 commit comments

Comments
 (0)