Skip to content

Commit ee44fbd

Browse files
authored
Websocket event updates fixed (#376)
* moved updateTxListener to server side to help with websockets realtime update * updated logs for websocket
1 parent 49fbdb7 commit ee44fbd

File tree

3 files changed

+18
-14
lines changed

3 files changed

+18
-14
lines changed

src/server/index.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import { URL } from "url";
66
import { deleteAllWalletNonces } from "../db/wallets/deleteAllWalletNonces";
77
import { env } from "../utils/env";
88
import { logger } from "../utils/logger";
9+
import { updateTxListener } from "./listerners/updateTxListener";
910
import { withAuth } from "./middleware/auth";
1011
import { withCors } from "./middleware/cors";
1112
import { withErrorHandler } from "./middleware/error";
@@ -89,6 +90,7 @@ const main = async () => {
8990
});
9091

9192
writeOpenApiToFile(server);
93+
await updateTxListener();
9294
};
9395

9496
main();

src/worker/listeners/updateTxListener.ts renamed to src/server/listerners/updateTxListener.ts

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import { logger } from "../../utils/logger";
99

1010
export const updateTxListener = async (): Promise<void> => {
1111
logger({
12-
service: "worker",
12+
service: "server",
1313
level: "info",
1414
message: `Listening for updated transactions`,
1515
});
@@ -24,7 +24,7 @@ export const updateTxListener = async (): Promise<void> => {
2424

2525
// Send websocket message
2626
const index = subscriptionsData.findIndex(
27-
(sub) => sub.requestId === parsedPayload.identifier,
27+
(sub) => sub.requestId === parsedPayload.id,
2828
);
2929

3030
if (index == -1) {
@@ -33,7 +33,13 @@ export const updateTxListener = async (): Promise<void> => {
3333

3434
const userSubscription = subscriptionsData[index];
3535
const returnData = await getTxById({
36-
queueId: parsedPayload.identifier,
36+
queueId: parsedPayload.id,
37+
});
38+
39+
logger({
40+
service: "server",
41+
level: "info",
42+
message: `[updateTxListener] Sending websocket update for queueId: ${parsedPayload.id}, status ${returnData?.status}.`,
3743
});
3844

3945
const { message, closeConnection } =
@@ -47,26 +53,26 @@ export const updateTxListener = async (): Promise<void> => {
4753

4854
connection.on("end", async () => {
4955
logger({
50-
service: "worker",
56+
service: "server",
5157
level: "info",
52-
message: `Connection database ended`,
58+
message: `[updateTxListener] Connection database ended`,
5359
});
5460

5561
knex.client.releaseConnection(connection);
5662
await knex.destroy();
5763

5864
logger({
59-
service: "worker",
65+
service: "server",
6066
level: "info",
61-
message: `Released database connection on end`,
67+
message: `[updateTxListener] Released database connection on end`,
6268
});
6369
});
6470

6571
connection.on("error", async (err: any) => {
6672
logger({
67-
service: "worker",
73+
service: "server",
6874
level: "error",
69-
message: `Database connection error`,
75+
message: `[updateTxListener] Database connection error`,
7076
error: err,
7177
});
7278

@@ -76,7 +82,7 @@ export const updateTxListener = async (): Promise<void> => {
7682
logger({
7783
service: "worker",
7884
level: "info",
79-
message: `Released database connection on error`,
85+
message: `[updateTxListener] Released database connection on error`,
8086
});
8187
});
8288
};

src/worker/index.ts

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import { deleteProcessedTx } from "./listeners/deleteProcessedTx";
88
import { minedTxListener } from "./listeners/minedTxListener";
99
import { queuedTxListener } from "./listeners/queuedTxListener";
1010
import { retryTxListener } from "./listeners/retryTxListener";
11-
import { updateTxListener } from "./listeners/updateTxListener";
1211
import {
1312
newWebhooksListener,
1413
updatedWebhooksListener,
@@ -18,9 +17,6 @@ const worker = async () => {
1817
// Listen for queued transactions to process
1918
await queuedTxListener();
2019

21-
// Listen for transaction updates to send webhooks
22-
await updateTxListener();
23-
2420
// Poll for transactions stuck in mempool to retry
2521
await retryTxListener();
2622

0 commit comments

Comments
 (0)