Skip to content

Commit f41fb84

Browse files
authored
rpc-sts: add config options for stake-weighted qos (pyth-network#197)
* rpc-sts: plumb options for swqos config * rpc-sts: send to specific tpu peers when configured
1 parent 0f8f8cd commit f41fb84

File tree

3 files changed

+57
-9
lines changed

3 files changed

+57
-9
lines changed

send-transaction-service/src/send_transaction_service.rs

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,7 @@ pub struct Config {
115115
pub batch_send_rate_ms: u64,
116116
/// When the retry pool exceeds this max size, new transactions are dropped after their first broadcast attempt
117117
pub retry_pool_max_size: usize,
118+
pub tpu_peers: Option<Vec<SocketAddr>>,
118119
}
119120

120121
impl Default for Config {
@@ -127,6 +128,7 @@ impl Default for Config {
127128
batch_size: DEFAULT_TRANSACTION_BATCH_SIZE,
128129
batch_send_rate_ms: DEFAULT_BATCH_SEND_RATE_MS,
129130
retry_pool_max_size: MAX_TRANSACTION_RETRY_POOL_SIZE,
131+
tpu_peers: None,
130132
}
131133
}
132134
}
@@ -566,12 +568,18 @@ impl SendTransactionService {
566568
stats: &SendTransactionServiceStats,
567569
) {
568570
// Processing the transactions in batch
569-
let addresses = Self::get_tpu_addresses_with_slots(
571+
let mut addresses = config
572+
.tpu_peers
573+
.as_ref()
574+
.map(|addrs| addrs.iter().map(|a| (a, 0)).collect::<Vec<_>>())
575+
.unwrap_or_default();
576+
let leader_addresses = Self::get_tpu_addresses_with_slots(
570577
tpu_address,
571578
leader_info,
572579
config,
573580
connection_cache.protocol(),
574581
);
582+
addresses.extend(leader_addresses);
575583

576584
let wire_transactions = transactions
577585
.iter()
@@ -584,8 +592,8 @@ impl SendTransactionService {
584592
})
585593
.collect::<Vec<&[u8]>>();
586594

587-
for address in &addresses {
588-
Self::send_transactions(address.0, &wire_transactions, connection_cache, stats);
595+
for (address, _) in &addresses {
596+
Self::send_transactions(address, &wire_transactions, connection_cache, stats);
589597
}
590598
}
591599

@@ -702,14 +710,20 @@ impl SendTransactionService {
702710

703711
let iter = wire_transactions.chunks(config.batch_size);
704712
for chunk in iter {
713+
let mut addresses = config
714+
.tpu_peers
715+
.as_ref()
716+
.map(|addrs| addrs.iter().collect::<Vec<_>>())
717+
.unwrap_or_default();
705718
let mut leader_info_provider = leader_info_provider.lock().unwrap();
706719
let leader_info = leader_info_provider.get_leader_info();
707-
let addresses = Self::get_tpu_addresses(
720+
let leader_addresses = Self::get_tpu_addresses(
708721
tpu_address,
709722
leader_info,
710723
config,
711724
connection_cache.protocol(),
712725
);
726+
addresses.extend(leader_addresses);
713727

714728
for address in &addresses {
715729
Self::send_transactions(address, chunk, connection_cache, stats);

validator/src/cli.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1163,6 +1163,22 @@ pub fn app<'a>(version: &'a str, default_args: &'a DefaultArgs) -> App<'a, 'a> {
11631163
.default_value(&default_args.rpc_send_transaction_retry_pool_max_size)
11641164
.help("The maximum size of transactions retry pool."),
11651165
)
1166+
.arg(
1167+
Arg::with_name("rpc_send_transaction_tpu_peer")
1168+
.long("rpc-send-transaction-tpu-peer")
1169+
.takes_value(true)
1170+
.number_of_values(1)
1171+
.multiple(true)
1172+
.value_name("HOST:PORT")
1173+
.validator(solana_net_utils::is_host_port)
1174+
.help("Peer(s) to broadcast transactions to instead of the current leader")
1175+
)
1176+
.arg(
1177+
Arg::with_name("rpc_send_transaction_also_leader")
1178+
.long("rpc-send-transaction-also-leader")
1179+
.requires("rpc_send_transaction_tpu_peer")
1180+
.help("With `--rpc-send-transaction-tpu-peer HOST:PORT`, also send to the current leader")
1181+
)
11661182
.arg(
11671183
Arg::with_name("rpc_scan_and_fix_roots")
11681184
.long("rpc-scan-and-fix-roots")

validator/src/main.rs

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1308,6 +1308,27 @@ pub fn main() {
13081308
);
13091309
exit(1);
13101310
}
1311+
let rpc_send_transaction_tpu_peers = matches
1312+
.values_of("rpc_send_transaction_tpu_peer")
1313+
.map(|values| {
1314+
values
1315+
.map(solana_net_utils::parse_host_port)
1316+
.collect::<Result<Vec<SocketAddr>, String>>()
1317+
})
1318+
.transpose()
1319+
.unwrap_or_else(|e| {
1320+
eprintln!("failed to parse rpc send-transaction-service tpu peer address: {e}");
1321+
exit(1);
1322+
});
1323+
let rpc_send_transaction_also_leader = matches.is_present("rpc_send_transaction_also_leader");
1324+
let leader_forward_count =
1325+
if rpc_send_transaction_tpu_peers.is_some() && !rpc_send_transaction_also_leader {
1326+
// rpc-sts is configured to send only to specific tpu peers. disable leader forwards
1327+
0
1328+
} else {
1329+
value_t_or_exit!(matches, "rpc_send_transaction_leader_forward_count", u64)
1330+
};
1331+
13111332
let full_api = matches.is_present("full_rpc_api");
13121333

13131334
let mut validator_config = ValidatorConfig {
@@ -1399,11 +1420,7 @@ pub fn main() {
13991420
contact_debug_interval,
14001421
send_transaction_service_config: send_transaction_service::Config {
14011422
retry_rate_ms: rpc_send_retry_rate_ms,
1402-
leader_forward_count: value_t_or_exit!(
1403-
matches,
1404-
"rpc_send_transaction_leader_forward_count",
1405-
u64
1406-
),
1423+
leader_forward_count,
14071424
default_max_retries: value_t!(
14081425
matches,
14091426
"rpc_send_transaction_default_max_retries",
@@ -1422,6 +1439,7 @@ pub fn main() {
14221439
"rpc_send_transaction_retry_pool_max_size",
14231440
usize
14241441
),
1442+
tpu_peers: rpc_send_transaction_tpu_peers,
14251443
},
14261444
no_poh_speed_test: matches.is_present("no_poh_speed_test"),
14271445
no_os_memory_stats_reporting: matches.is_present("no_os_memory_stats_reporting"),

0 commit comments

Comments
 (0)