diff --git a/lightning/src/onion_message/messenger.rs b/lightning/src/onion_message/messenger.rs index 6009a276976..95d77c20107 100644 --- a/lightning/src/onion_message/messenger.rs +++ b/lightning/src/onion_message/messenger.rs @@ -318,6 +318,7 @@ pub struct OnionMessenger< dns_resolver_handler: DRH, custom_handler: CMH, intercept_messages_for_offline_peers: bool, + queue_messages_for_offline_peers: bool, pending_intercepted_msgs_events: Mutex>, pending_peer_connected_events: Mutex>, pending_events_processor: AtomicBool, @@ -1233,6 +1234,8 @@ where entropy_source: ES, node_signer: NS, logger: L, node_id_lookup: NL, message_router: MR, offers_handler: OMH, async_payments_handler: APH, dns_resolver: DRH, custom_handler: CMH, ) -> Self { + let intercept_messages_for_offline_peers = false; + let queue_messages_for_offline_peers = false; Self::new_inner( entropy_source, node_signer, @@ -1243,7 +1246,8 @@ where async_payments_handler, dns_resolver, custom_handler, - false, + intercept_messages_for_offline_peers, + queue_messages_for_offline_peers, ) } @@ -1272,6 +1276,8 @@ where entropy_source: ES, node_signer: NS, logger: L, node_id_lookup: NL, message_router: MR, offers_handler: OMH, async_payments_handler: APH, dns_resolver: DRH, custom_handler: CMH, ) -> Self { + let intercept_messages_for_offline_peers = true; + let queue_messages_for_offline_peers = false; Self::new_inner( entropy_source, node_signer, @@ -1282,15 +1288,48 @@ where async_payments_handler, dns_resolver, custom_handler, - true, + intercept_messages_for_offline_peers, + queue_messages_for_offline_peers, + ) + } + + /// Similar to [`Self::new`], but rather than dropping onion messages that + /// are intended to be forwarded to offline peers, we'll queue them until + /// the peer connects or two timer ticks pass. + /// + /// A single [`Event::ConnectionNeeded`] event with just the NodeId and no + /// addresses will be generated once a message queues for an offline peer. + pub fn new_with_offline_peer_queueing( + entropy_source: ES, node_signer: NS, logger: L, node_id_lookup: NL, message_router: MR, + offers_handler: OMH, async_payments_handler: APH, dns_resolver: DRH, custom_handler: CMH, + ) -> Self { + let intercept_messages_for_offline_peers = false; + let queue_messages_for_offline_peers = true; + Self::new_inner( + entropy_source, + node_signer, + logger, + node_id_lookup, + message_router, + offers_handler, + async_payments_handler, + dns_resolver, + custom_handler, + intercept_messages_for_offline_peers, + queue_messages_for_offline_peers, ) } fn new_inner( entropy_source: ES, node_signer: NS, logger: L, node_id_lookup: NL, message_router: MR, offers_handler: OMH, async_payments_handler: APH, dns_resolver: DRH, custom_handler: CMH, - intercept_messages_for_offline_peers: bool, + intercept_messages_for_offline_peers: bool, queue_messages_for_offline_peers: bool, ) -> Self { + debug_assert!( + !(intercept_messages_for_offline_peers && queue_messages_for_offline_peers), + "Can't generate intercept events and queue messages for offline peers at the same time", + ); + let mut secp_ctx = Secp256k1::new(); secp_ctx.seeded_randomize(&entropy_source.get_secure_random_bytes()); OnionMessenger { @@ -1306,6 +1345,7 @@ where dns_resolver_handler: dns_resolver, custom_handler, intercept_messages_for_offline_peers, + queue_messages_for_offline_peers, pending_intercepted_msgs_events: Mutex::new(Vec::new()), pending_peer_connected_events: Mutex::new(Vec::new()), pending_events_processor: AtomicBool::new(false), @@ -2023,6 +2063,32 @@ where .entry(next_node_id) .or_insert_with(|| OnionMessageRecipient::ConnectedPeer(VecDeque::new())); + // When enabled, we'll queue all messages, even for offline peers + // and peers pending connection. + if self.queue_messages_for_offline_peers { + match message_recipients.entry(next_node_id) { + hash_map::Entry::Occupied(mut e) => { + log_trace!(logger, "Forwarding onion message peer {next_node_id}"); + e.get_mut().enqueue_message(onion_message); + }, + hash_map::Entry::Vacant(e) => { + log_trace!( + logger, + "Forwarding onion message to disconnected peer {next_node_id}: \ + awaiting connection" + ); + let addrs = Vec::new(); + e.insert(OnionMessageRecipient::pending_connection(addrs)) + .enqueue_message(onion_message); + // Notify the background processor that we need to + // connect to this peer. + self.event_notifier.notify(); + }, + }; + return; + } + + // Otherwise, only forward to connected peers. match message_recipients.entry(next_node_id) { hash_map::Entry::Occupied(mut e) if matches!(e.get(), OnionMessageRecipient::ConnectedPeer(..)) =>