Skip to content

Commit b964bd0

Browse files
committed
prune transaction lists and details
1 parent 6dc5008 commit b964bd0

File tree

8 files changed

+75
-50
lines changed

8 files changed

+75
-50
lines changed

src/db/transactions/db.ts

Lines changed: 36 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
import superjson from "superjson";
2-
import { env } from "../../utils/env";
32
import { redis } from "../../utils/redis/redis";
43
import { AnyTransaction } from "../../utils/transaction/types";
54

@@ -42,29 +41,15 @@ export class TransactionDB {
4241
/**
4342
* Inserts or replaces a transaction details.
4443
* Also adds to the appropriate "status" sorted set.
45-
* Sets a TTL for completed statuses (mined, cancelled, errored).
46-
*
4744
* @param transaction
4845
*/
4946
static set = async (transaction: AnyTransaction) => {
50-
const pipeline = redis.pipeline();
51-
52-
const shouldExpire = ["mined", "cancelled", "errored"].includes(
53-
transaction.status,
54-
);
55-
if (shouldExpire) {
56-
const ttlSeconds = env.PRUNE_TRANSACTIONS * 24 * 60 * 60;
57-
pipeline.setex(
58-
this.transactionDetailsKey(transaction.queueId),
59-
ttlSeconds,
60-
superjson.stringify(transaction),
61-
);
62-
} else {
63-
pipeline.set(
47+
const pipeline = redis
48+
.pipeline()
49+
.set(
6450
this.transactionDetailsKey(transaction.queueId),
6551
superjson.stringify(transaction),
6652
);
67-
}
6853

6954
switch (transaction.status) {
7055
case "queued":
@@ -136,6 +121,20 @@ export class TransactionDB {
136121
return result;
137122
};
138123

124+
/**
125+
* Deletes multiple transaction details by a list of queueIds.
126+
* @param queueIds
127+
* @returns number - The number of transaction details deleted.
128+
*/
129+
static bulkDelete = async (queueIds: string[]) => {
130+
if (queueIds.length === 0) {
131+
return [];
132+
}
133+
134+
const keys = queueIds.map(this.transactionDetailsKey);
135+
return await redis.unlink(...keys);
136+
};
137+
139138
/**
140139
* Check if a transaction exists.
141140
* @param queueId
@@ -151,7 +150,7 @@ export class TransactionDB {
151150
* @param limit
152151
* @returns
153152
*/
154-
static listByStatus = async (args: {
153+
static getTransactionListByStatus = async (args: {
155154
status: "queued" | "mined" | "cancelled" | "errored";
156155
page: number;
157156
limit: number;
@@ -180,21 +179,36 @@ export class TransactionDB {
180179
* Deletes transactions between a time range.
181180
* @param from Date?
182181
* @param to Date?
183-
* @returns string[] List of queueIds
184182
*/
185-
static pruneTransactions = async (args: { from?: Date; to?: Date }) => {
183+
static pruneTransactionLists = async (args: { from?: Date; to?: Date }) => {
186184
const { from, to } = args;
187185
const min = from ? toSeconds(from) : 0;
188186
const max = to ? toSeconds(to) : "+inf";
189187

190-
// Delete per-status sorted sets. Do not purge queued transactions.
188+
// Delete per-status sorted sets.
191189
await redis
192190
.pipeline()
191+
.zremrangebyscore(this.queuedTransactionsKey, min, max)
193192
.zremrangebyscore(this.minedTransactionsKey, min, max)
194193
.zremrangebyscore(this.cancelledTransactionsKey, min, max)
195194
.zremrangebyscore(this.erroredTransactionsKey, min, max)
196195
.exec();
197196
};
197+
198+
/**
199+
* Prunes transaction details after `keep` transactions.
200+
* Example: `keep=100` prunes all transaction details except the most recent 100.
201+
* @param keep number - The count of recent transactions to not prune. All older transactions are pruned.
202+
* @returns number - The number of transaction details pruned.
203+
*/
204+
static pruneTransactionDetails = async (keep: number) => {
205+
const queueIds = await redis.zrange(
206+
this.queuedTransactionsKey,
207+
0,
208+
-keep - 1,
209+
);
210+
return await this.bulkDelete(queueIds);
211+
};
198212
}
199213

200214
const toSeconds = (timestamp: Date) => timestamp.getTime() / 1000;

src/server/routes/backend-wallet/getTransactions.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ export async function getAllTransactions(fastify: FastifyInstance) {
4343
const walletAddress = normalizeAddress(_walletAddress);
4444

4545
// @TODO: This query is not optimized. Cap the results to the most recent 10k total transactions for performance reasons.
46-
const { transactions } = await TransactionDB.listByStatus({
46+
const { transactions } = await TransactionDB.getTransactionListByStatus({
4747
status: "queued",
4848
page: 1,
4949
limit: 10_000,

src/server/routes/transaction/getAll.ts

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -103,11 +103,12 @@ export async function getAllTx(fastify: FastifyInstance) {
103103
handler: async (request, reply) => {
104104
const { page, limit } = request.query;
105105

106-
const { transactions, totalCount } = await TransactionDB.listByStatus({
107-
status: "queued",
108-
page,
109-
limit,
110-
});
106+
const { transactions, totalCount } =
107+
await TransactionDB.getTransactionListByStatus({
108+
status: "queued",
109+
page,
110+
limit,
111+
});
111112

112113
reply.status(StatusCodes.OK).send({
113114
result: {

src/server/routes/transaction/getAllDeployedContracts.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ export async function getAllDeployedContracts(fastify: FastifyInstance) {
8282
handler: async (request, reply) => {
8383
const { page, limit } = request.query;
8484

85-
const { transactions } = await TransactionDB.listByStatus({
85+
const { transactions } = await TransactionDB.getTransactionListByStatus({
8686
status: "queued",
8787
page,
8888
limit,

src/utils/env.ts

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,13 +61,20 @@ export const env = createEnv({
6161
ENABLE_HTTPS: boolSchema("false"),
6262
HTTPS_PASSPHRASE: z.string().default("thirdweb-engine"),
6363
TRUST_PROXY: z.boolean().default(false),
64+
// REDIS_MAXMEMORY should be set below the amount of memory Redis has available
65+
// to avoid running out of memory. If this limit is reached, keys are evicted
66+
// with a "allkeys-lru" policy (removes least recently used keys).
67+
REDIS_MAXMEMORY: z.string().default("900mb"),
6468
PRUNE_TRANSACTIONS: z
6569
.union([
6670
z.literal("true").transform(() => 7),
6771
z.literal("false").transform(() => 0),
6872
z.coerce.number().int(),
6973
])
7074
.default(7),
75+
// PRUNE_TRANSACTIONS_KEEP_COUNT defines the max transaction details to keep.
76+
// In testing, storing about 400k consumes 1GB memory.
77+
PRUNE_TRANSACTIONS_KEEP_COUNT: z.coerce.number().default(400_000),
7178
CLIENT_ANALYTICS_URL: z
7279
.union([UrlSchema, z.literal("")])
7380
.default("https://c.thirdweb.com/event"),
@@ -79,8 +86,8 @@ export const env = createEnv({
7986
.nonnegative()
8087
.default(0),
8188
REDIS_URL: z.string(),
82-
SEND_TRANSACTION_QUEUE_CONCURRENCY: z.coerce.number().default(1500),
83-
CONFIRM_TRANSACTION_QUEUE_CONCURRENCY: z.coerce.number().default(1500),
89+
SEND_TRANSACTION_QUEUE_CONCURRENCY: z.coerce.number().default(2000),
90+
CONFIRM_TRANSACTION_QUEUE_CONCURRENCY: z.coerce.number().default(2000),
8491
ENGINE_MODE: z
8592
.enum(["default", "sandbox", "server_only", "worker_only"])
8693
.default("default"),
@@ -103,6 +110,7 @@ export const env = createEnv({
103110
HTTPS_PASSPHRASE: process.env.HTTPS_PASSPHRASE,
104111
TRUST_PROXY: process.env.TRUST_PROXY,
105112
PRUNE_TRANSACTIONS: process.env.PRUNE_TRANSACTIONS,
113+
PRUNE_TRANSACTIONS_KEEP_COUNT: process.env.PRUNE_TRANSACTIONS_KEEP_COUNT,
106114
CLIENT_ANALYTICS_URL: process.env.CLIENT_ANALYTICS_URL,
107115
SDK_BATCH_TIME_LIMIT: process.env.SDK_BATCH_TIME_LIMIT,
108116
SDK_BATCH_SIZE_LIMIT: process.env.SDK_BATCH_SIZE_LIMIT,
@@ -116,6 +124,7 @@ export const env = createEnv({
116124
process.env.CONFIRM_TRANSACTION_QUEUE_CONCURRENCY,
117125
ENGINE_MODE: process.env.ENGINE_MODE,
118126
GLOBAL_RATE_LIMIT_PER_MIN: process.env.GLOBAL_RATE_LIMIT_PER_MIN,
127+
REDIS_MAXMEMORY: process.env.REDIS_MAXMEMORY,
119128
},
120129
onValidationError: (error: ZodError) => {
121130
console.error(

src/utils/redis/redis.ts

Lines changed: 11 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -6,28 +6,23 @@ export const redis = new Redis(env.REDIS_URL, {
66
enableAutoPipelining: true,
77
maxRetriesPerRequest: null,
88
});
9-
10-
redis.on("error", (err) => () => {
9+
try {
10+
await redis.config("SET", "maxmemory", env.REDIS_MAXMEMORY);
11+
} catch (error) {
1112
logger({
1213
level: "error",
13-
message: `Redis error: ${err}`,
14+
message: `Initializing Redis: ${error}`,
1415
service: "worker",
1516
});
16-
});
17-
redis.on("connect", () =>
18-
logger({
19-
level: "info",
20-
message: "Redis connected",
21-
service: "worker",
22-
}),
23-
);
24-
redis.on("reconnecting", () =>
17+
}
18+
19+
redis.on("error", (error) => () => {
2520
logger({
26-
level: "info",
27-
message: "Redis reconnecting",
21+
level: "error",
22+
message: `Redis error: ${error}`,
2823
service: "worker",
29-
}),
30-
);
24+
});
25+
});
3126
redis.on("ready", () => {
3227
logger({
3328
level: "info",

src/worker/queues/pruneTransactionsQueue.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,5 +13,5 @@ _queue.setGlobalConcurrency(1);
1313
_queue.add("hourly-cron", "", {
1414
repeat: { pattern: "*/10 * * * *" },
1515
// Use a constant jobId to not insert multiple repeatable jobs.
16-
jobId: "billing-reporter-hourly-cron",
16+
jobId: "prune-transactions-cron",
1717
});

src/worker/tasks/pruneTransactionsWorker.ts

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,18 @@ import { redis } from "../../utils/redis/redis";
55
import { PRUNE_TRANSACTIONS_QUEUE_NAME } from "../queues/pruneTransactionsQueue";
66
import { logWorkerExceptions } from "../queues/queues";
77

8-
const handler: Processor<any, void, string> = async (_: Job<string>) => {
8+
const handler: Processor<any, void, string> = async (job: Job<string>) => {
99
// Purge transactions up to `PRUNE_TRANSACTIONS` days ago.
1010
const to = new Date();
1111
to.setDate(to.getDate() - env.PRUNE_TRANSACTIONS);
12+
await TransactionDB.pruneTransactionLists({ to });
13+
job.log(`Pruned transaction lists to ${to.toLocaleString()}.`);
1214

13-
await TransactionDB.pruneTransactions({ to });
15+
// Prune transactions DB to the most recent `PRUNE_TRANSACTIONS_COUNT`.
16+
const numPruned = await TransactionDB.pruneTransactionDetails(
17+
env.PRUNE_TRANSACTIONS_KEEP_COUNT,
18+
);
19+
job.log(`Pruned ${numPruned} transaction details.`);
1420
};
1521

1622
// Worker

0 commit comments

Comments
 (0)