Skip to content

Commit e6d731e

Browse files
authored
Merge pull request #1175 from input-output-hk/improve-subscription-logging
Logging improvements for network subscriptions
2 parents 6f004f5 + 28875fc commit e6d731e

File tree

2 files changed

+37
-38
lines changed

2 files changed

+37
-38
lines changed

jormungandr/src/network/subscription.rs

Lines changed: 31 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,10 @@ use super::{
44
GlobalStateR,
55
};
66
use crate::{
7-
blockcfg::{Fragment, Header, HeaderHash},
7+
blockcfg::{Fragment, Header},
88
intercom::{BlockMsg, TransactionMsg},
99
settings::start::network::Configuration,
10-
utils::async_msg::MessageBox,
10+
utils::async_msg::{self, MessageBox},
1111
};
1212
use jormungandr_lib::interfaces::FragmentOrigin;
1313
use network_core::error as core_error;
@@ -18,7 +18,6 @@ use futures::future::{self, FutureResult};
1818
use futures::prelude::*;
1919
use slog::Logger;
2020

21-
use std::collections::HashSet;
2221
use std::fmt::Debug;
2322

2423
#[must_use = "`Subscription` needs to be plugged into a service trait implementation"]
@@ -90,26 +89,7 @@ where
9089
// Not logging the item here because start_send might refuse to send it
9190
// and it will end up logged redundantly. This won't be a problem with
9291
// futures 0.3.
93-
match self.inbound.start_send(item) {
94-
Ok(AsyncSink::Ready) => {
95-
trace!(
96-
self.logger,
97-
"item queued for processing";
98-
"direction" => "in",
99-
);
100-
Ok(AsyncSink::Ready)
101-
}
102-
Ok(AsyncSink::NotReady(item)) => Ok(AsyncSink::NotReady(item)),
103-
Err(e) => {
104-
debug!(
105-
self.logger,
106-
"failed to queue item for processing";
107-
"error" => ?e,
108-
"direction" => "in",
109-
);
110-
Err(e)
111-
}
112-
}
92+
self.inbound.start_send(item)
11393
}
11494

11595
fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
@@ -192,7 +172,6 @@ pub struct BlockAnnouncementProcessor {
192172
node_id: Id,
193173
global_state: GlobalStateR,
194174
logger: Logger,
195-
seen_blocks: HashSet<HeaderHash>,
196175
}
197176

198177
impl BlockAnnouncementProcessor {
@@ -207,13 +186,24 @@ impl BlockAnnouncementProcessor {
207186
node_id,
208187
global_state,
209188
logger,
210-
seen_blocks: HashSet::new(),
211189
}
212190
}
213191

214192
pub fn message_box(&self) -> MessageBox<BlockMsg> {
215193
self.mbox.clone()
216194
}
195+
196+
fn mbox_error<T>(&self, err: async_msg::SendError<T>) -> core_error::Error
197+
where
198+
T: Send + Sync + 'static,
199+
{
200+
error!(
201+
self.logger,
202+
"failed to send block announcement to the block task";
203+
"reason" => %err,
204+
);
205+
core_error::Error::new(core_error::Code::Internal, err)
206+
}
217207
}
218208

219209
#[must_use = "sinks do nothing unless polled"]
@@ -282,25 +272,18 @@ impl Sink for BlockAnnouncementProcessor {
282272
type SinkError = core_error::Error;
283273

284274
fn start_send(&mut self, header: Header) -> StartSend<Header, core_error::Error> {
285-
let block_hash = header.hash();
286-
if self.seen_blocks.insert(block_hash) {
287-
info!(self.logger, "received block announcement"; "hash" => %block_hash);
275+
let polled_ready = self.mbox.poll_ready().map_err(|e| self.mbox_error(e))?;
276+
if polled_ready.is_not_ready() {
277+
return Ok(AsyncSink::NotReady(header));
288278
}
279+
let block_hash = header.hash();
280+
info!(self.logger, "received block announcement"; "hash" => %block_hash);
289281
let polled = self
290282
.mbox
291283
.start_send(BlockMsg::AnnouncedBlock(header, self.node_id))
292-
.map_err(|e| {
293-
error!(
294-
self.logger,
295-
"failed to send block announcement to the block task";
296-
"reason" => %e,
297-
);
298-
self.seen_blocks.remove(&block_hash);
299-
core_error::Error::new(core_error::Code::Internal, e)
300-
})?;
284+
.map_err(|e| self.mbox_error(e))?;
301285
match polled {
302286
AsyncSink::Ready => {
303-
self.seen_blocks.remove(&block_hash);
304287
self.global_state.peers.refresh_peer_on_block(self.node_id);
305288
Ok(AsyncSink::Ready)
306289
}
@@ -342,6 +325,11 @@ impl Sink for FragmentProcessor {
342325
if self.buffered_fragments.len() >= buffer_sizes::FRAGMENTS {
343326
return Ok(AsyncSink::NotReady(fragment));
344327
}
328+
trace!(
329+
self.logger,
330+
"received";
331+
"item" => ?fragment,
332+
);
345333
self.buffered_fragments.push(fragment);
346334
let async_send = self.try_send_fragments()?;
347335
Ok(async_send.map(|()| self.buffered_fragments.pop().unwrap()))
@@ -418,6 +406,11 @@ impl Sink for GossipProcessor {
418406
&mut self,
419407
gossip: Gossip<NodeData>,
420408
) -> StartSend<Self::SinkItem, core_error::Error> {
409+
trace!(
410+
self.logger,
411+
"received";
412+
"item" => ?gossip,
413+
);
421414
self.process_item(gossip);
422415
Ok(AsyncSink::Ready)
423416
}

jormungandr/src/utils/async_msg.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,12 @@ impl<Msg> MessageBox<Msg> {
3636
pub fn try_send(&mut self, a: Msg) -> Result<(), TrySendError<Msg>> {
3737
self.0.try_send(a)
3838
}
39+
40+
/// Polls the channel to determine if there is guaranteed to be capacity
41+
/// to send at least one item without waiting.
42+
pub fn poll_ready(&mut self) -> Poll<(), SendError<()>> {
43+
self.0.poll_ready()
44+
}
3945
}
4046

4147
impl<Msg> Sink for MessageBox<Msg> {

0 commit comments

Comments
 (0)