Skip to content

Commit 3e95d9c

Browse files
committed
wip
1 parent f50a76e commit 3e95d9c

File tree

11 files changed

+250
-29
lines changed

11 files changed

+250
-29
lines changed

package.json

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,13 +31,15 @@
3131
},
3232
"dependencies": {
3333
"@aws-sdk/client-kms": "^3.398.0",
34+
"@bull-board/fastify": "^5.21.1",
3435
"@fastify/cookie": "^8.3.0",
3536
"@fastify/express": "^2.3.0",
3637
"@fastify/swagger": "^8.9.0",
3738
"@fastify/type-provider-typebox": "^3.2.0",
3839
"@fastify/websocket": "^8.2.0",
3940
"@google-cloud/kms": "^4.4.0",
4041
"@prisma/client": "^5.16.1",
42+
"@queuedash/api": "^2.1.0",
4143
"@sinclair/typebox": "^0.31.28",
4244
"@t3-oss/env-core": "^0.6.0",
4345
"@thirdweb-dev/auth": "^4.1.87",

src/db/client.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ export const knex = pg({
2222
acquireConnectionTimeout: 30000,
2323
} as Knex.Config);
2424

25-
export const isDatabaseHealthy = async (): Promise<boolean> => {
25+
export const isDatabaseReachable = async () => {
2626
try {
2727
await prisma.walletDetails.findFirst();
2828
return true;

src/server/index.ts

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,9 @@ import { env } from "../utils/env";
88
import { logger } from "../utils/logger";
99
import { withServerUsageReporting } from "../utils/usage";
1010
import { updateTxListener } from "./listeners/updateTxListener";
11-
import { withAuth } from "./middleware/auth";
11+
import { withAdminRoutes } from "./middleware/adminRoutes";
1212
import { withCors } from "./middleware/cors";
13-
import { withEnforceEngineMode } from "./middleware/engine-mode";
13+
import { withEnforceEngineMode } from "./middleware/engineMode";
1414
import { withErrorHandler } from "./middleware/error";
1515
import { withExpress } from "./middleware/express";
1616
import { withRequestLogs } from "./middleware/logs";
@@ -65,11 +65,13 @@ export const initServer = async () => {
6565
await withEnforceEngineMode(server);
6666
await withRateLimit(server);
6767
await withWebSocket(server);
68-
await withAuth(server);
68+
// DEBUG
69+
// await withAuth(server);
6970
await withExpress(server);
7071
await withOpenApi(server);
7172
await withRoutes(server);
7273
await withServerUsageReporting(server);
74+
await withAdminRoutes(server);
7375

7476
await server.ready();
7577

src/server/middleware/adminRoutes.ts

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
import { createBullBoard } from "@bull-board/api";
2+
import { BullMQAdapter } from "@bull-board/api/bullMQAdapter";
3+
import { FastifyAdapter } from "@bull-board/fastify";
4+
import { FastifyInstance } from "fastify";
5+
import { SendTransactionQueue } from "../../worker/queues/sendTransactionQueue";
6+
7+
export const withAdminRoutes = async (server: FastifyInstance) => {
8+
const serverAdapter = new FastifyAdapter();
9+
10+
createBullBoard({
11+
queues: [new BullMQAdapter(SendTransactionQueue.q)],
12+
serverAdapter,
13+
});
14+
15+
const bullboardPath = "/admin/queues";
16+
17+
serverAdapter.setBasePath(bullboardPath);
18+
await server.register(serverAdapter.registerPlugin(), {
19+
basePath: bullboardPath,
20+
prefix: bullboardPath,
21+
});
22+
};

src/server/routes/system/health.ts

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
import { Static, Type } from "@sinclair/typebox";
22
import { FastifyInstance } from "fastify";
33
import { StatusCodes } from "http-status-codes";
4-
import { isDatabaseHealthy } from "../../../db/client";
4+
import { isDatabaseReachable } from "../../../db/client";
55
import { env } from "../../../utils/env";
6-
import { redis } from "../../../utils/redis/redis";
6+
import { isRedisReachable, redis } from "../../../utils/redis/redis";
7+
import { createCustomError } from "../../middleware/error";
78

89
type EngineFeature = "KEYPAIR_AUTH" | "CONTRACT_SUBSCRIPTIONS" | "IP_ALLOWLIST";
910

@@ -44,11 +45,20 @@ export async function healthCheck(fastify: FastifyInstance) {
4445
},
4546
},
4647
handler: async (req, res) => {
47-
const db = await isDatabaseHealthy();
48-
if (!db) {
49-
return res.status(StatusCodes.SERVICE_UNAVAILABLE).send({
50-
error: "The database is unreachable.",
51-
});
48+
if (!(await isDatabaseReachable())) {
49+
throw createCustomError(
50+
"The database is unreachable.",
51+
StatusCodes.SERVICE_UNAVAILABLE,
52+
"FAILED_HEALTHCHECK",
53+
);
54+
}
55+
56+
if (!(await isRedisReachable())) {
57+
throw createCustomError(
58+
"Redis is unreachable.",
59+
StatusCodes.SERVICE_UNAVAILABLE,
60+
"FAILED_HEALTHCHECK",
61+
);
5262
}
5363

5464
res.status(StatusCodes.OK).send({

src/utils/redis/redis.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,3 +30,12 @@ redis.on("ready", () => {
3030
service: "worker",
3131
});
3232
});
33+
34+
export const isRedisReachable = async () => {
35+
try {
36+
await redis.ping();
37+
return true;
38+
} catch (error) {
39+
return false;
40+
}
41+
};

src/worker/queues/cancelRecycledNoncesQueue.ts

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,18 @@ import { Queue } from "bullmq";
22
import { redis } from "../../utils/redis/redis";
33
import { defaultJobOptions } from "./queues";
44

5-
export const CANCEL_RECYCLED_NONCES_QUEUE_NAME = "cancel-recycled-nonces";
5+
const CANCEL_RECYCLED_NONCES_QUEUE_NAME = "cancel-recycled-nonces";
66

7-
const _queue = new Queue<string>(CANCEL_RECYCLED_NONCES_QUEUE_NAME, {
8-
connection: redis,
9-
defaultJobOptions,
10-
});
7+
export const cancelRecycledNoncesQueue = new Queue<string>(
8+
CANCEL_RECYCLED_NONCES_QUEUE_NAME,
9+
{
10+
connection: redis,
11+
defaultJobOptions,
12+
},
13+
);
1114

12-
_queue.setGlobalConcurrency(1);
13-
_queue.add("hourly-cron", "", {
15+
cancelRecycledNoncesQueue.setGlobalConcurrency(1);
16+
cancelRecycledNoncesQueue.add("hourly-cron", "", {
1417
repeat: { pattern: "* * * * *" },
1518
// Use a constant jobId to not insert multiple repeatable jobs.
1619
jobId: "cancel-recycled-nonces-cron",

src/worker/queues/sendTransactionQueue.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ export type SendTransactionData = {
1111
export class SendTransactionQueue {
1212
public static name = "transactions-1-send";
1313

14-
private static q = new Queue<string>(this.name, {
14+
static q = new Queue<string>(this.name, {
1515
connection: redis,
1616
defaultJobOptions: {
1717
...defaultJobOptions,

src/worker/tasks/cancelRecycledNoncesWorker.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import { isNonceAlreadyUsedError } from "../../utils/error";
55
import { logger } from "../../utils/logger";
66
import { redis } from "../../utils/redis/redis";
77
import { sendCancellationTransaction } from "../../utils/transaction/cancelTransaction";
8-
import { CANCEL_RECYCLED_NONCES_QUEUE_NAME } from "../queues/cancelRecycledNoncesQueue";
8+
import { cancelRecycledNoncesQueue } from "../queues/cancelRecycledNoncesQueue";
99
import { logWorkerExceptions } from "../queues/queues";
1010

1111
/**
@@ -72,7 +72,7 @@ const getAndDeleteUnusedNonces = async (key: string) => {
7272
};
7373

7474
// Worker
75-
const _worker = new Worker(CANCEL_RECYCLED_NONCES_QUEUE_NAME, handler, {
75+
const _worker = new Worker(cancelRecycledNoncesQueue.name, handler, {
7676
concurrency: 1,
7777
connection: redis,
7878
});

0 commit comments

Comments
 (0)