ln: om: add option to internally queue forwards to offline peers #3765

Draft · wants to merge 1 commit into base: main
72 changes: 69 additions & 3 deletions lightning/src/onion_message/messenger.rs
@@ -318,6 +318,7 @@ pub struct OnionMessenger<
dns_resolver_handler: DRH,
custom_handler: CMH,
intercept_messages_for_offline_peers: bool,
+ queue_messages_for_offline_peers: bool,
pending_intercepted_msgs_events: Mutex<Vec<Event>>,
pending_peer_connected_events: Mutex<Vec<Event>>,
pending_events_processor: AtomicBool,
@@ -1233,6 +1234,8 @@ where
entropy_source: ES, node_signer: NS, logger: L, node_id_lookup: NL, message_router: MR,
offers_handler: OMH, async_payments_handler: APH, dns_resolver: DRH, custom_handler: CMH,
) -> Self {
+ let intercept_messages_for_offline_peers = false;
+ let queue_messages_for_offline_peers = false;
Self::new_inner(
entropy_source,
node_signer,
@@ -1243,7 +1246,8 @@
async_payments_handler,
dns_resolver,
custom_handler,
- false,
+ intercept_messages_for_offline_peers,
+ queue_messages_for_offline_peers,
)
}

@@ -1272,6 +1276,8 @@ where
entropy_source: ES, node_signer: NS, logger: L, node_id_lookup: NL, message_router: MR,
offers_handler: OMH, async_payments_handler: APH, dns_resolver: DRH, custom_handler: CMH,
) -> Self {
+ let intercept_messages_for_offline_peers = true;
+ let queue_messages_for_offline_peers = false;
Self::new_inner(
entropy_source,
node_signer,
@@ -1282,15 +1288,48 @@
async_payments_handler,
dns_resolver,
custom_handler,
- true,
+ intercept_messages_for_offline_peers,
+ queue_messages_for_offline_peers,
)
}

+ /// Similar to [`Self::new`], but rather than dropping onion messages that
+ /// are intended to be forwarded to offline peers, we'll queue them until
+ /// the peer connects or two timer ticks pass.
+ ///
+ /// A single [`Event::ConnectionNeeded`] event with just the NodeId and no
+ /// addresses will be generated once a message is queued for an offline peer.
+ pub fn new_with_offline_peer_queueing(
+ entropy_source: ES, node_signer: NS, logger: L, node_id_lookup: NL, message_router: MR,
+ offers_handler: OMH, async_payments_handler: APH, dns_resolver: DRH, custom_handler: CMH,
+ ) -> Self {
+ let intercept_messages_for_offline_peers = false;
+ let queue_messages_for_offline_peers = true;
+ Self::new_inner(
+ entropy_source,
+ node_signer,
+ logger,
+ node_id_lookup,
+ message_router,
+ offers_handler,
+ async_payments_handler,
+ dns_resolver,
+ custom_handler,
+ intercept_messages_for_offline_peers,
+ queue_messages_for_offline_peers,
+ )
+ }

fn new_inner(
entropy_source: ES, node_signer: NS, logger: L, node_id_lookup: NL, message_router: MR,
offers_handler: OMH, async_payments_handler: APH, dns_resolver: DRH, custom_handler: CMH,
- intercept_messages_for_offline_peers: bool,
+ intercept_messages_for_offline_peers: bool, queue_messages_for_offline_peers: bool,
) -> Self {
+ debug_assert!(
+ !(intercept_messages_for_offline_peers && queue_messages_for_offline_peers),
+ "Can't generate intercept events and queue messages for offline peers at the same time",
+ );

let mut secp_ctx = Secp256k1::new();
secp_ctx.seeded_randomize(&entropy_source.get_secure_random_bytes());
OnionMessenger {
@@ -1306,6 +1345,7 @@
dns_resolver_handler: dns_resolver,
custom_handler,
intercept_messages_for_offline_peers,
+ queue_messages_for_offline_peers,
pending_intercepted_msgs_events: Mutex::new(Vec::new()),
pending_peer_connected_events: Mutex::new(Vec::new()),
pending_events_processor: AtomicBool::new(false),
@@ -2023,6 +2063,32 @@
.entry(next_node_id)
.or_insert_with(|| OnionMessageRecipient::ConnectedPeer(VecDeque::new()));

+ // When enabled, we'll queue all messages, even for offline peers
+ // and peers pending connection.
+ if self.queue_messages_for_offline_peers {
+ match message_recipients.entry(next_node_id) {
+ hash_map::Entry::Occupied(mut e) => {
+ log_trace!(logger, "Forwarding onion message to peer {next_node_id}");
+ e.get_mut().enqueue_message(onion_message);
+ },
+ hash_map::Entry::Vacant(e) => {
+ log_trace!(
+ logger,
+ "Forwarding onion message to disconnected peer {next_node_id}: \
+ awaiting connection"
+ );
+ let addrs = Vec::new();
+ e.insert(OnionMessageRecipient::pending_connection(addrs))
+ .enqueue_message(onion_message);
+ // Notify the background processor that we need to
+ // connect to this peer.
+ self.event_notifier.notify();
+ },
+ };
+ return;
+ }

+ // Otherwise, only forward to connected peers.
match message_recipients.entry(next_node_id) {
hash_map::Entry::Occupied(mut e)
if matches!(e.get(), OnionMessageRecipient::ConnectedPeer(..)) =>
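
With queueing enabled, the forward path above only emits [`Event::ConnectionNeeded`] with the counterparty's node id and an empty address list, so the consumer of the event is expected to look up addresses and open the connection itself. A minimal sketch of that handling follows; `lookup_addresses`, `connect_to_peer`, and `handle_event` are hypothetical application-side names, not LDK API or part of this PR.

```rust
use bitcoin::secp256k1::PublicKey;
use lightning::events::Event;
use lightning::ln::msgs::SocketAddress;

// Hypothetical application-side helpers, not provided by LDK or this PR.
fn lookup_addresses(_node_id: &PublicKey) -> Vec<SocketAddress> {
	// e.g. consult a local address book or the network graph
	Vec::new()
}
fn connect_to_peer(_node_id: PublicKey, _addr: SocketAddress) {
	// e.g. hand off to the application's networking layer
	// (such as lightning-net-tokio's connect_outbound)
}

fn handle_event(event: Event) {
	match event {
		// With offline-peer queueing enabled, `addresses` is expected to be
		// empty, so the application resolves the peer's address on its own.
		Event::ConnectionNeeded { node_id, addresses, .. } => {
			let candidates =
				if addresses.is_empty() { lookup_addresses(&node_id) } else { addresses };
			if let Some(addr) = candidates.into_iter().next() {
				connect_to_peer(node_id, addr);
			}
		},
		_ => {},
	}
}
```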
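
The constructor docs say queued forwards are kept "until the peer connects or two timer ticks pass". The expiry bookkeeping isn't visible in the hunks above, so the following is purely an illustration of a tick-based expiry pattern under that assumption; `PendingForwards` and its methods are made-up names, not the PR's actual data structures.

```rust
use std::collections::{HashMap, VecDeque};
use std::hash::Hash;

/// Queued forwards are dropped after this many timer ticks (mirrors the
/// "two timer ticks" mentioned in the constructor docs above).
const MAX_TIMER_TICKS: u8 = 2;

/// Illustrative per-peer queue that ages out if the peer never connects.
struct PendingForwards<K, M> {
	// peer id -> (queued messages, ticks elapsed since the first message queued)
	queues: HashMap<K, (VecDeque<M>, u8)>,
}

impl<K: Hash + Eq, M> PendingForwards<K, M> {
	fn new() -> Self {
		Self { queues: HashMap::new() }
	}

	/// Queue a message for a peer that is currently offline.
	fn enqueue(&mut self, peer: K, msg: M) {
		let entry = self.queues.entry(peer).or_insert_with(|| (VecDeque::new(), 0));
		entry.0.push_back(msg);
	}

	/// Called once per timer tick; drops queues that have waited too long.
	fn timer_tick_occurred(&mut self) {
		self.queues.retain(|_, entry| {
			entry.1 += 1;
			entry.1 < MAX_TIMER_TICKS
		});
	}

	/// Called when a peer connects; hands back anything still queued for it.
	fn take_for_peer(&mut self, peer: &K) -> VecDeque<M> {
		self.queues.remove(peer).map(|(msgs, _)| msgs).unwrap_or_default()
	}
}
```

On connection, draining with `take_for_peer` corresponds to flushing the buffered onion messages to the now-online peer; in the PR itself this role is played by the existing `OnionMessageRecipient::pending_connection` queue.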