Skip to content

Commit 026aee1

Browse files
committed
graceful shutdown, use multi instead of lua script
1 parent f36d487 commit 026aee1

File tree

4 files changed

+53
-12
lines changed

4 files changed

+53
-12
lines changed

src/db/wallets/walletNonce.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,7 @@ export const recycleNonce = async (
123123
});
124124
return;
125125
}
126+
126127
const key = recycledNoncesKey(chainId, walletAddress);
127128
await redis.sadd(key, nonce.toString());
128129
};

src/index.ts

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,15 @@ import { env } from "./utils/env";
44
import { logger } from "./utils/logger";
55
import "./utils/tracer";
66
import { initWorker } from "./worker";
7+
import { CancelRecycledNoncesQueue } from "./worker/queues/cancelRecycledNoncesQueue";
8+
import { MigratePostgresTransactionsQueue } from "./worker/queues/migratePostgresTransactionsQueue";
9+
import { MineTransactionQueue } from "./worker/queues/mineTransactionQueue";
10+
import { NonceResyncQueue } from "./worker/queues/nonceResyncQueue";
11+
import { ProcessEventsLogQueue } from "./worker/queues/processEventLogsQueue";
12+
import { ProcessTransactionReceiptsQueue } from "./worker/queues/processTransactionReceiptsQueue";
13+
import { PruneTransactionsQueue } from "./worker/queues/pruneTransactionsQueue";
14+
import { SendTransactionQueue } from "./worker/queues/sendTransactionQueue";
15+
import { SendWebhookQueue } from "./worker/queues/sendWebhookQueue";
716

817
const main = async () => {
918
if (env.ENGINE_MODE === "server_only") {
@@ -40,3 +49,28 @@ process.on("unhandledRejection", (err) => {
4049
error: err,
4150
});
4251
});
52+
53+
process.on("SIGINT", () => gracefulShutdown("SIGINT"));
54+
process.on("SIGTERM", () => gracefulShutdown("SIGTERM"));
55+
56+
const gracefulShutdown = async (signal: NodeJS.Signals) => {
57+
logger({
58+
level: "info",
59+
service: "server",
60+
message: `Received ${signal}, closing server...`,
61+
});
62+
63+
// Gracefully close workers to minimize stalled jobs.
64+
// Source: https://docs.bullmq.io/guide/going-to-production#gracefully-shut-down-workers
65+
await SendWebhookQueue.q.close();
66+
await ProcessEventsLogQueue.q.close();
67+
await ProcessTransactionReceiptsQueue.q.close();
68+
await SendTransactionQueue.q.close();
69+
await MineTransactionQueue.q.close();
70+
await CancelRecycledNoncesQueue.q.close();
71+
await PruneTransactionsQueue.q.close();
72+
await MigratePostgresTransactionsQueue.q.close();
73+
await NonceResyncQueue.q.close();
74+
75+
process.exit(0);
76+
};

src/utils/env.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -62,12 +62,12 @@ export const env = createEnv({
6262
HTTPS_PASSPHRASE: z.string().default("thirdweb-engine"),
6363
TRUST_PROXY: z.boolean().default(false),
6464
// REDIS_MAXMEMORY should be set below the amount of memory Redis has available
65-
// to avoid running out of memory. If this limit is reached, keys are evicted
66-
// with a "allkeys-lru" policy (removes least recently used keys).
65+
// to avoid running out of memory. If this limit is reached, Engine cannot
66+
// write new keys because keys are not evicted.
6767
REDIS_MAXMEMORY: z.string().default("900mb"),
6868
// TRANSACTION_HISTORY_COUNT defines the max transaction details to keep.
69-
// In testing, storing about 400k consumes 1GB memory.
70-
TRANSACTION_HISTORY_COUNT: z.coerce.number().default(400_000),
69+
// In testing, storing about 300k consumes 900mb memory.
70+
TRANSACTION_HISTORY_COUNT: z.coerce.number().default(300_000),
7171
CLIENT_ANALYTICS_URL: z
7272
.union([UrlSchema, z.literal("")])
7373
.default("https://c.thirdweb.com/event"),

src/worker/tasks/cancelRecycledNoncesWorker.ts

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -75,12 +75,18 @@ const fromUnusedNoncesKey = (key: string) => {
7575

7676
const getAndDeleteUnusedNonces = async (key: string) => {
7777
// Returns all unused nonces for this key and deletes the key.
78-
const script = `
79-
local key = ARGV[1]
80-
local members = redis.call('SMEMBERS', key)
81-
redis.call('DEL', key)
82-
return members
83-
`;
84-
const results = (await redis.eval(script, 0, key)) as string[];
85-
return results.map(parseInt);
78+
// Example response:
79+
// [
80+
// [ null, [ '1', '2', '3', '4' ] ],
81+
// [ null, 1 ]
82+
// ]
83+
const multiResult = await redis.multi().smembers(key).del(key).exec();
84+
if (!multiResult) {
85+
throw new Error(`Error getting members of ${key}.`);
86+
}
87+
const [error, nonces] = multiResult[0];
88+
if (error) {
89+
throw new Error(`Error getting members of ${key}: ${error}`);
90+
}
91+
return (nonces as string[]).map((v) => parseInt(v));
8692
};

0 commit comments

Comments
 (0)