Skip to content

Commit e8d1f79

Browse files
feat: chunk broadcasting support (#2533)
* feat: chunk broadcasting support * chore: prettier lint * fix: non-chunking support * chore: bump version --------- Co-authored-by: Ali Behjati <bahjatia@gmail.com>
1 parent d66907f commit e8d1f79

File tree

5 files changed

+398
-340
lines changed

5 files changed

+398
-340
lines changed

apps/price_pusher/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,7 @@ pnpm run start injective --grpc-endpoint https://grpc-endpoint.com \
114114
--network testnet \
115115
[--gas-price 160000000] \
116116
[--gas-multiplier 1.1] \
117+
[--priceIds-process-chunk-size 100] \
117118
[--pushing-frequency 10] \
118119
[--polling-frequency 5]
119120

apps/price_pusher/package.json

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@pythnetwork/price-pusher",
3-
"version": "9.1.5",
3+
"version": "9.2.0",
44
"description": "Pyth Price Pusher",
55
"homepage": "https://pyth.network",
66
"main": "lib/index.js",
@@ -29,7 +29,7 @@
2929
"dev": "ts-node src/index.ts",
3030
"prepublishOnly": "pnpm run build && pnpm run test:lint",
3131
"preversion": "pnpm run test:lint",
32-
"version": "pnpm run format && git add -A src"
32+
"version": "pnpm run test:format && pnpm run test:lint && git add -A src"
3333
},
3434
"keywords": [
3535
"pyth",
@@ -60,8 +60,9 @@
6060
},
6161
"dependencies": {
6262
"@coral-xyz/anchor": "^0.30.0",
63-
"@injectivelabs/networks": "^1.14.6",
64-
"@injectivelabs/sdk-ts": "1.10.72",
63+
"@injectivelabs/networks": "1.14.47",
64+
"@injectivelabs/utils": "^1.14.47",
65+
"@injectivelabs/sdk-ts": "1.14.49",
6566
"@mysten/sui": "^1.3.0",
6667
"@pythnetwork/hermes-client": "^1.3.1",
6768
"@pythnetwork/price-service-sdk": "workspace:^",

apps/price_pusher/src/injective/command.ts

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,19 @@ export default {
2727
required: true,
2828
} as Options,
2929
"gas-price": {
30-
description: "Gas price to be used for each transasction",
30+
description: "Gas price to be used for each transaction",
3131
type: "number",
3232
} as Options,
3333
"gas-multiplier": {
34-
description: "Gas multiplier to be used for each transasction",
34+
description: "Gas multiplier to be used for each transaction",
3535
type: "number",
3636
} as Options,
37+
"price-ids-process-chunk-size": {
38+
description:
39+
"Set in case we wanna split price feeds updates into chunks to have smaller transactions. Set to -1 to disable chunking.",
40+
type: "number",
41+
required: false,
42+
} as Options,
3743
...options.priceConfigFile,
3844
...options.priceServiceEndpoint,
3945
...options.mnemonicFile,
@@ -46,18 +52,19 @@ export default {
4652
handler: async function (argv: any) {
4753
// FIXME: type checks for this
4854
const {
55+
network,
56+
logLevel,
4957
gasPrice,
50-
gasMultiplier,
5158
grpcEndpoint,
52-
priceConfigFile,
53-
priceServiceEndpoint,
5459
mnemonicFile,
55-
pythContractAddress,
56-
pushingFrequency,
60+
gasMultiplier,
61+
priceConfigFile,
5762
pollingFrequency,
58-
network,
59-
logLevel,
63+
pushingFrequency,
6064
controllerLogLevel,
65+
pythContractAddress,
66+
priceServiceEndpoint,
67+
priceIdsProcessChunkSize,
6168
} = argv;
6269

6370
const logger = pino({ level: logLevel });
@@ -111,6 +118,7 @@ export default {
111118
chainId: getNetworkInfo(network).chainId,
112119
gasPrice,
113120
gasMultiplier,
121+
priceIdsProcessChunkSize,
114122
},
115123
);
116124

apps/price_pusher/src/injective/injective.ts

Lines changed: 49 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -7,20 +7,22 @@ import {
77
} from "../interface";
88
import { DurationInSeconds } from "../utils";
99
import {
10+
Msgs,
11+
Account,
12+
TxResponse,
13+
PrivateKey,
14+
TxGrpcApi,
1015
ChainGrpcAuthApi,
1116
ChainGrpcWasmApi,
1217
MsgExecuteContract,
13-
Msgs,
14-
PrivateKey,
15-
TxGrpcClient,
16-
TxResponse,
1718
createTransactionFromMsg,
1819
} from "@injectivelabs/sdk-ts";
20+
import { splitArrayToChunks } from "@injectivelabs/utils";
1921
import { Logger } from "pino";
20-
import { Account } from "@injectivelabs/sdk-ts/dist/cjs/client/chain/types/auth";
2122

2223
const DEFAULT_GAS_PRICE = 160000000;
2324
const DEFAULT_GAS_MULTIPLIER = 1.05;
25+
const DEFAULT_PRICE_IDS_PROCESS_CHUNK_SIZE = -1;
2426
const INJECTIVE_TESTNET_CHAIN_ID = "injective-888";
2527

2628
type PriceQueryResponse = {
@@ -90,6 +92,7 @@ type InjectiveConfig = {
9092
chainId: string;
9193
gasMultiplier: number;
9294
gasPrice: number;
95+
priceIdsProcessChunkSize: number;
9396
};
9497
export class InjectivePricePusher implements IPricePusher {
9598
private wallet: PrivateKey;
@@ -110,6 +113,9 @@ export class InjectivePricePusher implements IPricePusher {
110113
chainId: chainConfig?.chainId ?? INJECTIVE_TESTNET_CHAIN_ID,
111114
gasMultiplier: chainConfig?.gasMultiplier ?? DEFAULT_GAS_MULTIPLIER,
112115
gasPrice: chainConfig?.gasPrice ?? DEFAULT_GAS_PRICE,
116+
priceIdsProcessChunkSize:
117+
chainConfig?.priceIdsProcessChunkSize ??
118+
DEFAULT_PRICE_IDS_PROCESS_CHUNK_SIZE,
113119
};
114120
}
115121

@@ -119,6 +125,7 @@ export class InjectivePricePusher implements IPricePusher {
119125

120126
private async signAndBroadcastMsg(msg: Msgs): Promise<TxResponse> {
121127
const chainGrpcAuthApi = new ChainGrpcAuthApi(this.grpcEndpoint);
128+
122129
// Fetch the latest account details only if it's not stored.
123130
this.account ??= await chainGrpcAuthApi.fetchAccount(
124131
this.injectiveAddress(),
@@ -132,7 +139,7 @@ export class InjectivePricePusher implements IPricePusher {
132139
pubKey: this.wallet.toPublicKey().toBase64(),
133140
});
134141

135-
const txService = new TxGrpcClient(this.grpcEndpoint);
142+
const txService = new TxGrpcApi(this.grpcEndpoint);
136143
// simulation
137144
try {
138145
const {
@@ -207,12 +214,33 @@ export class InjectivePricePusher implements IPricePusher {
207214
if (priceIds.length !== pubTimesToPush.length)
208215
throw new Error("Invalid arguments");
209216

217+
const priceIdChunks =
218+
this.chainConfig.priceIdsProcessChunkSize === -1
219+
? [priceIds]
220+
: splitArrayToChunks({
221+
array: priceIds,
222+
chunkSize: this.chainConfig.priceIdsProcessChunkSize,
223+
});
224+
225+
for (const [chunkIndex, priceIdChunk] of priceIdChunks.entries()) {
226+
await this.updatePriceFeedChunk(priceIdChunk, chunkIndex);
227+
}
228+
}
229+
230+
private async updatePriceFeedChunk(
231+
priceIds: string[],
232+
chunkIndex: number,
233+
): Promise<void> {
210234
let priceFeedUpdateObject;
235+
211236
try {
212237
// get the latest VAAs for updatePriceFeed and then push them
213238
priceFeedUpdateObject = await this.getPriceFeedUpdateObject(priceIds);
214239
} catch (err) {
215-
this.logger.error(err, "Error fetching the latest vaas to push");
240+
this.logger.error(
241+
err,
242+
`Error fetching the latest vaas to push for chunk ${chunkIndex}`,
243+
);
216244
return;
217245
}
218246

@@ -233,7 +261,10 @@ export class InjectivePricePusher implements IPricePusher {
233261
const json = Buffer.from(data).toString();
234262
updateFeeQueryResponse = JSON.parse(json);
235263
} catch (err) {
236-
this.logger.error(err, "Error fetching update fee");
264+
this.logger.error(
265+
err,
266+
`Error fetching update fee for chunk ${chunkIndex}`,
267+
);
237268
// Throwing an error because it is likely an RPC issue
238269
throw err;
239270
}
@@ -247,21 +278,27 @@ export class InjectivePricePusher implements IPricePusher {
247278
});
248279

249280
const rs = await this.signAndBroadcastMsg(executeMsg);
250-
this.logger.info({ hash: rs.txHash }, "Succesfully broadcasted txHash");
281+
this.logger.info(
282+
{ hash: rs.txHash },
283+
`Successfully broadcasted txHash for chunk ${chunkIndex}`,
284+
);
251285
} catch (err: any) {
252286
if (err.message.match(/account inj[a-zA-Z0-9]+ not found/) !== null) {
253-
this.logger.error(err, "Account not found");
287+
this.logger.error(err, `Account not found for chunk ${chunkIndex}`);
254288
throw new Error("Please check the mnemonic");
255289
}
256290

257291
if (
258292
err.message.match(/insufficient/) !== null &&
259293
err.message.match(/funds/) !== null
260294
) {
261-
this.logger.error(err, "Insufficient funds");
295+
this.logger.error(err, `Insufficient funds for chunk ${chunkIndex}`);
262296
throw new Error("Insufficient funds");
263297
}
264-
this.logger.error(err, "Error executing messages");
298+
this.logger.error(
299+
err,
300+
`Error executing messages for chunk ${chunkIndex}`,
301+
);
265302
}
266303
}
267304
}

0 commit comments

Comments
 (0)