|
| 1 | +import { Static, Type } from "@sinclair/typebox"; |
| 2 | +import { Queue } from "bullmq"; |
| 3 | +import { FastifyInstance } from "fastify"; |
| 4 | +import { StatusCodes } from "http-status-codes"; |
| 5 | +import { stringify } from "thirdweb/utils"; |
| 6 | +import { TransactionDB } from "../../../db/transactions/db"; |
| 7 | +import { getConfig } from "../../../utils/cache/getConfig"; |
| 8 | +import { maybeDate } from "../../../utils/primitiveTypes"; |
| 9 | +import { redis } from "../../../utils/redis/redis"; |
| 10 | +import { MineTransactionQueue } from "../../../worker/queues/mineTransactionQueue"; |
| 11 | +import { SendTransactionQueue } from "../../../worker/queues/sendTransactionQueue"; |
| 12 | +import { createCustomError } from "../../middleware/error"; |
| 13 | +import { standardResponseSchema } from "../../schemas/sharedApiSchemas"; |
| 14 | + |
| 15 | +const requestSchema = Type.Object({ |
| 16 | + queueId: Type.String({ |
| 17 | + description: "Transaction queue ID", |
| 18 | + examples: ["9eb88b00-f04f-409b-9df7-7dcc9003bc35"], |
| 19 | + }), |
| 20 | +}); |
| 21 | + |
| 22 | +const jobSchema = Type.Object({ |
| 23 | + queue: Type.String(), |
| 24 | + jobId: Type.String(), |
| 25 | + timestamp: Type.String(), |
| 26 | + processedOn: Type.Optional(Type.String()), |
| 27 | + finishedOn: Type.Optional(Type.String()), |
| 28 | + lines: Type.Array(Type.String()), |
| 29 | +}); |
| 30 | + |
| 31 | +export const responseBodySchema = Type.Object({ |
| 32 | + result: Type.Object({ |
| 33 | + raw: Type.Any(), |
| 34 | + jobs: Type.Array(jobSchema), |
| 35 | + }), |
| 36 | +}); |
| 37 | + |
| 38 | +responseBodySchema.example = { |
| 39 | + result: { |
| 40 | + raw: { |
| 41 | + queueId: "9eb88b00-f04f-409b-9df7-7dcc9003bc35", |
| 42 | + }, |
| 43 | + logs: ["Log line 1", "Log line 2"], |
| 44 | + }, |
| 45 | +}; |
| 46 | + |
| 47 | +export async function getTransactionDetails(fastify: FastifyInstance) { |
| 48 | + fastify.route<{ |
| 49 | + Params: Static<typeof requestSchema>; |
| 50 | + Reply: Static<typeof responseBodySchema>; |
| 51 | + }>({ |
| 52 | + method: "GET", |
| 53 | + url: "/admin/transaction-details/:queueId", |
| 54 | + schema: { |
| 55 | + summary: "Get transaction details", |
| 56 | + description: "Get raw logs and details for a transaction by queueId.", |
| 57 | + tags: ["Admin"], |
| 58 | + operationId: "transactionDetails", |
| 59 | + params: requestSchema, |
| 60 | + response: { |
| 61 | + ...standardResponseSchema, |
| 62 | + [StatusCodes.OK]: responseBodySchema, |
| 63 | + }, |
| 64 | + hide: true, |
| 65 | + }, |
| 66 | + handler: async (request, reply) => { |
| 67 | + const { queueId } = request.params; |
| 68 | + |
| 69 | + const transaction = await TransactionDB.get(queueId); |
| 70 | + if (!transaction) { |
| 71 | + throw createCustomError( |
| 72 | + "Transaction not found.", |
| 73 | + StatusCodes.BAD_REQUEST, |
| 74 | + "TRANSACTION_NOT_FOUND", |
| 75 | + ); |
| 76 | + } |
| 77 | + |
| 78 | + const config = await getConfig(); |
| 79 | + const jobs: Static<typeof jobSchema>[] = []; |
| 80 | + |
| 81 | + // SentTransaction jobs. |
| 82 | + for ( |
| 83 | + let resendCount = 0; |
| 84 | + resendCount < config.maxRetriesPerTx; |
| 85 | + resendCount++ |
| 86 | + ) { |
| 87 | + const jobDetails = await getJobDetails({ |
| 88 | + queue: SendTransactionQueue.q, |
| 89 | + jobId: SendTransactionQueue.jobId({ queueId, resendCount }), |
| 90 | + }); |
| 91 | + if (jobDetails) { |
| 92 | + jobs.push(jobDetails); |
| 93 | + } |
| 94 | + } |
| 95 | + |
| 96 | + // MineTransaction job. |
| 97 | + const jobDetails = await getJobDetails({ |
| 98 | + queue: MineTransactionQueue.q, |
| 99 | + jobId: MineTransactionQueue.jobId({ queueId }), |
| 100 | + }); |
| 101 | + if (jobDetails) { |
| 102 | + jobs.push(jobDetails); |
| 103 | + } |
| 104 | + |
| 105 | + reply.status(StatusCodes.OK).send({ |
| 106 | + result: { |
| 107 | + raw: JSON.parse(stringify(transaction)), |
| 108 | + jobs, |
| 109 | + }, |
| 110 | + }); |
| 111 | + }, |
| 112 | + }); |
| 113 | +} |
| 114 | + |
| 115 | +const getJobDetails = async (args: { |
| 116 | + queue: Queue; |
| 117 | + jobId: string; |
| 118 | +}): Promise<Static<typeof jobSchema> | null> => { |
| 119 | + console.log("[DEBUG] args", args); |
| 120 | + const { queue, jobId } = args; |
| 121 | + const job = await queue.getJob(jobId); |
| 122 | + if (!job) { |
| 123 | + return null; |
| 124 | + } |
| 125 | + |
| 126 | + const key = `bull:${queue.name}:${jobId}:logs`; |
| 127 | + const lines = await redis.lrange(key, 0, -1); |
| 128 | + return { |
| 129 | + queue: queue.name, |
| 130 | + jobId, |
| 131 | + timestamp: maybeDate(job.timestamp).toISOString(), |
| 132 | + processedOn: maybeDate(job.processedOn)?.toISOString(), |
| 133 | + finishedOn: maybeDate(job.finishedOn)?.toISOString(), |
| 134 | + lines, |
| 135 | + }; |
| 136 | +}; |
0 commit comments