Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions core/src/tpu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,10 @@ use {
vote_sender_types::{ReplayVoteReceiver, ReplayVoteSender},
},
solana_streamer::{
quic::{spawn_server_with_cancel, QuicServerParams, SpawnServerResult},
quic::{
spawn_server_with_cancel, spawn_simple_qos_server_with_cancel, QuicServerParams,
SimpleQosQuicServerParams, SpawnServerResult,
},
streamer::StakedNodes,
},
solana_turbine::{
Expand Down Expand Up @@ -98,6 +101,9 @@ impl SigVerifier {
}
}

// Conservatively allow 20 TPS per validator.
pub const MAX_VOTES_PER_SECOND: u64 = 20;

pub struct Tpu {
fetch_stage: FetchStage,
sig_verifier: SigVerifier,
Expand Down Expand Up @@ -151,7 +157,7 @@ impl Tpu {
tpu_enable_udp: bool,
tpu_quic_server_config: QuicServerParams,
tpu_fwd_quic_server_config: QuicServerParams,
vote_quic_server_config: QuicServerParams,
vote_quic_server_config: SimpleQosQuicServerParams,
prioritization_fee_cache: &Arc<PrioritizationFeeCache>,
block_production_method: BlockProductionMethod,
block_production_num_workers: NonZeroUsize,
Expand Down Expand Up @@ -212,7 +218,7 @@ impl Tpu {
endpoints: _,
thread: tpu_vote_quic_t,
key_updater: vote_streamer_key_updater,
} = spawn_server_with_cancel(
} = spawn_simple_qos_server_with_cancel(
"solQuicTVo",
"quic_streamer_tpu_vote",
tpu_vote_quic_sockets,
Expand Down
18 changes: 15 additions & 3 deletions core/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,11 @@ use {
solana_send_transaction_service::send_transaction_service::Config as SendTransactionServiceConfig,
solana_shred_version::compute_shred_version,
solana_signer::Signer,
solana_streamer::{quic::QuicServerParams, socket::SocketAddrSpace, streamer::StakedNodes},
solana_streamer::{
quic::{QuicServerParams, SimpleQosQuicServerParams},
socket::SocketAddrSpace,
streamer::StakedNodes,
},
solana_time_utils::timestamp,
solana_tpu_client::tpu_client::{
DEFAULT_TPU_CONNECTION_POOL_SIZE, DEFAULT_TPU_USE_QUIC, DEFAULT_VOTE_USE_QUIC,
Expand Down Expand Up @@ -561,7 +565,7 @@ pub struct ValidatorTpuConfig {
/// QUIC server config for TPU forward
pub tpu_fwd_quic_server_config: QuicServerParams,
/// QUIC server config for Vote
pub vote_quic_server_config: QuicServerParams,
pub vote_quic_server_config: SimpleQosQuicServerParams,
}

impl ValidatorTpuConfig {
Expand All @@ -582,7 +586,15 @@ impl ValidatorTpuConfig {
};

// vote and tpu_fwd share the same characteristics -- disallow non-staked connections:
let vote_quic_server_config = tpu_fwd_quic_server_config.clone();
let vote_quic_server_config = SimpleQosQuicServerParams {
quic_server_params: QuicServerParams {
max_connections_per_ipaddr_per_min: 32,
max_unstaked_connections: 0,
coalesce_channel_size: 100_000, // smaller channel size for faster test
..Default::default()
},
..Default::default()
};

ValidatorTpuConfig {
use_quic: DEFAULT_TPU_USE_QUIC,
Expand Down
3 changes: 3 additions & 0 deletions streamer/src/nonblocking/mod.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
pub mod connection_rate_limiter;
pub mod qos;
pub mod quic;
#[cfg(feature = "dev-context-only-utils")]
pub mod recvmmsg;
pub mod sendmmsg;
pub mod simple_qos;
mod stream_throttle;
pub mod swqos;
#[cfg(feature = "dev-context-only-utils")]
pub mod testing_utilities;
64 changes: 64 additions & 0 deletions streamer/src/nonblocking/qos.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
use {
crate::nonblocking::{
quic::{ClientConnectionTracker, ConnectionPeerType},
stream_throttle::ConnectionStreamCounter,
},
quinn::Connection,
std::{sync::Arc, time::Duration},
tokio_util::sync::CancellationToken,
};

/// A trait to provide context about a connection, such as peer type,
/// remote pubkey. This is opaque to the framework and is provided by
/// the concrete implementation of QosController.
pub(crate) trait ConnectionContext: Clone + Send + Sync {
fn peer_type(&self) -> ConnectionPeerType;
fn remote_pubkey(&self) -> Option<solana_pubkey::Pubkey>;
}

/// A trait to manage QoS for connections. This includes
/// 1) deriving the ConnectionContext for a connection
/// 2) managing connection caching and connection limits, stream limits
pub(crate) trait QosController<C: ConnectionContext> {
/// Derive the ConnectionContext for a connection
fn derive_connection_context(&self, connection: &Connection) -> C;

/// Try to add a new connection to cache. If successful, return a CancellationToken and
/// a ConnectionStreamCounter to track the streams created on this connection.
/// Otherwise return None.
fn try_cache_connection(
&self,
client_connection_tracker: ClientConnectionTracker,
connection: &quinn::Connection,
context: &mut C,
) -> impl std::future::Future<Output = Option<(CancellationToken, Arc<ConnectionStreamCounter>)>>
+ Send;

/// The maximum number of streams that can be opened per throttling interval
/// on this connection.
fn max_streams_per_throttling_interval(&self, context: &C) -> u64;

fn total_stake(&self) -> u64;

/// Called when a stream is accepted on a connection
fn on_stream_accepted(&self, context: &C);

/// Called when a stream is finished successfully
fn on_stream_finished(&self, context: &C);

/// Called when a stream has an error
fn on_stream_error(&self, context: &C);

/// Called when a stream is closed
fn on_stream_closed(&self, context: &C);

/// Remove a connection. Return the number of open connections after removal.
fn remove_connection(
&self,
context: &C,
connection: Connection,
) -> impl std::future::Future<Output = usize> + Send;

/// The timeout duration to wait for a chunk to arrive on a stream
fn wait_for_chunk_timeout(&self) -> Duration;
}
Loading
Loading