Skip to content

Commit 866f8bb

Browse files
authored
Execute processTx on a cron schedule (#349)
1 parent f0c92a9 commit 866f8bb

File tree

2 files changed

+14
-59
lines changed

2 files changed

+14
-59
lines changed
Lines changed: 8 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -1,73 +1,22 @@
1-
import PQueue from "p-queue";
2-
import { knex } from "../../db/client";
1+
import cron from "node-cron";
2+
import { getConfiguration } from "../../db/configuration/getConfiguration";
33
import { logger } from "../../utils/logger";
44
import { processTx } from "../tasks/processTx";
55

6-
const queue = new PQueue({
7-
concurrency: 1,
8-
autoStart: true,
9-
});
10-
11-
queue.on("error", (err) => {
12-
logger({
13-
service: "worker",
14-
level: "error",
15-
message: `[Queue Error] Size: ${queue.size}, Pending: ${queue.pending}`,
16-
error: err,
17-
});
18-
19-
throw err;
20-
});
21-
226
export const queuedTxListener = async (): Promise<void> => {
237
logger({
248
service: "worker",
259
level: "info",
2610
message: `Listening for queued transactions`,
2711
});
2812

29-
// TODO: This doesn't even need to be a listener
30-
const connection = await knex.client.acquireConnection();
31-
connection.query(`LISTEN new_transaction_data`);
32-
33-
// Queue transaction processing immediately on startup
34-
queue.add(processTx);
35-
36-
// Whenever we receive a new transaction, process it
37-
connection.on(
38-
"notification",
39-
async (msg: { channel: string; payload: string }) => {
40-
queue.add(processTx);
41-
},
42-
);
43-
44-
connection.on("end", async () => {
45-
await knex.destroy();
46-
knex.client.releaseConnection(connection);
47-
48-
logger({
49-
service: "worker",
50-
level: "info",
51-
message: `Released database connection on end`,
52-
});
53-
});
54-
55-
connection.on("error", async (err: any) => {
56-
logger({
57-
service: "worker",
58-
level: "error",
59-
message: `Database connection error`,
60-
error: err,
61-
});
13+
const config = await getConfiguration();
6214

63-
await knex.destroy();
64-
knex.client.releaseConnection(connection);
15+
if (!config.minedTxListenerCronSchedule) {
16+
return;
17+
}
6518

66-
logger({
67-
service: "worker",
68-
level: "info",
69-
message: `Released database connection on error`,
70-
error: err,
71-
});
19+
cron.schedule(config.minedTxListenerCronSchedule, async () => {
20+
await processTx();
7221
});
7322
};

src/worker/tasks/processTx.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,12 @@ export const processTx = async () => {
4949
// 1. Select a batch of transactions and lock the rows so no other workers pick them up
5050
const txs = await getQueuedTxs({ pgtx });
5151

52+
logger({
53+
service: "worker",
54+
level: "info",
55+
message: `Received ${txs.length} transactions to process`,
56+
});
57+
5258
const config = await getConfiguration();
5359
if (txs.length < config.minTxsToProcess) {
5460
return;

0 commit comments

Comments
 (0)