Skip to content

Commit 009070e

Browse files
authored
Add transaction locking for nonce management (#132)
* Update to make transaction processing a prisma. * Update queries to support pgtx * Use transaction with timeout * Add locking on getWalletNonce * Update nodemon to watch src
1 parent 671a175 commit 009070e

22 files changed

+463
-262
lines changed

core/env.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ export const env = createEnv({
6161
MAX_PRIORITY_FEE_PER_GAS_FOR_RETRY: z.string().default("55000000000"),
6262
MAX_RETRIES_FOR_TX: z.coerce.number().default(3),
6363
RETRY_TX_CRON_SCHEDULE: z.string().default("*/30 * * * * *"),
64-
MAX_BLOCKS_ELAPSED_BEFORE_RETRY: z.coerce.number().default(50),
64+
MAX_BLOCKS_ELAPSED_BEFORE_RETRY: z.coerce.number().default(10),
6565
MAX_WAIT_TIME_BEFORE_RETRY: z.coerce.number().default(600),
6666
},
6767
clientPrefix: "NEVER_USED",

core/sdk/sdk.ts

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import {
1111
walletTableSchema,
1212
} from "../../server/schemas/wallet";
1313
import { getWalletDetails } from "../../src/db/wallets/getWalletDetails";
14+
import { PrismaTransaction } from "../../src/schema/prisma";
1415
import { env } from "../env";
1516
import { networkResponseSchema } from "../schema";
1617

@@ -95,7 +96,11 @@ const cacheSdk = (
9596
};
9697

9798
const walletDataMap: Map<string, string> = new Map();
98-
const getCachedWallet = async (walletAddress: string, chainId: number) => {
99+
const getCachedWallet = async (
100+
walletAddress: string,
101+
chainId: number,
102+
pgtx?: PrismaTransaction,
103+
) => {
99104
walletAddress = walletAddress.toLowerCase();
100105
let walletData;
101106
const cachedWallet = walletDataMap.get(walletAddress);
@@ -104,7 +109,11 @@ const getCachedWallet = async (walletAddress: string, chainId: number) => {
104109
} else {
105110
console.log("Checking details for address", walletAddress);
106111
// TODO: This needs to be changed...
107-
walletData = await getWalletDetails({ address: walletAddress, chainId });
112+
walletData = await getWalletDetails({
113+
pgtx,
114+
address: walletAddress,
115+
chainId,
116+
});
108117
console.log("Received wallet data:", walletData);
109118
if (walletData) {
110119
walletDataMap.set(walletAddress, JSON.stringify(walletData));
@@ -121,6 +130,7 @@ const THIRDWEB_API_SECRET_KEY = env.THIRDWEB_API_SECRET_KEY;
121130
export const getSDK = async (
122131
chainName: ChainOrRpc,
123132
walletAddress?: string,
133+
pgtx?: PrismaTransaction,
124134
): Promise<ThirdwebSDK> => {
125135
let walletData: Static<typeof walletTableSchema> | undefined;
126136

@@ -158,7 +168,7 @@ export const getSDK = async (
158168
return sdk;
159169
}
160170

161-
walletData = await getCachedWallet(walletAddress, chain.chainId);
171+
walletData = await getCachedWallet(walletAddress, chain.chainId, pgtx);
162172

163173
if (!walletData) {
164174
throw new Error(`Wallet not found for address: ${walletAddress}`);

package.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,8 @@
1010
"docker": "docker compose --env-file ./.env up --remove-orphans",
1111
"docker:build": "docker compose build --no-cache",
1212
"dev": "yarn dev:infra && yarn prisma:init && yarn dev:server & sleep 10 && yarn dev:worker",
13-
"dev:server": "yarn prisma:init && nodemon --watch 'server/**/*.ts' --watch 'core/**/*.ts' --exec 'npx tsx ./server/index.ts' --files server/index.ts",
14-
"dev:worker": "yarn prisma:init && nodemon --watch 'worker/**/*.ts' --watch 'core/**/*.ts' --exec 'npx tsx ./worker/index.ts' --files worker/index.ts",
13+
"dev:server": "yarn prisma:init && nodemon --watch 'server/**/*.ts' --watch 'core/**/*.ts' --watch 'src/**/*.ts' --exec 'npx tsx ./server/index.ts' --files server/index.ts",
14+
"dev:worker": "yarn prisma:init && nodemon --watch 'worker/**/*.ts' --watch 'core/**/*.ts' --watch 'src/**/*.ts' --exec 'npx tsx ./worker/index.ts' --files worker/index.ts",
1515
"dev:infra": "docker compose -f ./docker-compose-infra.yml up -d",
1616
"build": "yarn && rm -rf dist && tsc -p ./tsconfig.json --outDir dist",
1717
"prisma:reset": "prisma migrate reset --force && prisma generate",

src/db/client.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,8 @@
11
import { PrismaClient } from "@prisma/client";
2+
import { PrismaTransaction } from "../schema/prisma";
23

34
export const prisma = new PrismaClient();
5+
6+
export const getPrismaWithPostgresTx = (pgtx?: PrismaTransaction) => {
7+
return pgtx || prisma;
8+
};

src/db/transactions/getAllTxs.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,17 +4,20 @@ import {
44
transactionResponseSchema,
55
} from "../../../server/schemas/transaction";
66
import { ContractExtension } from "../../schema/extension";
7-
import { prisma } from "../client";
7+
import { PrismaTransaction } from "../../schema/prisma";
8+
import { getPrismaWithPostgresTx } from "../client";
89
import { cleanTxs } from "./cleanTxs";
910

1011
interface GetAllTxsParams {
12+
pgtx?: PrismaTransaction;
1113
page: number;
1214
limit: number;
1315
filter?: TransactionStatusEnum;
1416
extensions?: ContractExtension[];
1517
}
1618

1719
export const getAllTxs = async ({
20+
pgtx,
1821
page,
1922
limit,
2023
filter,
@@ -40,6 +43,8 @@ export const getAllTxs = async ({
4043
filterBy = "errorMessage";
4144
}
4245

46+
const prisma = getPrismaWithPostgresTx(pgtx);
47+
4348
// TODO: Cleaning should be handled by zod
4449
const txs = await prisma.transactions.findMany({
4550
where: {

src/db/transactions/getQueuedTxs.ts

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,19 @@
11
import { Static } from "@sinclair/typebox";
22
import { env } from "../../../core/env";
33
import { transactionResponseSchema } from "../../../server/schemas/transaction";
4-
import { prisma } from "../client";
4+
import { PrismaTransaction } from "../../schema/prisma";
5+
import { getPrismaWithPostgresTx } from "../client";
56
import { cleanTxs } from "./cleanTxs";
67

7-
export const getQueuedTxs = async (): Promise<
8+
interface GetQueuedTxsParams {
9+
pgtx?: PrismaTransaction;
10+
}
11+
12+
export const getQueuedTxs = async ({ pgtx }: GetQueuedTxsParams = {}): Promise<
813
Static<typeof transactionResponseSchema>[]
914
> => {
15+
const prisma = getPrismaWithPostgresTx(pgtx);
16+
1017
// TODO: Don't use env var for transactions to batch
1118
const txs = await prisma.$queryRaw`
1219
SELECT

src/db/transactions/getSentTxs.ts

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,19 @@
11
import { Static } from "@sinclair/typebox";
22
import { env } from "../../../core/env";
33
import { transactionResponseSchema } from "../../../server/schemas/transaction";
4-
import { prisma } from "../client";
4+
import { PrismaTransaction } from "../../schema/prisma";
5+
import { getPrismaWithPostgresTx } from "../client";
56
import { cleanTxs } from "./cleanTxs";
67

7-
export const getSentTxs = async (): Promise<
8+
interface GetSentTxsParams {
9+
pgtx?: PrismaTransaction;
10+
}
11+
12+
export const getSentTxs = async ({ pgtx }: GetSentTxsParams = {}): Promise<
813
Static<typeof transactionResponseSchema>[]
914
> => {
15+
const prisma = getPrismaWithPostgresTx(pgtx);
16+
1017
const txs = await prisma.transactions.findMany({
1118
where: {
1219
processedAt: {

src/db/transactions/getTxById.ts

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,15 @@
1-
import { prisma } from "../client";
1+
import { PrismaTransaction } from "../../schema/prisma";
2+
import { getPrismaWithPostgresTx } from "../client";
23
import { cleanTxs } from "./cleanTxs";
34

45
interface GetTxByIdParams {
6+
pgtx?: PrismaTransaction;
57
queueId: string;
68
}
79

8-
export const getTxById = async ({ queueId }: GetTxByIdParams) => {
10+
export const getTxById = async ({ pgtx, queueId }: GetTxByIdParams) => {
11+
const prisma = getPrismaWithPostgresTx(pgtx);
12+
913
const tx = await prisma.transactions.findUnique({
1014
where: {
1115
id: queueId,

src/db/transactions/getTxToRetry.ts

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,18 @@
11
import { Static } from "@sinclair/typebox";
22
import { transactionResponseSchema } from "../../../server/schemas/transaction";
3-
import { prisma } from "../client";
3+
import type { PrismaTransaction } from "../../schema/prisma";
4+
import { getPrismaWithPostgresTx } from "../client";
45
import { cleanTxs } from "./cleanTxs";
56

6-
export const getTxToRetry = async (): Promise<
7+
interface GetTxToRetryParams {
8+
pgtx?: PrismaTransaction;
9+
}
10+
11+
export const getTxToRetry = async ({ pgtx }: GetTxToRetryParams = {}): Promise<
712
Static<typeof transactionResponseSchema>[]
813
> => {
14+
const prisma = getPrismaWithPostgresTx(pgtx);
15+
916
// TODO: Why is this checking that transaction hash is not null
1017
const txs = await prisma.$queryRaw`
1118
SELECT

src/db/transactions/queueTx.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
import type { DeployTransaction, Transaction } from "@thirdweb-dev/sdk";
22
import { BigNumber } from "ethers";
33
import type { ContractExtension } from "../../schema/extension";
4-
import { prisma } from "../client";
4+
import { PrismaTransaction } from "../../schema/prisma";
5+
import { getPrismaWithPostgresTx } from "../client";
56

67
interface QueueTxParams {
8+
pgtx?: PrismaTransaction;
79
tx: Transaction<any> | DeployTransaction;
810
chainId: number;
911
extension: ContractExtension;
@@ -14,12 +16,15 @@ interface QueueTxParams {
1416

1517
// TODO: Simulation should be done before this function...
1618
export const queueTx = async ({
19+
pgtx,
1720
tx,
1821
chainId,
1922
extension,
2023
deployedContractAddress,
2124
deployedContractType,
2225
}: QueueTxParams) => {
26+
const prisma = getPrismaWithPostgresTx(pgtx);
27+
2328
// TODO: SDK should have a JSON.stringify() method.
2429
const fromAddress = (await tx.getSignerAddress()).toLowerCase();
2530
const toAddress = tx.getTarget().toLowerCase();

src/db/transactions/retryTx.ts

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,21 @@
1-
import { prisma } from "../client";
1+
import { PrismaTransaction } from "../../schema/prisma";
2+
import { getPrismaWithPostgresTx } from "../client";
23

34
interface RetryTxParams {
5+
pgtx?: PrismaTransaction;
46
queueId: string;
57
maxFeePerGas: string;
68
maxPriorityFeePerGas: string;
79
}
810

9-
// TODO: Switch all functions to object params
1011
export const retryTx = async ({
12+
pgtx,
1113
queueId,
1214
maxFeePerGas,
1315
maxPriorityFeePerGas,
1416
}: RetryTxParams) => {
15-
console.log("Retrying tx...");
17+
const prisma = getPrismaWithPostgresTx(pgtx);
18+
1619
await prisma.transactions.update({
1720
where: {
1821
id: queueId,

src/db/transactions/updateTx.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
import { Transactions } from "@prisma/client";
22
import { providers } from "ethers";
33
import { TransactionStatusEnum } from "../../../server/schemas/transaction";
4-
import { prisma } from "../client";
4+
import { PrismaTransaction } from "../../schema/prisma";
5+
import { getPrismaWithPostgresTx } from "../client";
56

67
interface UpdateTxParams {
8+
pgtx?: PrismaTransaction;
79
queueId: string;
810
status: TransactionStatusEnum;
911
// TODO: Receipt never actually gets used here... should get passed through.
@@ -12,11 +14,14 @@ interface UpdateTxParams {
1214
}
1315

1416
export const updateTx = async ({
17+
pgtx,
1518
queueId,
1619
status,
1720
res,
1821
txData,
1922
}: UpdateTxParams) => {
23+
const prisma = getPrismaWithPostgresTx(pgtx);
24+
2025
switch (status) {
2126
case TransactionStatusEnum.Submitted:
2227
await prisma.transactions.update({

src/db/wallets/cleanWallet.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { WalletDetails, WalletNonce } from "@prisma/client";
22

3-
type WalletNonceWithDetails = WalletNonce & {
3+
export type WalletNonceWithDetails = WalletNonce & {
44
walletDetails: WalletDetails;
55
};
66

src/db/wallets/createWalletDetails.ts

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
1+
import { PrismaTransaction } from "../../schema/prisma";
12
import type { WalletType } from "../../schema/wallet";
2-
import { prisma } from "../client";
3+
import { getPrismaWithPostgresTx } from "../client";
34

45
// TODO: Case on types by wallet type
56
interface CreateWalletDetailsParams {
7+
pgtx?: PrismaTransaction;
68
address: string;
79
type: WalletType;
810
awsKmsKeyId?: string;
@@ -14,9 +16,12 @@ interface CreateWalletDetailsParams {
1416
gcpKmsResourcePath?: string;
1517
}
1618

17-
export const createWalletDetails = async (
18-
walletDetails: CreateWalletDetailsParams,
19-
) => {
19+
export const createWalletDetails = async ({
20+
pgtx,
21+
...walletDetails
22+
}: CreateWalletDetailsParams) => {
23+
const prisma = getPrismaWithPostgresTx(pgtx);
24+
2025
return prisma.walletDetails.create({
2126
data: {
2227
...walletDetails,

src/db/wallets/createWalletNonce.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,22 @@
11
import { BigNumber } from "ethers";
22
import { getSDK } from "../../../core/sdk/sdk";
33
import { getWalletNonce } from "../../../core/services/blockchain";
4-
import { prisma } from "../client";
4+
import { PrismaTransaction } from "../../schema/prisma";
5+
import { getPrismaWithPostgresTx } from "../client";
56

67
interface CreateWalletNonceParams {
8+
pgtx?: PrismaTransaction;
79
chainId: number;
810
address: string;
911
}
1012

1113
export const createWalletNonce = async ({
14+
pgtx,
1215
chainId,
1316
address,
1417
}: CreateWalletNonceParams) => {
18+
const prisma = getPrismaWithPostgresTx(pgtx);
19+
1520
// TODO: chainId instead of chainName being passed around everywhere
1621
// or just pass SDK around
1722
const sdk = await getSDK(chainId.toString());

src/db/wallets/getAllWallets.ts

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,31 @@
1-
import { prisma } from "../client";
1+
import { PrismaTransaction } from "../../schema/prisma";
2+
import { getPrismaWithPostgresTx } from "../client";
23
import { cleanWallet } from "../wallets/cleanWallet";
34

4-
export interface GetAllWalletsParams {
5-
chainId: number;
5+
interface GetAllWalletsParams {
6+
pgtx?: PrismaTransaction;
67
}
78

89
// TODO: Add error logging handler from req.log to all queries
9-
export const getAllWallets = async () => {
10+
export const getAllWallets = async ({ pgtx }: GetAllWalletsParams = {}) => {
11+
const prisma = getPrismaWithPostgresTx(pgtx);
12+
1013
const wallets = await prisma.walletDetails.findMany();
1114
return wallets;
1215
};
1316

17+
interface GetAllWalletsByChainIdParams {
18+
pgtx: PrismaTransaction;
19+
chainId: number;
20+
}
21+
22+
// TODO: Change to chainId
1423
export const getAllWalletsByChain = async ({
24+
pgtx,
1525
chainId,
16-
}: GetAllWalletsParams) => {
26+
}: GetAllWalletsByChainIdParams) => {
27+
const prisma = getPrismaWithPostgresTx(pgtx);
28+
1729
const wallets = await prisma.walletNonce.findMany({
1830
where: {
1931
chainId,

0 commit comments

Comments
 (0)