Skip to content

Commit cb00704

Browse files
authored
Move webhook update listener to worker (#262)
1 parent a606729 commit cb00704

File tree

4 files changed

+64
-69
lines changed

4 files changed

+64
-69
lines changed

server/controller/tx-update-listener.ts

Lines changed: 0 additions & 66 deletions
This file was deleted.

server/index.ts

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import { URL } from "url";
66
import { env } from "../src/utils/env";
77
import { logger } from "../src/utils/logger";
88
import { withRoutes } from "./api";
9-
import { startTxUpdatesNotificationListener } from "./controller/tx-update-listener";
109
import { withAuth } from "./middleware/auth";
1110
import { withCors } from "./middleware/cors";
1211
import { withErrorHandler } from "./middleware/error";
@@ -65,8 +64,6 @@ const main = async () => {
6564
}
6665
},
6766
);
68-
69-
await startTxUpdatesNotificationListener();
7067
};
7168

7269
main();

src/worker/index.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,15 @@ import { deleteProcessedTx } from "./listeners/deleteProcessedTx";
22
import { minedTxListener } from "./listeners/minedTxListener";
33
import { queuedTxListener } from "./listeners/queuedTxListener";
44
import { retryTxListener } from "./listeners/retryTxListener";
5+
import { updateTxListener } from "./listeners/updateTxListener";
56

67
const worker = async () => {
78
// Listen for queued transactions to process
89
await queuedTxListener();
910

11+
// Listen for transaction updates to send webhooks
12+
await updateTxListener();
13+
1014
// Poll for transactions stuck in mempool to retry
1115
await retryTxListener();
1216

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
import {
2+
formatSocketMessage,
3+
getStatusMessageAndConnectionStatus,
4+
} from "../../../server/helpers/websocket";
5+
import { subscriptionsData } from "../../../server/schemas/websocket";
6+
import { sendTxWebhook } from "../../../server/utilities/webhook";
7+
import { knex } from "../../db/client";
8+
import { getTxById } from "../../db/transactions/getTxById";
9+
import { logger } from "../../utils/logger";
10+
11+
export const updateTxListener = async (): Promise<void> => {
12+
logger.worker.info(`Listening for updated transactions`);
13+
14+
const connection = await knex.client.acquireConnection();
15+
connection.query(`LISTEN updated_transaction_data`);
16+
17+
connection.on(
18+
"notification",
19+
async (msg: { channel: string; payload: string }) => {
20+
const parsedPayload = JSON.parse(msg.payload);
21+
22+
// Send webhook
23+
await sendTxWebhook(parsedPayload);
24+
25+
// Send websocket message
26+
const index = subscriptionsData.findIndex(
27+
(sub) => sub.requestId === parsedPayload.identifier,
28+
);
29+
30+
if (index == -1) {
31+
return;
32+
}
33+
34+
const userSubscription = subscriptionsData[index];
35+
const returnData = await getTxById({
36+
queueId: parsedPayload.identifier,
37+
});
38+
const { message, closeConnection } =
39+
await getStatusMessageAndConnectionStatus(returnData);
40+
userSubscription.socket.send(
41+
await formatSocketMessage(returnData, message),
42+
);
43+
closeConnection ? userSubscription.socket.close() : null;
44+
},
45+
);
46+
47+
connection.on("end", async () => {
48+
logger.server.info(`Connection database ended`);
49+
knex.client.releaseConnection(connection);
50+
await knex.destroy();
51+
logger.server.info(`Released sql connection : on end`);
52+
});
53+
54+
connection.on("error", async (err: any) => {
55+
logger.server.error(err);
56+
knex.client.releaseConnection(connection);
57+
await knex.destroy();
58+
logger.server.info(`Released sql connection: on error`);
59+
});
60+
};

0 commit comments

Comments
 (0)