Skip to content

Commit 4a67cd4

Browse files
authored
Allow configuration of replay thread pools from CLI (pyth-network#236)
Bubble up the constants to the CLI that control the sizes of the following two thread pools: - The thread pool used to replay multiple forks in parallel - The thread pool used to execute transactions in parallel
1 parent 09ae587 commit 4a67cd4

File tree

7 files changed

+180
-25
lines changed

7 files changed

+180
-25
lines changed

core/src/replay_stage.rs

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,6 @@ use {
5151
solana_measure::measure::Measure,
5252
solana_poh::poh_recorder::{PohLeaderStatus, PohRecorder, GRACE_TICKS_FACTOR, MAX_GRACE_SLOTS},
5353
solana_program_runtime::timings::ExecuteTimings,
54-
solana_rayon_threadlimit::get_max_thread_count,
5554
solana_rpc::{
5655
optimistically_confirmed_bank_tracker::{BankNotification, BankNotificationSenderConfig},
5756
rpc_subscriptions::RpcSubscriptions,
@@ -80,6 +79,7 @@ use {
8079
solana_vote_program::vote_state::VoteTransaction,
8180
std::{
8281
collections::{HashMap, HashSet},
82+
num::NonZeroUsize,
8383
result,
8484
sync::{
8585
atomic::{AtomicBool, AtomicU64, Ordering},
@@ -95,11 +95,9 @@ pub const SUPERMINORITY_THRESHOLD: f64 = 1f64 / 3f64;
9595
pub const MAX_UNCONFIRMED_SLOTS: usize = 5;
9696
pub const DUPLICATE_LIVENESS_THRESHOLD: f64 = 0.1;
9797
pub const DUPLICATE_THRESHOLD: f64 = 1.0 - SWITCH_FORK_THRESHOLD - DUPLICATE_LIVENESS_THRESHOLD;
98+
9899
const MAX_VOTE_SIGNATURES: usize = 200;
99100
const MAX_VOTE_REFRESH_INTERVAL_MILLIS: usize = 5000;
100-
// Expect this number to be small enough to minimize thread pool overhead while large enough
101-
// to be able to replay all active forks at the same time in most cases.
102-
const MAX_CONCURRENT_FORKS_TO_REPLAY: usize = 4;
103101
const MAX_REPAIR_RETRY_LOOP_ATTEMPTS: usize = 10;
104102

105103
#[derive(PartialEq, Eq, Debug)]
@@ -291,7 +289,8 @@ pub struct ReplayStageConfig {
291289
// Stops voting until this slot has been reached. Should be used to avoid
292290
// duplicate voting which can lead to slashing.
293291
pub wait_to_vote_slot: Option<Slot>,
294-
pub replay_slots_concurrently: bool,
292+
pub replay_forks_threads: NonZeroUsize,
293+
pub replay_transactions_threads: NonZeroUsize,
295294
}
296295

297296
/// Timing information for the ReplayStage main processing loop
@@ -574,7 +573,8 @@ impl ReplayStage {
574573
ancestor_hashes_replay_update_sender,
575574
tower_storage,
576575
wait_to_vote_slot,
577-
replay_slots_concurrently,
576+
replay_forks_threads,
577+
replay_transactions_threads,
578578
} = config;
579579

580580
trace!("replay stage");
@@ -654,19 +654,19 @@ impl ReplayStage {
654654
)
655655
};
656656
// Thread pool to (maybe) replay multiple threads in parallel
657-
let replay_mode = if replay_slots_concurrently {
657+
let replay_mode = if replay_forks_threads.get() == 1 {
658+
ForkReplayMode::Serial
659+
} else {
658660
let pool = rayon::ThreadPoolBuilder::new()
659-
.num_threads(MAX_CONCURRENT_FORKS_TO_REPLAY)
661+
.num_threads(replay_forks_threads.get())
660662
.thread_name(|i| format!("solReplayFork{i:02}"))
661663
.build()
662664
.expect("new rayon threadpool");
663665
ForkReplayMode::Parallel(pool)
664-
} else {
665-
ForkReplayMode::Serial
666666
};
667667
// Thread pool to replay multiple transactions within one block in parallel
668668
let replay_tx_thread_pool = rayon::ThreadPoolBuilder::new()
669-
.num_threads(get_max_thread_count())
669+
.num_threads(replay_transactions_threads.get())
670670
.thread_name(|i| format!("solReplayTx{i:02}"))
671671
.build()
672672
.expect("new rayon threadpool");

core/src/tvu.rs

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ use {
5353
std::{
5454
collections::HashSet,
5555
net::{SocketAddr, UdpSocket},
56+
num::NonZeroUsize,
5657
sync::{atomic::AtomicBool, Arc, RwLock},
5758
thread::{self, JoinHandle},
5859
},
@@ -81,7 +82,6 @@ pub struct TvuSockets {
8182
pub ancestor_hashes_requests: UdpSocket,
8283
}
8384

84-
#[derive(Default)]
8585
pub struct TvuConfig {
8686
pub max_ledger_shreds: Option<u64>,
8787
pub shred_version: u16,
@@ -90,7 +90,22 @@ pub struct TvuConfig {
9090
// Validators which should be given priority when serving repairs
9191
pub repair_whitelist: Arc<RwLock<HashSet<Pubkey>>>,
9292
pub wait_for_vote_to_start_leader: bool,
93-
pub replay_slots_concurrently: bool,
93+
pub replay_forks_threads: NonZeroUsize,
94+
pub replay_transactions_threads: NonZeroUsize,
95+
}
96+
97+
impl Default for TvuConfig {
98+
fn default() -> Self {
99+
Self {
100+
max_ledger_shreds: None,
101+
shred_version: 0,
102+
repair_validators: None,
103+
repair_whitelist: Arc::new(RwLock::new(HashSet::default())),
104+
wait_for_vote_to_start_leader: false,
105+
replay_forks_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
106+
replay_transactions_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
107+
}
108+
}
94109
}
95110

96111
impl Tvu {
@@ -265,7 +280,8 @@ impl Tvu {
265280
ancestor_hashes_replay_update_sender,
266281
tower_storage: tower_storage.clone(),
267282
wait_to_vote_slot,
268-
replay_slots_concurrently: tvu_config.replay_slots_concurrently,
283+
replay_forks_threads: tvu_config.replay_forks_threads,
284+
replay_transactions_threads: tvu_config.replay_transactions_threads,
269285
};
270286

271287
let (voting_sender, voting_receiver) = unbounded();

core/src/validator.rs

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ use {
7474
poh_service::{self, PohService},
7575
},
7676
solana_program_runtime::runtime_config::RuntimeConfig,
77+
solana_rayon_threadlimit::get_max_thread_count,
7778
solana_rpc::{
7879
max_slots::MaxSlots,
7980
optimistically_confirmed_bank_tracker::{
@@ -123,6 +124,7 @@ use {
123124
std::{
124125
collections::{HashMap, HashSet},
125126
net::SocketAddr,
127+
num::NonZeroUsize,
126128
path::{Path, PathBuf},
127129
sync::{
128130
atomic::{AtomicBool, AtomicU64, Ordering},
@@ -260,14 +262,15 @@ pub struct ValidatorConfig {
260262
pub wait_to_vote_slot: Option<Slot>,
261263
pub ledger_column_options: LedgerColumnOptions,
262264
pub runtime_config: RuntimeConfig,
263-
pub replay_slots_concurrently: bool,
264265
pub banking_trace_dir_byte_limit: banking_trace::DirByteLimit,
265266
pub block_verification_method: BlockVerificationMethod,
266267
pub block_production_method: BlockProductionMethod,
267268
pub generator_config: Option<GeneratorConfig>,
268269
pub use_snapshot_archives_at_startup: UseSnapshotArchivesAtStartup,
269270
pub wen_restart_proto_path: Option<PathBuf>,
270271
pub unified_scheduler_handler_threads: Option<usize>,
272+
pub replay_forks_threads: NonZeroUsize,
273+
pub replay_transactions_threads: NonZeroUsize,
271274
}
272275

273276
impl Default for ValidatorConfig {
@@ -328,14 +331,15 @@ impl Default for ValidatorConfig {
328331
wait_to_vote_slot: None,
329332
ledger_column_options: LedgerColumnOptions::default(),
330333
runtime_config: RuntimeConfig::default(),
331-
replay_slots_concurrently: false,
332334
banking_trace_dir_byte_limit: 0,
333335
block_verification_method: BlockVerificationMethod::default(),
334336
block_production_method: BlockProductionMethod::default(),
335337
generator_config: None,
336338
use_snapshot_archives_at_startup: UseSnapshotArchivesAtStartup::default(),
337339
wen_restart_proto_path: None,
338340
unified_scheduler_handler_threads: None,
341+
replay_forks_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
342+
replay_transactions_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
339343
}
340344
}
341345
}
@@ -346,6 +350,9 @@ impl ValidatorConfig {
346350
enforce_ulimit_nofile: false,
347351
rpc_config: JsonRpcConfig::default_for_test(),
348352
block_production_method: BlockProductionMethod::ThreadLocalMultiIterator,
353+
replay_forks_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
354+
replay_transactions_threads: NonZeroUsize::new(get_max_thread_count())
355+
.expect("thread count is non-zero"),
349356
..Self::default()
350357
}
351358
}
@@ -1305,7 +1312,8 @@ impl Validator {
13051312
repair_validators: config.repair_validators.clone(),
13061313
repair_whitelist: config.repair_whitelist.clone(),
13071314
wait_for_vote_to_start_leader,
1308-
replay_slots_concurrently: config.replay_slots_concurrently,
1315+
replay_forks_threads: config.replay_forks_threads,
1316+
replay_transactions_threads: config.replay_transactions_threads,
13091317
},
13101318
&max_slots,
13111319
block_metadata_notifier,

local-cluster/src/validator_configs.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,14 +61,15 @@ pub fn safe_clone_config(config: &ValidatorConfig) -> ValidatorConfig {
6161
wait_to_vote_slot: config.wait_to_vote_slot,
6262
ledger_column_options: config.ledger_column_options.clone(),
6363
runtime_config: config.runtime_config.clone(),
64-
replay_slots_concurrently: config.replay_slots_concurrently,
6564
banking_trace_dir_byte_limit: config.banking_trace_dir_byte_limit,
6665
block_verification_method: config.block_verification_method.clone(),
6766
block_production_method: config.block_production_method.clone(),
6867
generator_config: config.generator_config.clone(),
6968
use_snapshot_archives_at_startup: config.use_snapshot_archives_at_startup,
7069
wen_restart_proto_path: config.wen_restart_proto_path.clone(),
7170
unified_scheduler_handler_threads: config.unified_scheduler_handler_threads,
71+
replay_forks_threads: config.replay_forks_threads,
72+
replay_transactions_threads: config.replay_transactions_threads,
7273
}
7374
}
7475

validator/src/cli.rs

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,9 @@ use {
5252
std::{path::PathBuf, str::FromStr},
5353
};
5454

55+
pub mod thread_args;
56+
use thread_args::{thread_args, DefaultThreadArgs};
57+
5558
const EXCLUDE_KEY: &str = "account-index-exclude-key";
5659
const INCLUDE_KEY: &str = "account-index-include-key";
5760
// The default minimal snapshot download speed (bytes/second)
@@ -1466,11 +1469,6 @@ pub fn app<'a>(version: &'a str, default_args: &'a DefaultArgs) -> App<'a, 'a> {
14661469
.value_name("BYTES")
14671470
.help("Maximum number of bytes written to the program log before truncation"),
14681471
)
1469-
.arg(
1470-
Arg::with_name("replay_slots_concurrently")
1471-
.long("replay-slots-concurrently")
1472-
.help("Allow concurrent replay of slots on different forks"),
1473-
)
14741472
.arg(
14751473
Arg::with_name("banking_trace_dir_byte_limit")
14761474
// expose friendly alternative name to cli than internal
@@ -1555,6 +1553,7 @@ pub fn app<'a>(version: &'a str, default_args: &'a DefaultArgs) -> App<'a, 'a> {
15551553
",
15561554
),
15571555
)
1556+
.args(&thread_args(&default_args.thread_args))
15581557
.args(&get_deprecated_arguments())
15591558
.after_help("The default subcommand is run")
15601559
.subcommand(
@@ -2073,6 +2072,13 @@ fn deprecated_arguments() -> Vec<DeprecatedArg> {
20732072
.long("no-rocksdb-compaction")
20742073
.takes_value(false)
20752074
.help("Disable manual compaction of the ledger database"));
2075+
add_arg!(
2076+
Arg::with_name("replay_slots_concurrently")
2077+
.long("replay-slots-concurrently")
2078+
.help("Allow concurrent replay of slots on different forks")
2079+
.conflicts_with("replay_forks_threads"),
2080+
replaced_by: "replay_forks_threads",
2081+
usage_warning: "Equivalent behavior to this flag would be --replay-forks-threads 4");
20762082
add_arg!(Arg::with_name("rocksdb_compaction_interval")
20772083
.long("rocksdb-compaction-interval-slots")
20782084
.value_name("ROCKSDB_COMPACTION_INTERVAL_SLOTS")
@@ -2195,6 +2201,8 @@ pub struct DefaultArgs {
21952201
pub banking_trace_dir_byte_limit: String,
21962202

21972203
pub wen_restart_path: String,
2204+
2205+
pub thread_args: DefaultThreadArgs,
21982206
}
21992207

22002208
impl DefaultArgs {
@@ -2277,6 +2285,7 @@ impl DefaultArgs {
22772285
wait_for_restart_window_max_delinquent_stake: "5".to_string(),
22782286
banking_trace_dir_byte_limit: BANKING_TRACE_DIR_DEFAULT_BYTE_LIMIT.to_string(),
22792287
wen_restart_path: "wen_restart_progress.proto".to_string(),
2288+
thread_args: DefaultThreadArgs::default(),
22802289
}
22812290
}
22822291
}

validator/src/cli/thread_args.rs

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
//! Arguments for controlling the number of threads allocated for various tasks
2+
3+
use {
4+
clap::{value_t_or_exit, Arg, ArgMatches},
5+
solana_clap_utils::{hidden_unless_forced, input_validators::is_within_range},
6+
solana_rayon_threadlimit::get_max_thread_count,
7+
std::{num::NonZeroUsize, ops::RangeInclusive},
8+
};
9+
10+
// Need this struct to provide &str whose lifetime matches that of the CLAP Arg's
11+
pub struct DefaultThreadArgs {
12+
pub replay_forks_threads: String,
13+
pub replay_transactions_threads: String,
14+
}
15+
16+
impl Default for DefaultThreadArgs {
17+
fn default() -> Self {
18+
Self {
19+
replay_forks_threads: ReplayForksThreadsArg::default().to_string(),
20+
replay_transactions_threads: ReplayTransactionsThreadsArg::default().to_string(),
21+
}
22+
}
23+
}
24+
25+
pub fn thread_args<'a>(defaults: &DefaultThreadArgs) -> Vec<Arg<'_, 'a>> {
26+
vec![
27+
new_thread_arg::<ReplayForksThreadsArg>(&defaults.replay_forks_threads),
28+
new_thread_arg::<ReplayTransactionsThreadsArg>(&defaults.replay_transactions_threads),
29+
]
30+
}
31+
32+
fn new_thread_arg<'a, T: ThreadArg>(default: &str) -> Arg<'_, 'a> {
33+
Arg::with_name(T::NAME)
34+
.long(T::LONG_NAME)
35+
.takes_value(true)
36+
.value_name("NUMBER")
37+
.default_value(default)
38+
.validator(|num| is_within_range(num, T::range()))
39+
.hidden(hidden_unless_forced())
40+
.help(T::HELP)
41+
}
42+
43+
pub struct NumThreadConfig {
44+
pub replay_forks_threads: NonZeroUsize,
45+
pub replay_transactions_threads: NonZeroUsize,
46+
}
47+
48+
pub fn parse_num_threads_args(matches: &ArgMatches) -> NumThreadConfig {
49+
NumThreadConfig {
50+
replay_forks_threads: if matches.is_present("replay_slots_concurrently") {
51+
NonZeroUsize::new(4).expect("4 is non-zero")
52+
} else {
53+
value_t_or_exit!(matches, ReplayForksThreadsArg::NAME, NonZeroUsize)
54+
},
55+
replay_transactions_threads: value_t_or_exit!(
56+
matches,
57+
ReplayTransactionsThreadsArg::NAME,
58+
NonZeroUsize
59+
),
60+
}
61+
}
62+
63+
/// Configuration for CLAP arguments that control the number of threads for various functions
64+
trait ThreadArg {
65+
/// The argument's name
66+
const NAME: &'static str;
67+
/// The argument's long name
68+
const LONG_NAME: &'static str;
69+
/// The argument's help message
70+
const HELP: &'static str;
71+
72+
/// The default number of threads
73+
fn default() -> usize;
74+
/// The minimum allowed number of threads (inclusive)
75+
fn min() -> usize {
76+
1
77+
}
78+
/// The maximum allowed number of threads (inclusive)
79+
fn max() -> usize {
80+
// By default, no thread pool should scale over the number of the machine's threads
81+
get_max_thread_count()
82+
}
83+
/// The range of allowed number of threads (inclusive on both ends)
84+
fn range() -> RangeInclusive<usize> {
85+
RangeInclusive::new(Self::min(), Self::max())
86+
}
87+
}
88+
89+
struct ReplayForksThreadsArg;
90+
impl ThreadArg for ReplayForksThreadsArg {
91+
const NAME: &'static str = "replay_forks_threads";
92+
const LONG_NAME: &'static str = "replay-forks-threads";
93+
const HELP: &'static str = "Number of threads to use for replay of blocks on different forks";
94+
95+
fn default() -> usize {
96+
// Default to single threaded fork execution
97+
1
98+
}
99+
fn max() -> usize {
100+
// Choose a value that is small enough to limit the overhead of having a large thread pool
101+
// while also being large enough to allow replay of all active forks in most scenarios
102+
4
103+
}
104+
}
105+
106+
struct ReplayTransactionsThreadsArg;
107+
impl ThreadArg for ReplayTransactionsThreadsArg {
108+
const NAME: &'static str = "replay_transactions_threads";
109+
const LONG_NAME: &'static str = "replay-transactions-threads";
110+
const HELP: &'static str = "Number of threads to use for transaction replay";
111+
112+
fn default() -> usize {
113+
get_max_thread_count()
114+
}
115+
}

0 commit comments

Comments
 (0)