Skip to content

Commit ad50094

Browse files
committed
explicit worker init step
1 parent c15ea6f commit ad50094

19 files changed

+358
-319
lines changed

src/server/index.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import { logger } from "../utils/logger";
99
import { withServerUsageReporting } from "../utils/usage";
1010
import { updateTxListener } from "./listeners/updateTxListener";
1111
import { withAdminRoutes } from "./middleware/adminRoutes";
12+
import { withAuth } from "./middleware/auth";
1213
import { withCors } from "./middleware/cors";
1314
import { withEnforceEngineMode } from "./middleware/engineMode";
1415
import { withErrorHandler } from "./middleware/error";
@@ -65,8 +66,7 @@ export const initServer = async () => {
6566
await withEnforceEngineMode(server);
6667
await withRateLimit(server);
6768
await withWebSocket(server);
68-
// DEBUG
69-
// await withAuth(server);
69+
await withAuth(server);
7070
await withExpress(server);
7171
await withOpenApi(server);
7272
await withRoutes(server);

src/server/middleware/adminRoutes.ts

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,31 @@
11
import { createBullBoard } from "@bull-board/api";
22
import { BullMQAdapter } from "@bull-board/api/bullMQAdapter";
33
import { FastifyAdapter } from "@bull-board/fastify";
4+
import { Queue } from "bullmq";
45
import { FastifyInstance } from "fastify";
6+
import { CancelRecycledNoncesQueue } from "../../worker/queues/cancelRecycledNoncesQueue";
7+
import { MineTransactionQueue } from "../../worker/queues/mineTransactionQueue";
8+
import { ProcessEventsLogQueue } from "../../worker/queues/processEventLogsQueue";
9+
import { ProcessTransactionReceiptsQueue } from "../../worker/queues/processTransactionReceiptsQueue";
10+
import { PruneTransactionsQueue } from "../../worker/queues/pruneTransactionsQueue";
511
import { SendTransactionQueue } from "../../worker/queues/sendTransactionQueue";
12+
import { SendWebhookQueue } from "../../worker/queues/sendWebhookQueue";
613

714
export const withAdminRoutes = async (server: FastifyInstance) => {
815
const serverAdapter = new FastifyAdapter();
916

17+
const queues: Queue[] = [
18+
SendWebhookQueue.q,
19+
ProcessEventsLogQueue.q,
20+
ProcessTransactionReceiptsQueue.q,
21+
SendTransactionQueue.q,
22+
MineTransactionQueue.q,
23+
CancelRecycledNoncesQueue.q,
24+
PruneTransactionsQueue.q,
25+
];
26+
1027
createBullBoard({
11-
queues: [new BullMQAdapter(SendTransactionQueue.q)],
28+
queues: queues.map((q) => new BullMQAdapter(q)),
1229
serverAdapter,
1330
});
1431

src/utils/transaction/webhook.ts

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { WebhooksEventTypes } from "../../schema/webhooks";
2-
import { enqueueWebhook } from "../../worker/queues/sendWebhookQueue";
2+
import { SendWebhookQueue } from "../../worker/queues/sendWebhookQueue";
33
import { AnyTransaction } from "./types";
44

55
export const enqueueTransactionWebhook = async (
@@ -16,9 +16,7 @@ export const enqueueTransactionWebhook = async (
1616
: status === "errored"
1717
? WebhooksEventTypes.ERRORED_TX
1818
: null;
19-
20-
if (!type) {
21-
return;
19+
if (type) {
20+
await SendWebhookQueue.enqueueWebhook({ type, queueId });
2221
}
23-
await enqueueWebhook({ type, queueId });
2422
};

src/worker/index.ts

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,17 +7,23 @@ import {
77
newWebhooksListener,
88
updatedWebhooksListener,
99
} from "./listeners/webhookListener";
10-
11-
// Init workers.
12-
import "./tasks/cancelRecycledNoncesWorker";
13-
import "./tasks/mineTransactionWorker";
14-
import "./tasks/processEventLogsWorker";
15-
import "./tasks/processTransactionReceiptsWorker";
16-
import "./tasks/pruneTransactionsWorker";
17-
import "./tasks/sendTransactionWorker";
18-
import "./tasks/sendWebhookWorker";
10+
import { initCancelRecycledNoncesWorker } from "./tasks/cancelRecycledNoncesWorker";
11+
import { initMineTransactionWorker } from "./tasks/mineTransactionWorker";
12+
import { initProcessEventLogsWorker } from "./tasks/processEventLogsWorker";
13+
import { initProcessTransactionReceiptsWorker } from "./tasks/processTransactionReceiptsWorker";
14+
import { initPruneTransactionsWorker } from "./tasks/pruneTransactionsWorker";
15+
import { initSendTransactionWorker } from "./tasks/sendTransactionWorker";
16+
import { initSendWebhookWorker } from "./tasks/sendWebhookWorker";
1917

2018
export const initWorker = async () => {
19+
initCancelRecycledNoncesWorker();
20+
initProcessEventLogsWorker();
21+
initProcessTransactionReceiptsWorker();
22+
initPruneTransactionsWorker();
23+
initSendTransactionWorker();
24+
initMineTransactionWorker();
25+
initSendWebhookWorker();
26+
2127
// Listen for new & updated configuration data.
2228
await newConfigurationListener();
2329
await updatedConfigurationListener();
Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,25 @@
11
import { Queue } from "bullmq";
2+
import { env } from "../../utils/env";
23
import { redis } from "../../utils/redis/redis";
34
import { defaultJobOptions } from "./queues";
45

5-
const CANCEL_RECYCLED_NONCES_QUEUE_NAME = "cancel-recycled-nonces";
6+
export class CancelRecycledNoncesQueue {
7+
private static name = "cancel-recycled-nonces";
68

7-
export const cancelRecycledNoncesQueue = new Queue<string>(
8-
CANCEL_RECYCLED_NONCES_QUEUE_NAME,
9-
{
9+
static q = new Queue<string>(this.name, {
1010
connection: redis,
1111
defaultJobOptions,
12-
},
13-
);
12+
});
1413

15-
cancelRecycledNoncesQueue.setGlobalConcurrency(1);
16-
cancelRecycledNoncesQueue.add("hourly-cron", "", {
17-
repeat: { pattern: "* * * * *" },
18-
// Use a constant jobId to not insert multiple repeatable jobs.
19-
jobId: "cancel-recycled-nonces-cron",
20-
});
14+
constructor() {
15+
if (env.ENGINE_MODE === "server_only") {
16+
return;
17+
}
18+
19+
CancelRecycledNoncesQueue.q.setGlobalConcurrency(1);
20+
CancelRecycledNoncesQueue.q.add("cron", "", {
21+
repeat: { pattern: "* * * * *" },
22+
jobId: "cron",
23+
});
24+
}
25+
}

src/worker/queues/mineTransactionQueue.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,9 @@ export type MineTransactionData = {
88
};
99

1010
export class MineTransactionQueue {
11-
public static name = "transactions-2-mine";
11+
private static name = "transactions-2-mine";
1212

13-
private static q = new Queue<string>(this.name, {
13+
static q = new Queue<string>(this.name, {
1414
connection: redis,
1515
defaultJobOptions: {
1616
...defaultJobOptions,

src/worker/queues/processEventLogsQueue.ts

Lines changed: 30 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,6 @@ import { getConfig } from "../../utils/cache/getConfig";
55
import { redis } from "../../utils/redis/redis";
66
import { defaultJobOptions } from "./queues";
77

8-
export const PROCESS_EVENT_LOGS_QUEUE_NAME = "process-event-logs";
9-
10-
// Queue
11-
const _queue = new Queue<string>(PROCESS_EVENT_LOGS_QUEUE_NAME, {
12-
connection: redis,
13-
defaultJobOptions,
14-
});
15-
168
// Each job handles a block range for a given chain, filtered by addresses + events.
179
export type EnqueueProcessEventLogsData = {
1810
chainId: number;
@@ -24,28 +16,35 @@ export type EnqueueProcessEventLogsData = {
2416
toBlock: number; // inclusive
2517
};
2618

27-
export const enqueueProcessEventLogs = async (
28-
data: EnqueueProcessEventLogsData,
29-
) => {
30-
const serialized = SuperJSON.stringify(data);
31-
// e.g. 8453:14423125-14423685
32-
const jobName = `${data.chainId}:${data.fromBlock}-${data.toBlock}`;
33-
const { contractSubscriptionsRequeryDelaySeconds } = await getConfig();
34-
const requeryDelays = contractSubscriptionsRequeryDelaySeconds.split(",");
19+
export class ProcessEventsLogQueue {
20+
private static name = "process-event-logs";
3521

36-
// Enqueue one job immediately and any delayed jobs.
37-
await _queue.add(jobName, serialized);
22+
static q = new Queue<string>(this.name, {
23+
connection: redis,
24+
defaultJobOptions,
25+
});
3826

39-
// The last attempt should attempt repeatedly to handle extended RPC issues.
40-
// This backoff attempts at intervals:
41-
// 30s, 1m, 2m, 4m, 8m, 16m, 32m, ~1h, ~2h, ~4h
42-
for (let i = 0; i < requeryDelays.length; i++) {
43-
const delay = parseInt(requeryDelays[i]) * 1000;
44-
const attempts = i === requeryDelays.length - 1 ? 10 : 0;
45-
await _queue.add(jobName, serialized, {
46-
delay,
47-
attempts,
48-
backoff: { type: "exponential", delay: 30_000 },
49-
});
50-
}
51-
};
27+
static add = async (data: EnqueueProcessEventLogsData) => {
28+
const serialized = SuperJSON.stringify(data);
29+
// e.g. 8453:14423125-14423685
30+
const jobName = `${data.chainId}:${data.fromBlock}-${data.toBlock}`;
31+
const { contractSubscriptionsRequeryDelaySeconds } = await getConfig();
32+
const requeryDelays = contractSubscriptionsRequeryDelaySeconds.split(",");
33+
34+
// Enqueue one job immediately and any delayed jobs.
35+
await this.q.add(jobName, serialized);
36+
37+
// The last attempt should attempt repeatedly to handle extended RPC issues.
38+
// This backoff attempts at intervals:
39+
// 30s, 1m, 2m, 4m, 8m, 16m, 32m, ~1h, ~2h, ~4h
40+
for (let i = 0; i < requeryDelays.length; i++) {
41+
const delay = parseInt(requeryDelays[i]) * 1000;
42+
const attempts = i === requeryDelays.length - 1 ? 10 : 0;
43+
await this.q.add(jobName, serialized, {
44+
delay,
45+
attempts,
46+
backoff: { type: "exponential", delay: 30_000 },
47+
});
48+
}
49+
};
50+
}

src/worker/queues/processTransactionReceiptsQueue.ts

Lines changed: 29 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -5,17 +5,6 @@ import { getConfig } from "../../utils/cache/getConfig";
55
import { redis } from "../../utils/redis/redis";
66
import { defaultJobOptions } from "./queues";
77

8-
export const PROCESS_TRANSACTION_RECEIPTS_QUEUE_NAME =
9-
"process-transaction-receipts";
10-
11-
// Queue
12-
const _queue = redis
13-
? new Queue<string>(PROCESS_TRANSACTION_RECEIPTS_QUEUE_NAME, {
14-
connection: redis,
15-
defaultJobOptions,
16-
})
17-
: null;
18-
198
// Each job handles a block range for a given chain, filtered by addresses + events.
209
export type EnqueueProcessTransactionReceiptsData = {
2110
chainId: number;
@@ -27,30 +16,35 @@ export type EnqueueProcessTransactionReceiptsData = {
2716
toBlock: number; // inclusive
2817
};
2918

30-
export const enqueueProcessTransactionReceipts = async (
31-
data: EnqueueProcessTransactionReceiptsData,
32-
) => {
33-
if (!_queue) return;
19+
export class ProcessTransactionReceiptsQueue {
20+
private static name = "process-transaction-receipts";
3421

35-
const serialized = superjson.stringify(data);
36-
// e.g. 8453:14423125-14423685
37-
const jobName = `${data.chainId}:${data.fromBlock}-${data.toBlock}`;
38-
const { contractSubscriptionsRequeryDelaySeconds } = await getConfig();
39-
const requeryDelays = contractSubscriptionsRequeryDelaySeconds.split(",");
22+
static q = new Queue<string>(this.name, {
23+
connection: redis,
24+
defaultJobOptions,
25+
});
4026

41-
// Enqueue one job immediately and any delayed jobs.
42-
await _queue.add(jobName, serialized);
27+
static add = async (data: EnqueueProcessTransactionReceiptsData) => {
28+
const serialized = superjson.stringify(data);
29+
// e.g. 8453:14423125-14423685
30+
const jobName = `${data.chainId}:${data.fromBlock}-${data.toBlock}`;
31+
const { contractSubscriptionsRequeryDelaySeconds } = await getConfig();
32+
const requeryDelays = contractSubscriptionsRequeryDelaySeconds.split(",");
4333

44-
// The last attempt should attempt repeatedly to handle extended RPC issues.
45-
// This backoff attempts at intervals:
46-
// 30s, 1m, 2m, 4m, 8m, 16m, 32m, ~1h, ~2h, ~4h
47-
for (let i = 0; i < requeryDelays.length; i++) {
48-
const delay = parseInt(requeryDelays[i]) * 1000;
49-
const attempts = i === requeryDelays.length - 1 ? 10 : 0;
50-
await _queue.add(jobName, serialized, {
51-
delay,
52-
attempts,
53-
backoff: { type: "exponential", delay: 30_000 },
54-
});
55-
}
56-
};
34+
// Enqueue one job immediately and any delayed jobs.
35+
await this.q.add(jobName, serialized);
36+
37+
// The last attempt should attempt repeatedly to handle extended RPC issues.
38+
// This backoff attempts at intervals:
39+
// 30s, 1m, 2m, 4m, 8m, 16m, 32m, ~1h, ~2h, ~4h
40+
for (let i = 0; i < requeryDelays.length; i++) {
41+
const delay = parseInt(requeryDelays[i]) * 1000;
42+
const attempts = i === requeryDelays.length - 1 ? 10 : 0;
43+
await this.q.add(jobName, serialized, {
44+
delay,
45+
attempts,
46+
backoff: { type: "exponential", delay: 30_000 },
47+
});
48+
}
49+
};
50+
}
Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,25 @@
11
import { Queue } from "bullmq";
2+
import { env } from "../../utils/env";
23
import { redis } from "../../utils/redis/redis";
34
import { defaultJobOptions } from "./queues";
45

5-
export const PRUNE_TRANSACTIONS_QUEUE_NAME = "prune-transactions";
6+
export class PruneTransactionsQueue {
7+
private static name = "prune-transactions";
68

7-
const _queue = new Queue<string>(PRUNE_TRANSACTIONS_QUEUE_NAME, {
8-
connection: redis,
9-
defaultJobOptions,
10-
});
9+
static q = new Queue<string>(this.name, {
10+
connection: redis,
11+
defaultJobOptions,
12+
});
1113

12-
_queue.setGlobalConcurrency(1);
13-
_queue.add("hourly-cron", "", {
14-
repeat: { pattern: "*/10 * * * *" },
15-
// Use a constant jobId to not insert multiple repeatable jobs.
16-
jobId: "prune-transactions-cron",
17-
});
14+
constructor() {
15+
if (env.ENGINE_MODE === "server_only") {
16+
return;
17+
}
18+
19+
PruneTransactionsQueue.q.setGlobalConcurrency(1);
20+
PruneTransactionsQueue.q.add("cron", "", {
21+
repeat: { pattern: "*/10 * * * *" },
22+
jobId: "cron",
23+
});
24+
}
25+
}

src/worker/queues/sendTransactionQueue.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ export type SendTransactionData = {
99
};
1010

1111
export class SendTransactionQueue {
12-
public static name = "transactions-1-send";
12+
private static name = "transactions-1-send";
1313

1414
static q = new Queue<string>(this.name, {
1515
connection: redis,

0 commit comments

Comments
 (0)