Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 67 additions & 42 deletions consensus/core/src/authority_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,17 +55,10 @@ impl<C: CoreThreadDispatcher> AuthorityService<C> {
dag_state: Arc<RwLock<DagState>>,
store: Arc<dyn Store>,
) -> Self {
// Set the subscribed peers by default to 0
for (_, authority) in context.committee.authorities() {
context
.metrics
.node_metrics
.subscribed_peers
.with_label_values(&[authority.hostname.as_str()])
.set(0);
}

let subscription_counter = Arc::new(SubscriptionCounter::new(core_dispatcher.clone()));
let subscription_counter = Arc::new(SubscriptionCounter::new(
context.clone(),
core_dispatcher.clone(),
));
Self {
context,
block_verifier,
Expand Down Expand Up @@ -254,7 +247,6 @@ impl<C: CoreThreadDispatcher> NetworkService for AuthorityService<C> {
);

let broadcasted_blocks = BroadcastedBlockStream::new(
self.context.clone(),
peer,
self.rx_block_broadcaster.resubscribe(),
self.subscription_counter.clone(),
Expand Down Expand Up @@ -418,36 +410,78 @@ impl<C: CoreThreadDispatcher> NetworkService for AuthorityService<C> {
}
}

struct Counter {
count: usize,
subscriptions_by_authority: Vec<usize>,
}

/// Atomically counts the number of active subscriptions to the block broadcast stream,
/// and dispatch commands to core based on the changes.
struct SubscriptionCounter {
counter: parking_lot::Mutex<usize>,
context: Arc<Context>,
counter: parking_lot::Mutex<Counter>,
dispatcher: Arc<dyn CoreThreadDispatcher>,
}

impl SubscriptionCounter {
fn new(dispatcher: Arc<dyn CoreThreadDispatcher>) -> Self {
fn new(context: Arc<Context>, dispatcher: Arc<dyn CoreThreadDispatcher>) -> Self {
// Set the subscribed peers by default to 0
for (_, authority) in context.committee.authorities() {
context
.metrics
.node_metrics
.subscribed_peers
.with_label_values(&[authority.hostname.as_str()])
.set(0);
}

Self {
counter: parking_lot::Mutex::new(0),
counter: parking_lot::Mutex::new(Counter {
count: 0,
subscriptions_by_authority: vec![0; context.committee.size()],
}),
dispatcher,
context,
}
}

fn increment(&self) -> Result<(), ConsensusError> {
fn increment(&self, peer: AuthorityIndex) -> Result<(), ConsensusError> {
let mut counter = self.counter.lock();
*counter += 1;
if *counter == 1 {
counter.count += 1;
counter.subscriptions_by_authority[peer] += 1;

let peer_hostname = &self.context.committee.authority(peer).hostname;
self.context
.metrics
.node_metrics
.subscribed_peers
.with_label_values(&[peer_hostname])
.set(1);

if counter.count == 1 {
self.dispatcher
.set_consumer_availability(true)
.map_err(|_| ConsensusError::Shutdown)?;
}
Ok(())
}

fn decrement(&self) -> Result<(), ConsensusError> {
fn decrement(&self, peer: AuthorityIndex) -> Result<(), ConsensusError> {
let mut counter = self.counter.lock();
*counter -= 1;
if *counter == 0 {
counter.count -= 1;
counter.subscriptions_by_authority[peer] -= 1;

if counter.subscriptions_by_authority[peer] == 0 {
let peer_hostname = &self.context.committee.authority(peer).hostname;
self.context
.metrics
.node_metrics
.subscribed_peers
.with_label_values(&[peer_hostname])
.set(0);
}

if counter.count == 0 {
self.dispatcher
.set_consumer_availability(false)
.map_err(|_| ConsensusError::Shutdown)?;
Expand All @@ -463,7 +497,6 @@ type BroadcastedBlockStream = BroadcastStream<VerifiedBlock>;
/// Adapted from `tokio_stream::wrappers::BroadcastStream`. The main difference is that
/// this tolerates lags with only logging, without yielding errors.
struct BroadcastStream<T> {
context: Arc<Context>,
peer: AuthorityIndex,
// Stores the receiver across poll_next() calls.
inner: ReusableBoxFuture<
Expand All @@ -479,22 +512,17 @@ struct BroadcastStream<T> {

impl<T: 'static + Clone + Send> BroadcastStream<T> {
pub fn new(
context: Arc<Context>,
peer: AuthorityIndex,
rx: broadcast::Receiver<T>,
subscription_counter: Arc<SubscriptionCounter>,
) -> Self {
let peer_hostname = &context.committee.authority(peer).hostname;
context
.metrics
.node_metrics
.subscribed_peers
.with_label_values(&[peer_hostname])
.set(1);
// Failure can only be due to core shutdown.
let _ = subscription_counter.increment();
if let Err(err) = subscription_counter.increment(peer) {
match err {
ConsensusError::Shutdown => {}
_ => panic!("Unexpected error: {err}"),
}
}
Self {
context,
peer,
inner: ReusableBoxFuture::new(make_recv_future(rx)),
subscription_counter,
Expand Down Expand Up @@ -535,15 +563,12 @@ impl<T: 'static + Clone + Send> Stream for BroadcastStream<T> {

impl<T> Drop for BroadcastStream<T> {
fn drop(&mut self) {
let peer_hostname = &self.context.committee.authority(self.peer).hostname;
self.context
.metrics
.node_metrics
.subscribed_peers
.with_label_values(&[peer_hostname])
.set(0);
// Failure can only be due to core shutdown.
let _ = self.subscription_counter.decrement();
if let Err(err) = self.subscription_counter.decrement(self.peer) {
match err {
ConsensusError::Shutdown => {}
_ => panic!("Unexpected error: {err}"),
}
}
}
}

Expand Down
Loading