diff --git a/Cargo.toml b/Cargo.toml index 5cacf34..816ebf4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,9 +8,9 @@ description = "Types and primitives to integrate a spec-compliant LSP with an LD # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -lightning = { git = "https://github.com/lightningdevkit/rust-lightning.git", rev = "498f2331459d8031031ef151a44c90d700aa8c7e", features = ["max_level_trace", "std"] } -lightning-invoice = { git = "https://github.com/lightningdevkit/rust-lightning.git", rev = "498f2331459d8031031ef151a44c90d700aa8c7e" } -lightning-net-tokio = { git = "https://github.com/lightningdevkit/rust-lightning.git", rev = "498f2331459d8031031ef151a44c90d700aa8c7e" } +lightning = { git = "https://github.com/lightningdevkit/rust-lightning.git", rev = "a358ba2e68c84d19e70d1d45d3f677eaf324e09d", features = ["max_level_trace", "std"] } +lightning-invoice = { git = "https://github.com/lightningdevkit/rust-lightning.git", rev = "a358ba2e68c84d19e70d1d45d3f677eaf324e09d" } +lightning-net-tokio = { git = "https://github.com/lightningdevkit/rust-lightning.git", rev = "a358ba2e68c84d19e70d1d45d3f677eaf324e09d" } bitcoin = "0.29.0" diff --git a/src/events.rs b/src/events.rs index e69366f..fca3dd6 100644 --- a/src/events.rs +++ b/src/events.rs @@ -13,6 +13,7 @@ //! Because we don't have a built-in runtime, it's up to the end-user to poll //! [`crate::LiquidityManager::get_and_clear_pending_events()`] to receive events. +use crate::jit_channel; use std::collections::VecDeque; use std::sync::{Condvar, Mutex}; @@ -53,6 +54,9 @@ impl EventQueue { } } -/// Event which you should probably take some action in response to. +/// An Event which you should probably take some action in response to. #[derive(Debug, Clone, PartialEq, Eq)] -pub enum Event {} +pub enum Event { + /// A LSPS2 (JIT Channel) protocol event + LSPS2(jit_channel::event::Event), +} diff --git a/src/jit_channel/channel_manager.rs b/src/jit_channel/channel_manager.rs new file mode 100644 index 0000000..bd7daf3 --- /dev/null +++ b/src/jit_channel/channel_manager.rs @@ -0,0 +1,841 @@ +// This file is Copyright its original authors, visible in version contror +// history. +// +// This file is licensed under the Apache License, Version 2.0 or the MIT license +// , at your option. +// You may not use this file except in accordance with one or both of these +// licenses. + +use std::collections::HashMap; +use std::convert::TryInto; +use std::ops::Deref; +use std::sync::{Arc, Mutex, RwLock}; + +use bitcoin::secp256k1::PublicKey; +use lightning::ln::channelmanager::InterceptId; +use lightning::ln::msgs::{ + ChannelMessageHandler, ErrorAction, LightningError, OnionMessageHandler, RoutingMessageHandler, +}; +use lightning::ln::peer_handler::{ + APeerManager, CustomMessageHandler, PeerManager, SocketDescriptor, +}; +use lightning::routing::gossip::NetworkGraph; +use lightning::sign::{EntropySource, NodeSigner}; +use lightning::util::errors::APIError; +use lightning::util::logger::{Level, Logger}; + +use crate::events::EventQueue; +use crate::jit_channel::msgs::Message; +use crate::jit_channel::scid_utils; +use crate::transport::message_handler::ProtocolMessageHandler; +use crate::transport::msgs::{LSPSMessage, RequestId}; +use crate::utils; +use crate::{events::Event, transport::msgs::ResponseError}; + +use super::msgs::{ + BuyRequest, BuyResponse, GetInfoRequest, GetInfoResponse, GetVersionsRequest, + GetVersionsResponse, OpeningFeeParams, RawOpeningFeeParams, Request, Response, +}; + +const SUPPORTED_SPEC_VERSION: u16 = 1; + +#[derive(PartialEq)] +enum JITChannelState { + VersionsRequested, + MenuRequested, + PendingMenuSelection, + BuyRequested, + PendingPayment, + Ready, +} + +struct JITChannel { + id: u128, + user_id: u128, + state: JITChannelState, + fees: Option, + token: Option, + min_payment_size_msat: Option, + max_payment_size_msat: Option, + payment_size_msat: Option, + counterparty_node_id: PublicKey, + amt_to_forward_msat: Option, + intercept_id: Option, + scid: Option, + lsp_cltv_expiry_delta: Option, +} + +impl JITChannel { + pub fn new( + id: u128, counterparty_node_id: PublicKey, user_id: u128, payment_size_msat: Option, + token: Option, + ) -> Self { + Self { + id, + counterparty_node_id, + token, + user_id, + state: JITChannelState::VersionsRequested, + fees: None, + min_payment_size_msat: None, + max_payment_size_msat: None, + payment_size_msat, + scid: None, + amt_to_forward_msat: None, + lsp_cltv_expiry_delta: None, + intercept_id: None, + } + } +} + +#[derive(Default)] +struct PeerState { + channels_by_id: HashMap, + request_to_cid: HashMap, + pending_requests: HashMap, +} + +impl PeerState { + pub fn insert_channel(&mut self, channel_id: u128, channel: JITChannel) { + self.channels_by_id.insert(channel_id, channel); + } + + pub fn insert_request(&mut self, request_id: RequestId, channel_id: u128) { + self.request_to_cid.insert(request_id, channel_id); + } + + pub fn get_channel_in_state_for_request( + &mut self, request_id: &RequestId, state: JITChannelState, + ) -> Option<&mut JITChannel> { + let channel_id = self.request_to_cid.remove(request_id)?; + + if let Some(channel) = self.channels_by_id.get_mut(&channel_id) { + if channel.state == state { + return Some(channel); + } + } + None + } + + pub fn remove_channel(&mut self, channel_id: u128) { + self.channels_by_id.remove(&channel_id); + } +} + +pub struct JITChannelManager< + ES: Deref, + Descriptor: SocketDescriptor + Send + Sync + 'static, + L: Deref + Send + Sync + 'static, + RM: Deref + Send + Sync + 'static, + CM: Deref + Send + Sync + 'static, + OM: Deref + Send + Sync + 'static, + CMH: Deref + Send + Sync + 'static, + NS: Deref + Send + Sync + 'static, +> where + ES::Target: EntropySource, + L::Target: Logger, + RM::Target: RoutingMessageHandler, + CM::Target: ChannelMessageHandler, + OM::Target: OnionMessageHandler, + CMH::Target: CustomMessageHandler, + NS::Target: NodeSigner, +{ + entropy_source: ES, + peer_manager: Mutex>>>, + pending_messages: Arc>>, + pending_events: Arc, + per_peer_state: RwLock>>, + channels_by_scid: RwLock>, + promise_secret: [u8; 32], +} + +impl< + ES: Deref, + Descriptor: SocketDescriptor + Send + Sync + 'static, + L: Deref + Send + Sync + 'static, + RM: Deref + Send + Sync + 'static, + CM: Deref + Send + Sync + 'static, + OM: Deref + Send + Sync + 'static, + CMH: Deref + Send + Sync + 'static, + NS: Deref + Send + Sync + 'static, + > JITChannelManager +where + ES::Target: EntropySource, + L::Target: Logger, + RM::Target: RoutingMessageHandler, + CM::Target: ChannelMessageHandler, + OM::Target: OnionMessageHandler, + CMH::Target: CustomMessageHandler, + NS::Target: NodeSigner, +{ + pub(crate) fn new( + entropy_source: ES, promise_secret: [u8; 32], + pending_messages: Arc>>, + pending_events: Arc, + ) -> Self { + Self { + entropy_source, + promise_secret, + pending_messages, + pending_events, + per_peer_state: RwLock::new(HashMap::new()), + channels_by_scid: RwLock::new(HashMap::new()), + peer_manager: Mutex::new(None), + } + } + + pub fn set_peer_manager( + &self, peer_manager: Arc>, + ) { + *self.peer_manager.lock().unwrap() = Some(peer_manager); + } + + pub fn create_invoice( + &self, counterparty_node_id: PublicKey, payment_size_msat: Option, + token: Option, user_channel_id: u128, + ) { + let channel_id = self.generate_channel_id(); + let channel = JITChannel::new( + channel_id, + counterparty_node_id, + user_channel_id, + payment_size_msat, + token, + ); + + let mut per_peer_state = self.per_peer_state.write().unwrap(); + let peer_state_mutex = + per_peer_state.entry(counterparty_node_id).or_insert(Mutex::new(PeerState::default())); + let peer_state = peer_state_mutex.get_mut().unwrap(); + peer_state.insert_channel(channel_id, channel); + + let request_id = self.generate_request_id(); + peer_state.insert_request(request_id.clone(), channel_id); + + { + let mut pending_messages = self.pending_messages.lock().unwrap(); + pending_messages.push(( + counterparty_node_id, + Message::Request(request_id, Request::GetVersions(GetVersionsRequest {})).into(), + )); + } + + if let Some(peer_manager) = self.peer_manager.lock().unwrap().as_ref() { + peer_manager.process_events(); + } + } + + pub fn opening_fee_params_generated( + &self, counterparty_node_id: PublicKey, request_id: RequestId, + opening_fee_params_menu: Vec, min_payment_size_msat: u64, + max_payment_size_msat: u64, + ) -> Result<(), APIError> { + let per_peer_state = self.per_peer_state.read().unwrap(); + + match per_peer_state.get(&counterparty_node_id) { + Some(peer_state_mutex) => { + let mut peer_state = peer_state_mutex.lock().unwrap(); + + match peer_state.pending_requests.remove(&request_id) { + Some(Request::GetInfo(_)) => { + let response = Response::GetInfo(GetInfoResponse { + opening_fee_params_menu: opening_fee_params_menu + .into_iter() + .map(|param| param.into_opening_fee_params(&self.promise_secret)) + .collect(), + min_payment_size_msat, + max_payment_size_msat, + }); + self.enqueue_response(counterparty_node_id, request_id, response); + Ok(()) + } + _ => Err(APIError::APIMisuseError { + err: format!( + "No pending get_info request for request_id: {:?}", + request_id + ), + }), + } + } + None => Err(APIError::APIMisuseError { + err: format!("No state for the counterparty exists: {:?}", counterparty_node_id), + }), + } + } + + pub fn opening_fee_params_selected( + &self, counterparty_node_id: PublicKey, channel_id: u128, + opening_fee_params: OpeningFeeParams, + ) -> Result<(), APIError> { + let per_peer_state = self.per_peer_state.read().unwrap(); + match per_peer_state.get(&counterparty_node_id) { + Some(peer_state_mutex) => { + let mut peer_state = peer_state_mutex.lock().unwrap(); + + if let Some(channel) = peer_state.channels_by_id.get_mut(&channel_id) { + if channel.state == JITChannelState::PendingMenuSelection { + channel.state = JITChannelState::BuyRequested; + channel.fees = Some(opening_fee_params.clone()); + + let request_id = self.generate_request_id(); + let payment_size_msat = channel.payment_size_msat; + peer_state.insert_request(request_id.clone(), channel_id); + + { + let mut pending_messages = self.pending_messages.lock().unwrap(); + pending_messages.push(( + counterparty_node_id, + Message::Request( + request_id, + Request::Buy(BuyRequest { + version: SUPPORTED_SPEC_VERSION, + opening_fee_params, + payment_size_msat, + }), + ) + .into(), + )); + } + if let Some(peer_manager) = self.peer_manager.lock().unwrap().as_ref() { + peer_manager.process_events(); + } + } else { + return Err(APIError::APIMisuseError { + err: "Channel is not pending menu selection".to_string(), + }); + } + } else { + return Err(APIError::APIMisuseError { + err: format!("Channel with id {} not found", channel_id), + }); + } + } + None => { + return Err(APIError::APIMisuseError { + err: format!("No existing state with counterparty {}", counterparty_node_id), + }) + } + } + + Ok(()) + } + + pub fn invoice_parameters_generated( + &self, counterparty_node_id: PublicKey, request_id: RequestId, scid: u64, + cltv_expiry_delta: u32, client_trusts_lsp: bool, + ) -> Result<(), APIError> { + let per_peer_state = self.per_peer_state.read().unwrap(); + + match per_peer_state.get(&counterparty_node_id) { + Some(peer_state_mutex) => { + let mut peer_state = peer_state_mutex.lock().unwrap(); + + match peer_state.pending_requests.remove(&request_id) { + Some(Request::Buy(buy_request)) => { + let mut channels_by_scid = self.channels_by_scid.write().unwrap(); + channels_by_scid.insert( + scid, + JITChannel { + id: 0, + user_id: 0, + state: JITChannelState::BuyRequested, + fees: Some(buy_request.opening_fee_params), + token: None, + min_payment_size_msat: None, + max_payment_size_msat: None, + payment_size_msat: buy_request.payment_size_msat, + counterparty_node_id, + scid: Some(scid), + lsp_cltv_expiry_delta: Some(cltv_expiry_delta), + amt_to_forward_msat: None, + intercept_id: None, + }, + ); + + let block = scid_utils::block_from_scid(&scid); + let tx_index = scid_utils::tx_index_from_scid(&scid); + let vout = scid_utils::vout_from_scid(&scid); + + let jit_channel_scid = format!("{}x{}x{}", block, tx_index, vout); + + self.enqueue_response( + counterparty_node_id, + request_id, + Response::Buy(BuyResponse { + jit_channel_scid, + lsp_cltv_expiry_delta: cltv_expiry_delta, + client_trusts_lsp, + }), + ); + + Ok(()) + } + _ => Err(APIError::APIMisuseError { + err: format!("No pending buy request for request_id: {:?}", request_id), + }), + } + } + None => Err(APIError::APIMisuseError { + err: format!("No state for the counterparty exists: {:?}", counterparty_node_id), + }), + } + } + + // need to decide if we should ignore, enqueue OpenChannel event, or enqueue FailInterceptedHTLC event + pub(crate) fn htlc_intercepted( + &self, scid: u64, intercept_id: InterceptId, inbound_amount_msat: u64, + expected_outbound_amount_msat: u64, + ) { + let mut channels_by_scid = self.channels_by_scid.write().unwrap(); + + if let Some(channel) = channels_by_scid.get_mut(&scid) { + if let Some(fees) = &channel.fees { + let opening_fee_msat = utils::compute_opening_fee( + expected_outbound_amount_msat, + fees.min_fee_msat, + fees.proportional as u64, + ); + + if let Some(opening_fee_msat) = opening_fee_msat { + let amt_to_forward_msat = expected_outbound_amount_msat - opening_fee_msat; + channel.amt_to_forward_msat = Some(amt_to_forward_msat); + channel.intercept_id = Some(intercept_id); + + self.enqueue_event(Event::LSPS2(crate::JITChannelEvent::OpenChannel { + their_network_key: channel.counterparty_node_id, + inbound_amount_msat, + expected_outbound_amount_msat, + amt_to_forward_msat, + opening_fee_msat, + user_channel_id: scid as u128, + })); + } else { + self.enqueue_event(Event::LSPS2(crate::JITChannelEvent::FailInterceptedHTLC { + intercept_id, + })); + } + } else { + self.enqueue_event(Event::LSPS2(crate::JITChannelEvent::FailInterceptedHTLC { + intercept_id, + })); + } + } + } + + // figure out which intercept id is waiting on this channel and enqueue ForwardInterceptedHTLC event + pub(crate) fn channel_ready( + &self, user_channel_id: u128, channel_id: &[u8; 32], counterparty_node_id: &PublicKey, + ) { + println!("channel_ready received"); + + let channels_by_scid = self.channels_by_scid.read().unwrap(); + + if let Ok(scid) = user_channel_id.try_into() { + println!("user_channel_id might be scid: {:?}", scid); + + if let Some(channel) = channels_by_scid.get(&scid) { + println!("found channel to forward payment over"); + self.enqueue_event(Event::LSPS2(crate::JITChannelEvent::ForwardInterceptedHTLC { + intercept_id: channel.intercept_id.unwrap(), + next_hop_channel_id: *channel_id, + next_node_id: *counterparty_node_id, + amt_to_forward_msat: channel.amt_to_forward_msat.unwrap(), + })); + } else { + println!("could not find a channel with that scid"); + } + } else { + println!("could not parse user_channel_id into u64 scid"); + } + } + + fn generate_channel_id(&self) -> u128 { + let bytes = self.entropy_source.get_secure_random_bytes(); + let mut id_bytes: [u8; 16] = [0; 16]; + id_bytes.copy_from_slice(&bytes[0..16]); + u128::from_be_bytes(id_bytes) + } + + fn generate_request_id(&self) -> RequestId { + let bytes = self.entropy_source.get_secure_random_bytes(); + RequestId(utils::hex_str(&bytes[0..16])) + } + + fn enqueue_response( + &self, counterparty_node_id: PublicKey, request_id: RequestId, response: Response, + ) { + { + let mut pending_messages = self.pending_messages.lock().unwrap(); + pending_messages + .push((counterparty_node_id, Message::Response(request_id, response).into())); + } + + if let Some(peer_manager) = self.peer_manager.lock().unwrap().as_ref() { + peer_manager.process_events(); + } + } + + fn enqueue_event(&self, event: Event) { + self.pending_events.enqueue(event); + } + + fn handle_get_versions_request( + &self, request_id: RequestId, counterparty_node_id: &PublicKey, + ) -> Result<(), LightningError> { + // not sure best way to extract a vec to a constant? lazy_static? + self.enqueue_response( + *counterparty_node_id, + request_id, + Response::GetVersions(GetVersionsResponse { versions: vec![1] }), + ); + Ok(()) + } + + fn handle_get_versions_response( + &self, request_id: RequestId, counterparty_node_id: &PublicKey, result: GetVersionsResponse, + ) -> Result<(), LightningError> { + let per_peer_state = self.per_peer_state.read().unwrap(); + match per_peer_state.get(counterparty_node_id) { + Some(peer_state_mutex) => { + let mut peer_state = peer_state_mutex.lock().unwrap(); + + match peer_state.get_channel_in_state_for_request( + &request_id, + JITChannelState::VersionsRequested, + ) { + Some(channel) => { + let channel_id = channel.id; + let token = channel.token.clone(); + + if result.versions.contains(&SUPPORTED_SPEC_VERSION) { + channel.state = JITChannelState::MenuRequested; + + let request_id = self.generate_request_id(); + peer_state.insert_request(request_id.clone(), channel_id); + + { + let mut pending_messages = self.pending_messages.lock().unwrap(); + pending_messages.push(( + *counterparty_node_id, + Message::Request( + request_id, + Request::GetInfo(GetInfoRequest { + version: SUPPORTED_SPEC_VERSION, + token, + }), + ) + .into(), + )); + } + + if let Some(peer_manager) = self.peer_manager.lock().unwrap().as_ref() { + peer_manager.process_events(); + } + } else { + peer_state.remove_channel(channel_id); + } + } + None => { + return Err(LightningError { + err: format!( + "Received get_versions response without a matching channel: {:?}", + request_id + ), + action: ErrorAction::IgnoreAndLog(Level::Info), + }) + } + } + } + None => { + return Err(LightningError { + err: format!( + "Received get_versions response from unknown peer: {:?}", + counterparty_node_id + ), + action: ErrorAction::IgnoreAndLog(Level::Info), + }) + } + } + + Ok(()) + } + + fn handle_get_info_request( + &self, request_id: RequestId, counterparty_node_id: &PublicKey, params: GetInfoRequest, + ) -> Result<(), LightningError> { + let mut per_peer_state = self.per_peer_state.write().unwrap(); + let peer_state_mutex: &mut Mutex = + per_peer_state.entry(*counterparty_node_id).or_insert(Mutex::new(PeerState::default())); + let peer_state = peer_state_mutex.get_mut().unwrap(); + peer_state.pending_requests.insert(request_id.clone(), Request::GetInfo(params.clone())); + + self.enqueue_event(Event::LSPS2(super::event::Event::GetInfo { + request_id, + counterparty_node_id: *counterparty_node_id, + version: params.version, + token: params.token, + })); + Ok(()) + } + + fn handle_get_info_response( + &self, request_id: RequestId, counterparty_node_id: &PublicKey, result: GetInfoResponse, + ) -> Result<(), LightningError> { + let per_peer_state = self.per_peer_state.read().unwrap(); + match per_peer_state.get(counterparty_node_id) { + Some(peer_state_mutex) => { + let mut peer_state = peer_state_mutex.lock().unwrap(); + + match peer_state + .get_channel_in_state_for_request(&request_id, JITChannelState::MenuRequested) + { + Some(channel) => { + channel.state = JITChannelState::PendingMenuSelection; + channel.min_payment_size_msat = Some(result.min_payment_size_msat); + channel.max_payment_size_msat = Some(result.max_payment_size_msat); + + self.enqueue_event(Event::LSPS2(super::event::Event::GetInfoResponse { + counterparty_node_id: *counterparty_node_id, + opening_fee_params_menu: result.opening_fee_params_menu, + min_payment_size_msat: result.min_payment_size_msat, + max_payment_size_msat: result.max_payment_size_msat, + channel_id: channel.id, + user_channel_id: channel.user_id, + })); + } + None => { + return Err(LightningError { + err: format!( + "Received get_info response without a matching channel: {:?}", + request_id + ), + action: ErrorAction::IgnoreAndLog(Level::Info), + }) + } + } + } + None => { + return Err(LightningError { + err: format!( + "Received get_info response from unknown peer: {:?}", + counterparty_node_id + ), + action: ErrorAction::IgnoreAndLog(Level::Info), + }) + } + } + + Ok(()) + } + + fn handle_get_info_error( + &self, request_id: RequestId, counterparty_node_id: &PublicKey, error: ResponseError, + ) -> Result<(), LightningError> { + let per_peer_state = self.per_peer_state.read().unwrap(); + match per_peer_state.get(counterparty_node_id) { + Some(peer_state_mutex) => { + let mut peer_state = peer_state_mutex.lock().unwrap(); + + match peer_state + .get_channel_in_state_for_request(&request_id, JITChannelState::BuyRequested) + { + Some(channel) => { + let channel_id = channel.id; + peer_state.remove_channel(channel_id); + return Err(LightningError { + err: format!("Received error response from getinfo request ({:?}) with counterparty {:?}. Removing channel {}. code = {}, message = {}", request_id, counterparty_node_id, channel_id, error.code, error.message), + action: ErrorAction::IgnoreAndLog(Level::Info) + }); + } + None => { + return Err(LightningError { + err: format!("Received an unexpected error response for a getinfo request from counterparty ({:?})", counterparty_node_id), + action: ErrorAction::IgnoreAndLog(Level::Info) + }); + } + } + } + None => { + return Err(LightningError { + err: format!("Received error response for a getinfo request from an unknown counterparty ({:?})", counterparty_node_id), + action: ErrorAction::IgnoreAndLog(Level::Info) + }); + } + } + } + + fn handle_buy_request( + &self, request_id: RequestId, counterparty_node_id: &PublicKey, params: BuyRequest, + ) -> Result<(), LightningError> { + if params.opening_fee_params.is_valid(&self.promise_secret) { + let mut per_peer_state = self.per_peer_state.write().unwrap(); + let peer_state_mutex = per_peer_state + .entry(*counterparty_node_id) + .or_insert(Mutex::new(PeerState::default())); + let peer_state = peer_state_mutex.get_mut().unwrap(); + peer_state.pending_requests.insert(request_id.clone(), Request::Buy(params.clone())); + + self.enqueue_event(Event::LSPS2(super::event::Event::BuyRequest { + request_id, + version: params.version, + counterparty_node_id: *counterparty_node_id, + opening_fee_params: params.opening_fee_params, + payment_size_msat: params.payment_size_msat, + })); + } + Ok(()) + } + + fn handle_buy_response( + &self, request_id: RequestId, counterparty_node_id: &PublicKey, result: BuyResponse, + ) -> Result<(), LightningError> { + let per_peer_state = self.per_peer_state.read().unwrap(); + match per_peer_state.get(counterparty_node_id) { + Some(peer_state_mutex) => { + let mut peer_state = peer_state_mutex.lock().unwrap(); + + match peer_state + .get_channel_in_state_for_request(&request_id, JITChannelState::BuyRequested) + { + Some(channel) => { + channel.state = JITChannelState::PendingPayment; + channel.lsp_cltv_expiry_delta = Some(result.lsp_cltv_expiry_delta); + + if let Ok(scid) = + scid_utils::scid_from_human_readable_string(&result.jit_channel_scid) + { + channel.scid = Some(scid); + + self.enqueue_event(Event::LSPS2( + super::event::Event::InvoiceGenerationReady { + counterparty_node_id: *counterparty_node_id, + scid, + cltv_expiry_delta: result.lsp_cltv_expiry_delta, + min_payment_size_msat: channel.min_payment_size_msat, + max_payment_size_msat: channel.max_payment_size_msat, + payment_size_msat: channel.payment_size_msat, + fees: channel.fees.clone().unwrap(), + client_trusts_lsp: result.client_trusts_lsp, + user_channel_id: channel.user_id, + }, + )); + } else { + return Err(LightningError { + err: format!( + "Received buy response with an invalid scid {}", + result.jit_channel_scid + ), + action: ErrorAction::IgnoreAndLog(Level::Info), + }); + } + } + None => { + return Err(LightningError { + err: format!( + "Received buy response without a matching channel: {:?}", + request_id + ), + action: ErrorAction::IgnoreAndLog(Level::Info), + }); + } + } + } + None => { + return Err(LightningError { + err: format!( + "Received buy response from unknown peer: {:?}", + counterparty_node_id + ), + action: ErrorAction::IgnoreAndLog(Level::Info), + }); + } + } + Ok(()) + } + + fn handle_buy_error( + &self, request_id: RequestId, counterparty_node_id: &PublicKey, error: ResponseError, + ) -> Result<(), LightningError> { + let per_peer_state = self.per_peer_state.read().unwrap(); + match per_peer_state.get(counterparty_node_id) { + Some(peer_state_mutex) => { + let mut peer_state = peer_state_mutex.lock().unwrap(); + + match peer_state + .get_channel_in_state_for_request(&request_id, JITChannelState::BuyRequested) + { + Some(channel) => { + let channel_id = channel.id; + peer_state.remove_channel(channel_id); + return Err(LightningError { err: format!( "Received error response from buy request ({:?}) with counterparty {:?}. Removing channel {}. code = {}, message = {}", request_id, counterparty_node_id, channel_id, error.code, error.message), action: ErrorAction::IgnoreAndLog(Level::Info)}); + } + None => { + return Err(LightningError { err: format!("Received an unexpected error response for a buy request from counterparty ({:?})", counterparty_node_id), action: ErrorAction::IgnoreAndLog(Level::Info)}); + } + } + } + None => { + return Err(LightningError { err: format!("Received error response for a buy request from an unknown counterparty ({:?})",counterparty_node_id), action: ErrorAction::IgnoreAndLog(Level::Info)}); + } + } + } +} + +impl< + ES: Deref, + Descriptor: SocketDescriptor + Send + Sync + 'static, + L: Deref + Send + Sync + 'static, + RM: Deref + Send + Sync + 'static, + CM: Deref + Send + Sync + 'static, + OM: Deref + Send + Sync + 'static, + CMH: Deref + Send + Sync + 'static, + NS: Deref + Send + Sync + 'static, + > ProtocolMessageHandler for JITChannelManager +where + ES::Target: EntropySource, + L::Target: Logger, + RM::Target: RoutingMessageHandler, + CM::Target: ChannelMessageHandler, + OM::Target: OnionMessageHandler, + CMH::Target: CustomMessageHandler, + NS::Target: NodeSigner, +{ + type ProtocolMessage = Message; + const PROTOCOL_NUMBER: Option = Some(2); + + fn handle_message( + &self, message: Self::ProtocolMessage, counterparty_node_id: &PublicKey, + ) -> Result<(), LightningError> { + match message { + Message::Request(request_id, request) => match request { + super::msgs::Request::GetVersions(_) => { + self.handle_get_versions_request(request_id, counterparty_node_id) + } + super::msgs::Request::GetInfo(params) => { + self.handle_get_info_request(request_id, counterparty_node_id, params) + } + super::msgs::Request::Buy(params) => { + self.handle_buy_request(request_id, counterparty_node_id, params) + } + }, + Message::Response(request_id, response) => match response { + super::msgs::Response::GetVersions(result) => { + self.handle_get_versions_response(request_id, counterparty_node_id, result) + } + super::msgs::Response::GetInfo(result) => { + self.handle_get_info_response(request_id, counterparty_node_id, result) + } + super::msgs::Response::GetInfoError(error) => { + self.handle_get_info_error(request_id, counterparty_node_id, error) + } + super::msgs::Response::Buy(result) => { + self.handle_buy_response(request_id, counterparty_node_id, result) + } + super::msgs::Response::BuyError(error) => { + self.handle_buy_error(request_id, counterparty_node_id, error) + } + }, + } + } +} diff --git a/src/jit_channel/event.rs b/src/jit_channel/event.rs new file mode 100644 index 0000000..4498a61 --- /dev/null +++ b/src/jit_channel/event.rs @@ -0,0 +1,127 @@ +// This file is Copyright its original authors, visible in version contror +// history. +// +// This file is licensed under the Apache License, Version 2.0 or the MIT license +// , at your option. +// You may not use this file except in accordance with one or both of these +// licenses. + +use bitcoin::secp256k1::PublicKey; +use lightning::ln::channelmanager::InterceptId; + +use super::msgs::OpeningFeeParams; +use crate::transport::msgs::RequestId; + +/// An Event which you should probably take some action in response to. +#[derive(Clone, Debug, PartialEq, Eq)] +pub enum Event { + /// A request from a client for information about JIT Channel parameters. + /// + /// You must calculate the paramaeters for this client and pass them to + /// [`crate::LiquidityManager::opening_fee_params_generated`]. + GetInfo { + /// An identifier that must be passed to [`crate::LiquidityManager::opening_fee_params_generated`]. + request_id: RequestId, + /// The node id of the client making the information request. + counterparty_node_id: PublicKey, + /// The protocol version they would like to use. + version: u16, + /// An optional token that can be used as an api key, coupon code, etc. + token: Option, + }, + /// Information from LSP about their current fee and channel parameters. + /// + /// You must call [`crate::LiquidityManager::opening_fee_params_selected`] with the fee parameter + /// you want to use if you wish to proceed opening a channel. + GetInfoResponse { + /// Needs to be passed to [`crate::LiquidityManager::opening_fee_params_selected`]. + channel_id: u128, + /// The node id of the LSP that provided this response. + counterparty_node_id: PublicKey, + /// The menu of fee parameters the LSP is offering at this time. + /// You must select one of these if you wish to proceed. + opening_fee_params_menu: Vec, + /// The min payment size allowed when opening the channel. + min_payment_size_msat: u64, + /// The max payment size allowed when opening the channel. + max_payment_size_msat: u64, + /// The user_channel_id value passed in to [`crate::LiquidityManager::create_invoice`]. + user_channel_id: u128, + }, + /// A client has selected a opening fee parameter to use and would like to + /// purchase a channel with an optional initial payment size. + /// + /// If payment_size_msat is [`Option::Some`] then the payer is allowed to use MPP + /// If payment_size_msat is [`Option::None`] then the payer cannot use MPP + /// + /// You must generate an scid and cltv_expiry_delta for them to use + /// and call [`crate::LiquidityManager::invoice_parameters_generated`]. + BuyRequest { + /// An identifier that must be passed into [`crate::LiquidityManager::invoice_parameters_generated`]. + request_id: RequestId, + /// The client node id that is making this request. + counterparty_node_id: PublicKey, + /// The version of the protocol they would like to use. + version: u16, + /// The channel parameters they have selected. + opening_fee_params: OpeningFeeParams, + /// The size of the initial payment they would like to receive. + payment_size_msat: Option, + }, + /// Use the provided fields to generate an invoice and give to payer. + /// + /// When the invoice is paid the LSP will open a channel to you + /// with the previously agreed upon parameters. + InvoiceGenerationReady { + /// The node id of the LSP. + counterparty_node_id: PublicKey, + /// The short channel id to use in the route hint. + scid: u64, + /// The cltv_expiry_delta to use in the route hint. + cltv_expiry_delta: u32, + /// The agreed upon channel parameters. + fees: OpeningFeeParams, + /// The min payment size allowed. + min_payment_size_msat: Option, + /// The max payment size allowed. + max_payment_size_msat: Option, + /// The initial payment size you specified. + payment_size_msat: Option, + /// The trust model the lsp expects. + client_trusts_lsp: bool, + /// The user_channel_id value passed in to [`crate::LiquidityManager::create_invoice`]. + user_channel_id: u128, + }, + /// You should call [`lightning::ln::channelmanager::ChannelManager::fail_intercepted_htlc`]. + FailInterceptedHTLC { + /// The identifier LDK uses to uniquely identify an intercepted HTLC. + intercept_id: InterceptId, + }, + /// You should call [`lightning::ln::channelmanager::ChannelManager::forward_intercepted_htlc`]. + ForwardInterceptedHTLC { + /// The identifier LDK uses to uniquely identify an intercepted HTLC. + intercept_id: InterceptId, + /// The channel to forward the HTLC over. + next_hop_channel_id: [u8; 32], + /// The node that should receive the HTLC. + next_node_id: PublicKey, + /// How many msats the HTLC should be for. + amt_to_forward_msat: u64, + }, + /// You should open a channel using [`lightning::ln::channelmanager::ChannelManager::create_channel`]. + OpenChannel { + /// The node to open channel with + their_network_key: PublicKey, + /// The intercepted htlc amount in msats + inbound_amount_msat: u64, + /// The amount the client expects to receive before fees are taken out + expected_outbound_amount_msat: u64, + /// The amount to forward after fees + amt_to_forward_msat: u64, + /// The fee earned for opening the channel + opening_fee_msat: u64, + /// An internal id used to track channel open + user_channel_id: u128, + }, +} diff --git a/src/jit_channel/mod.rs b/src/jit_channel/mod.rs index 0d948d9..6f871ea 100644 --- a/src/jit_channel/mod.rs +++ b/src/jit_channel/mod.rs @@ -7,4 +7,8 @@ // You may not use this file except in accordance with one or both of these // licenses. -//! Types and primitives that implement the LSPS3: JIT Channel Negotiation specification. +//! Types and primitives that implement the LSPS2: JIT Channel Negotiation specification. +pub mod channel_manager; +pub mod event; +pub mod msgs; +pub mod scid_utils; diff --git a/src/jit_channel/msgs.rs b/src/jit_channel/msgs.rs new file mode 100644 index 0000000..aadae36 --- /dev/null +++ b/src/jit_channel/msgs.rs @@ -0,0 +1,242 @@ +use std::convert::TryFrom; + +use bitcoin::hashes::hmac::{Hmac, HmacEngine}; +use bitcoin::hashes::sha256::Hash as Sha256; +use bitcoin::hashes::{Hash, HashEngine}; +use serde::{Deserialize, Serialize}; + +use crate::transport::msgs::{LSPSMessage, RequestId, ResponseError}; +use crate::utils; + +pub(crate) const LSPS2_GETVERSIONS_METHOD_NAME: &str = "lsps2.getversions"; +pub(crate) const LSPS2_GETINFO_METHOD_NAME: &str = "lsps2.getinfo"; +pub(crate) const LSPS2_BUY_METHOD_NAME: &str = "lsps2.buy"; + +#[derive(Clone, Debug, PartialEq, Eq, Deserialize, Serialize, Default)] +#[serde(default)] +pub struct GetVersionsRequest {} + +#[derive(Clone, Debug, PartialEq, Eq, Deserialize, Serialize)] +pub struct GetVersionsResponse { + pub versions: Vec, +} + +#[derive(Clone, Debug, PartialEq, Eq, Deserialize, Serialize)] +pub struct GetInfoRequest { + pub version: u16, + pub token: Option, +} + +/// Fees and parameters for a JIT Channel. +/// +/// The client will pay max(min_fee_msat, proportional*(payment_size_msat/1_000_000)). +pub struct RawOpeningFeeParams { + /// The minimum fee required for the channel open. + pub min_fee_msat: u64, + /// A fee proportional to the size of the initial payment. + pub proportional: u32, + /// An ISO8601 formatted date for which these params are valid. + pub valid_until: String, + /// number of blocks that the LSP promises it will keep the channel alive without closing, after confirmation. + pub min_lifetime: u32, + /// Maximum number of blocks that the client is allowed to set its to_self_delay parameter. + pub max_client_to_self_delay: u32, +} + +impl RawOpeningFeeParams { + pub(crate) fn into_opening_fee_params(self, promise_secret: &[u8; 32]) -> OpeningFeeParams { + let mut hmac = HmacEngine::::new(promise_secret); + hmac.input(&self.min_fee_msat.to_be_bytes()); + hmac.input(&self.proportional.to_be_bytes()); + hmac.input(self.valid_until.as_bytes()); + hmac.input(&self.min_lifetime.to_be_bytes()); + hmac.input(&self.max_client_to_self_delay.to_be_bytes()); + let promise_bytes = Hmac::from_engine(hmac).into_inner(); + let promise = utils::hex_str(&promise_bytes[..]); + OpeningFeeParams { + min_fee_msat: self.min_fee_msat, + proportional: self.proportional, + valid_until: self.valid_until.clone(), + min_lifetime: self.min_lifetime, + max_client_to_self_delay: self.max_client_to_self_delay, + promise, + } + } +} + +#[derive(Clone, Debug, PartialEq, Eq, Deserialize, Serialize)] +pub struct OpeningFeeParams { + pub min_fee_msat: u64, + pub proportional: u32, + pub valid_until: String, + pub min_lifetime: u32, + pub max_client_to_self_delay: u32, + pub promise: String, +} + +impl OpeningFeeParams { + pub fn is_valid(&self, promise_secret: &[u8; 32]) -> bool { + let mut hmac = HmacEngine::::new(promise_secret); + hmac.input(&self.min_fee_msat.to_be_bytes()); + hmac.input(&self.proportional.to_be_bytes()); + hmac.input(self.valid_until.as_bytes()); + hmac.input(&self.min_lifetime.to_be_bytes()); + hmac.input(&self.max_client_to_self_delay.to_be_bytes()); + let promise_bytes = Hmac::from_engine(hmac).into_inner(); + let promise = utils::hex_str(&promise_bytes[..]); + promise == self.promise + } +} + +#[derive(Clone, Debug, PartialEq, Eq, Deserialize, Serialize)] +pub struct GetInfoResponse { + pub opening_fee_params_menu: Vec, + pub min_payment_size_msat: u64, + pub max_payment_size_msat: u64, +} + +#[derive(Clone, Debug, PartialEq, Eq, Deserialize, Serialize)] +pub struct BuyRequest { + pub version: u16, + pub opening_fee_params: OpeningFeeParams, + #[serde(skip_serializing_if = "Option::is_none")] + pub payment_size_msat: Option, +} + +#[derive(Clone, Debug, PartialEq, Eq, Deserialize, Serialize)] +pub struct BuyResponse { + pub jit_channel_scid: String, + pub lsp_cltv_expiry_delta: u32, + #[serde(default)] + pub client_trusts_lsp: bool, +} + +#[derive(Clone, Debug, PartialEq, Eq)] +pub enum Request { + GetVersions(GetVersionsRequest), + GetInfo(GetInfoRequest), + Buy(BuyRequest), +} + +impl Request { + pub fn method(&self) -> &str { + match self { + Request::GetVersions(_) => LSPS2_GETVERSIONS_METHOD_NAME, + Request::GetInfo(_) => LSPS2_GETINFO_METHOD_NAME, + Request::Buy(_) => LSPS2_BUY_METHOD_NAME, + } + } +} + +#[derive(Clone, Debug, PartialEq, Eq)] +pub enum Response { + GetVersions(GetVersionsResponse), + GetInfo(GetInfoResponse), + GetInfoError(ResponseError), + Buy(BuyResponse), + BuyError(ResponseError), +} + +#[derive(Clone, Debug, PartialEq, Eq)] +pub enum Message { + Request(RequestId, Request), + Response(RequestId, Response), +} + +impl TryFrom for Message { + type Error = (); + + fn try_from(message: LSPSMessage) -> Result { + if let LSPSMessage::LSPS2(message) = message { + return Ok(message); + } + + Err(()) + } +} + +impl From for LSPSMessage { + fn from(message: Message) -> Self { + LSPSMessage::LSPS2(message) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn into_opening_fee_params_produces_valid_promise() { + let min_fee_msat = 100; + let proportional = 21; + let valid_until = "2023-05-20".to_string(); + let min_lifetime = 144; + let max_client_to_self_delay = 128; + + let raw = RawOpeningFeeParams { + min_fee_msat, + proportional, + valid_until: valid_until.clone(), + min_lifetime, + max_client_to_self_delay, + }; + + let promise_secret = [1u8; 32]; + + let opening_fee_params = raw.into_opening_fee_params(&promise_secret); + + assert_eq!(opening_fee_params.min_fee_msat, min_fee_msat); + assert_eq!(opening_fee_params.proportional, proportional); + assert_eq!(opening_fee_params.valid_until, valid_until); + assert_eq!(opening_fee_params.min_lifetime, min_lifetime); + assert_eq!(opening_fee_params.max_client_to_self_delay, max_client_to_self_delay); + + assert!(opening_fee_params.is_valid(&promise_secret)); + } + + #[test] + fn changing_single_field_produced_invalid_params() { + let min_fee_msat = 100; + let proportional = 21; + let valid_until = "2023-05-20".to_string(); + let min_lifetime = 144; + let max_client_to_self_delay = 128; + + let raw = RawOpeningFeeParams { + min_fee_msat, + proportional, + valid_until, + min_lifetime, + max_client_to_self_delay, + }; + + let promise_secret = [1u8; 32]; + + let mut opening_fee_params = raw.into_opening_fee_params(&promise_secret); + opening_fee_params.min_fee_msat = min_fee_msat + 1; + assert!(!opening_fee_params.is_valid(&promise_secret)); + } + + #[test] + fn wrong_secret_produced_invalid_params() { + let min_fee_msat = 100; + let proportional = 21; + let valid_until = "2023-05-20".to_string(); + let min_lifetime = 144; + let max_client_to_self_delay = 128; + + let raw = RawOpeningFeeParams { + min_fee_msat, + proportional, + valid_until, + min_lifetime, + max_client_to_self_delay, + }; + + let promise_secret = [1u8; 32]; + let other_secret = [2u8; 32]; + + let opening_fee_params = raw.into_opening_fee_params(&promise_secret); + assert!(!opening_fee_params.is_valid(&other_secret)); + } +} diff --git a/src/jit_channel/scid_utils.rs b/src/jit_channel/scid_utils.rs new file mode 100644 index 0000000..ce83457 --- /dev/null +++ b/src/jit_channel/scid_utils.rs @@ -0,0 +1,71 @@ +#[derive(Debug, PartialEq, Eq)] +pub enum ShortChannelIdError { + InvalidScid, +} + +/// Maximum transaction index that can be used in a `short_channel_id`. +/// This value is based on the 3-bytes available for tx index. +pub const MAX_SCID_TX_INDEX: u64 = 0x00ffffff; + +/// Maximum vout index that can be used in a `short_channel_id`. This +/// value is based on the 2-bytes available for the vout index. +pub const MAX_SCID_VOUT_INDEX: u64 = 0xffff; + +/// Extracts the block height (most significant 3-bytes) from the `short_channel_id` +pub fn block_from_scid(short_channel_id: &u64) -> u32 { + (short_channel_id >> 40) as u32 +} + +/// Extracts the tx index (bytes [2..4]) from the `short_channel_id` +pub fn tx_index_from_scid(short_channel_id: &u64) -> u32 { + ((short_channel_id >> 16) & MAX_SCID_TX_INDEX) as u32 +} + +/// Extracts the vout (bytes [0..2]) from the `short_channel_id` +pub fn vout_from_scid(short_channel_id: &u64) -> u16 { + ((short_channel_id) & MAX_SCID_VOUT_INDEX) as u16 +} + +pub fn scid_from_human_readable_string( + human_readable_scid: &str, +) -> Result { + let mut parts = human_readable_scid.split('x'); + + let block: u64 = parts + .next() + .ok_or(ShortChannelIdError::InvalidScid)? + .parse() + .map_err(|_e| ShortChannelIdError::InvalidScid)?; + let tx_index: u64 = parts + .next() + .ok_or(ShortChannelIdError::InvalidScid)? + .parse() + .map_err(|_e| ShortChannelIdError::InvalidScid)?; + let vout_index: u64 = parts + .next() + .ok_or(ShortChannelIdError::InvalidScid)? + .parse() + .map_err(|_e| ShortChannelIdError::InvalidScid)?; + + Ok((block << 40) | (tx_index << 16) | vout_index) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn parses_human_readable_scid_correctly() { + let block = 140; + let tx_index = 123; + let vout = 22; + + let human_readable_scid = format!("{}x{}x{}", block, tx_index, vout); + + let scid = scid_from_human_readable_string(&human_readable_scid).unwrap(); + + assert_eq!(block_from_scid(&scid), block); + assert_eq!(tx_index_from_scid(&scid), tx_index); + assert_eq!(vout_from_scid(&scid), vout); + } +} diff --git a/src/lib.rs b/src/lib.rs index 4919017..0acf9fd 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -24,4 +24,8 @@ mod jit_channel; mod transport; mod utils; -pub use transport::message_handler::{LiquidityManager, LiquidityProviderConfig}; +pub use jit_channel::event::Event as JITChannelEvent; +pub use jit_channel::msgs::RawOpeningFeeParams; +pub use transport::message_handler::{ + JITChannelsConfig, LiquidityManager, LiquidityProviderConfig, +}; diff --git a/src/transport/message_handler.rs b/src/transport/message_handler.rs index 20681fe..cd82166 100644 --- a/src/transport/message_handler.rs +++ b/src/transport/message_handler.rs @@ -1,14 +1,20 @@ use crate::events::{Event, EventQueue}; +use crate::jit_channel::channel_manager::JITChannelManager; +use crate::jit_channel::msgs::{OpeningFeeParams, RawOpeningFeeParams}; use crate::transport::msgs::{LSPSMessage, RawLSPSMessage, LSPS_MESSAGE_TYPE}; use crate::transport::protocol::LSPS0MessageHandler; use bitcoin::secp256k1::PublicKey; +use lightning::ln::channelmanager::InterceptId; use lightning::ln::features::{InitFeatures, NodeFeatures}; -use lightning::ln::msgs::{ErrorAction, LightningError}; -use lightning::ln::peer_handler::CustomMessageHandler; +use lightning::ln::msgs::{ + ChannelMessageHandler, ErrorAction, LightningError, OnionMessageHandler, RoutingMessageHandler, +}; +use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescriptor}; use lightning::ln::wire::CustomMessageReader; -use lightning::sign::EntropySource; -use lightning::util::logger::Level; +use lightning::sign::{EntropySource, NodeSigner}; +use lightning::util::errors::APIError; +use lightning::util::logger::{Level, Logger}; use lightning::util::ser::Readable; use std::collections::HashMap; use std::convert::TryFrom; @@ -16,6 +22,8 @@ use std::io; use std::ops::Deref; use std::sync::{Arc, Mutex}; +use super::msgs::RequestId; + const LSPS_FEATURE_BIT: usize = 729; /// A trait used to implement a specific LSPS protocol. @@ -35,42 +43,104 @@ pub(crate) trait ProtocolMessageHandler { /// /// Allows end-user to configure options when using the [`LiquidityManager`] /// to provide liquidity services to clients. -pub struct LiquidityProviderConfig; +pub struct LiquidityProviderConfig { + /// Optional configuration for jit channels + /// should you want to support them + pub jit_channels: Option, +} + +/// Configuration options for jit channels +/// A configuration used for the creation of Just In Time Channels. +pub struct JITChannelsConfig { + /// Used to calculate the promise for channel parameters supplied to clients + /// + /// Note: If this changes then old promises given out will be considered invalid + pub promise_secret: [u8; 32], +} /// The main interface into LSP functionality. /// /// Should be used as a [`CustomMessageHandler`] for your /// [`lightning::ln::peer_handler::PeerManager`]'s [`lightning::ln::peer_handler::MessageHandler`]. -pub struct LiquidityManager -where +/// +/// Should provide a reference to your [`lightning::ln::peer_handler::PeerManager`] by calling +/// [`LiquidityManager::set_peer_manager()`] post construction. This allows the [`LiquidityManager`] to +/// wake the [`lightning::ln::peer_handler::PeerManager`] when there are pending messages to be sent. +/// +/// Users need to continually poll [`LiquidityManager::get_and_clear_pending_events()`] in order to surface +/// [`Event`]'s that likely need to be handled. +/// +/// Users must forward the [`lightning::events::Event::HTLCIntercepted`] event parameters to [`LiquidityManager::htlc_intercepted()`] +/// and the [`lightning::events::Event::ChannelReady`] event parameters to [`LiquidityManager::channel_ready()`]. +pub struct LiquidityManager< + ES: Deref + Send + Sync + 'static + Clone, + Descriptor: SocketDescriptor + Send + Sync + 'static, + L: Deref + Send + Sync + 'static, + RM: Deref + Send + Sync + 'static, + CM: Deref + Send + Sync + 'static, + OM: Deref + Send + Sync + 'static, + NS: Deref + Send + Sync + 'static, +> where ES::Target: EntropySource, + L::Target: Logger, + RM::Target: RoutingMessageHandler, + CM::Target: ChannelMessageHandler, + OM::Target: OnionMessageHandler, + NS::Target: NodeSigner, { pending_messages: Arc>>, pending_events: Arc, request_id_to_method_map: Mutex>, lsps0_message_handler: LSPS0MessageHandler, + lsps2_message_handler: Option, NS>>, provider_config: Option, } -impl LiquidityManager +impl< + ES: Deref + Send + Sync + 'static + Clone, + Descriptor: SocketDescriptor + Send + Sync + 'static, + L: Deref + Send + Sync + 'static, + RM: Deref + Send + Sync + 'static, + CM: Deref + Send + Sync + 'static, + OM: Deref + Send + Sync + 'static, + NS: Deref + Send + Sync + 'static, + > LiquidityManager where ES::Target: EntropySource, + L::Target: Logger, + RM::Target: RoutingMessageHandler, + CM::Target: ChannelMessageHandler, + OM::Target: OnionMessageHandler, + NS::Target: NodeSigner, { - /// Constructor for the LiquidityManager + /// Constructor for the [`LiquidityManager`] /// /// Sets up the required protocol message handlers based on the given [`LiquidityProviderConfig`]. pub fn new(entropy_source: ES, provider_config: Option) -> Self { let pending_messages = Arc::new(Mutex::new(vec![])); + let pending_events = Arc::new(EventQueue::default()); let lsps0_message_handler = - LSPS0MessageHandler::new(entropy_source, vec![], Arc::clone(&pending_messages)); + LSPS0MessageHandler::new(entropy_source.clone(), vec![], Arc::clone(&pending_messages)); + + let lsps2_message_handler = provider_config.as_ref().and_then(|config| { + config.jit_channels.as_ref().map(|jit_channels_config| { + JITChannelManager::new( + entropy_source.clone(), + jit_channels_config.promise_secret, + Arc::clone(&pending_messages), + Arc::clone(&pending_events), + ) + }) + }); Self { pending_messages, - pending_events: Arc::new(EventQueue::default()), + pending_events, request_id_to_method_map: Mutex::new(HashMap::new()), lsps0_message_handler, provider_config, + lsps2_message_handler, } } @@ -88,6 +158,157 @@ where self.pending_events.get_and_clear_pending_events() } + /// Set a [`lightning::ln::peer_handler::PeerManager`] reference for the message handlers + /// + /// This allows the message handlers to wake the [`lightning::ln::peer_handler::PeerManager`] by calling + /// [`lightning::ln::peer_handler::PeerManager::process_events()`] after enqueing messages to be sent. + /// + /// Without this the messages will be sent based on whatever polling interval + /// your background processor uses. + pub fn set_peer_manager( + &self, peer_manager: Arc, NS>>, + ) { + if let Some(lsps2_message_handler) = &self.lsps2_message_handler { + lsps2_message_handler.set_peer_manager(peer_manager); + } + } + + /// Initiate the creation of an invoice that when paid will open a channel + /// with enough inbound liquidity to be able to receive the payment. + /// + /// `counterparty_node_id` is the node_id of the LSP you would like to use. + /// + /// if `payment_size_msat` is [`Option::Some`] then the invoice will be for a fixed amount + /// and MPP can be used to pay it. + /// + /// if `payment_size_msat` is [`Option::None`] then the invoice can be for an arbitrary amount + /// but MPP can no longer be used to pay it. + /// + /// `token` is an optional String that will be provided to the LSP. + /// it can be used by the LSP as an API key, coupon code, or some other way to identify a user. + pub fn create_invoice( + &self, counterparty_node_id: PublicKey, payment_size_msat: Option, + token: Option, user_channel_id: u128, + ) -> Result<(), APIError> { + if let Some(lsps2_message_handler) = &self.lsps2_message_handler { + lsps2_message_handler.create_invoice( + counterparty_node_id, + payment_size_msat, + token, + user_channel_id, + ); + Ok(()) + } else { + Err(APIError::APIMisuseError { + err: "JIT Channels were not configured when LSPManager was instantiated" + .to_string(), + }) + } + } + + /// Used by LSP to provide fee parameters to a client requesting a JIT Channel. + /// + /// Should be called in response to receiving a [`crate::JITChannelEvent::GetInfo`] event. + pub fn opening_fee_params_generated( + &self, counterparty_node_id: PublicKey, request_id: RequestId, + opening_fee_params_menu: Vec, min_payment_size_msat: u64, + max_payment_size_msat: u64, + ) -> Result<(), APIError> { + if let Some(lsps2_message_handler) = &self.lsps2_message_handler { + lsps2_message_handler.opening_fee_params_generated( + counterparty_node_id, + request_id, + opening_fee_params_menu, + min_payment_size_msat, + max_payment_size_msat, + ) + } else { + Err(APIError::APIMisuseError { + err: "JIT Channels were not configured when LSPManager was instantiated" + .to_string(), + }) + } + } + + /// Used by client to confirm which channel parameters to use for the JIT Channel buy request. + /// + /// Should be called in response to receiving a [`crate::JITChannelEvent::GetInfoResponse`] event. + pub fn opening_fee_params_selected( + &self, counterparty_node_id: PublicKey, channel_id: u128, + opening_fee_params: OpeningFeeParams, + ) -> Result<(), APIError> { + if let Some(lsps2_message_handler) = &self.lsps2_message_handler { + lsps2_message_handler.opening_fee_params_selected( + counterparty_node_id, + channel_id, + opening_fee_params, + ) + } else { + Err(APIError::APIMisuseError { + err: "JIT Channels were not configured when LSPManager was instantiated" + .to_string(), + }) + } + } + + /// Used by LSP to provide client with the scid and cltv_expiry_delta to use in their invoice + /// + /// Should be called in response to receiving a [`crate::JITChannelEvent::BuyRequest`] event. + pub fn invoice_parameters_generated( + &self, counterparty_node_id: PublicKey, request_id: RequestId, scid: u64, + cltv_expiry_delta: u32, client_trusts_lsp: bool, + ) -> Result<(), APIError> { + if let Some(lsps2_message_handler) = &self.lsps2_message_handler { + lsps2_message_handler.invoice_parameters_generated( + counterparty_node_id, + request_id, + scid, + cltv_expiry_delta, + client_trusts_lsp, + ) + } else { + Err(APIError::APIMisuseError { + err: "JIT Channels were not configured when LSPManager was instantiated" + .to_string(), + }) + } + } + + /// Forward [`lightning::events::Event::HTLCIntercepted`] event parameters into this function. + /// + /// Will generate a [`crate::JITChannelEvent::FailInterceptedHTLC`] event if the scid matches a payment we are expecting + /// but the payment amount is incorrect or the expiry has passed. + /// + /// Will generate a [`crate::JITChannelEvent::OpenChannel`] event if the scid matches a payment we are expected + /// and the payment amount is correct and the offer has not expired. + /// + /// Will do nothing if the scid does not match any of the ones we gave out. + pub fn htlc_intercepted( + &self, scid: u64, intercept_id: InterceptId, inbound_amount_msat: u64, + expected_outbound_amount_msat: u64, + ) { + if let Some(lsps2_message_handler) = &self.lsps2_message_handler { + lsps2_message_handler.htlc_intercepted( + scid, + intercept_id, + inbound_amount_msat, + expected_outbound_amount_msat, + ); + } + } + + /// Forward [`lightning::events::Event::ChannelReady`] event parameters into this function. + /// + /// Will generate a [`crate::JITChannelEvent::ForwardInterceptedHTLC`] event if it matches a channel + /// we need to forward a payment over otherwise it will be ignored. + pub fn channel_ready( + &self, user_channel_id: u128, channel_id: &[u8; 32], counterparty_node_id: &PublicKey, + ) { + if let Some(lsps2_message_handler) = &self.lsps2_message_handler { + lsps2_message_handler.channel_ready(user_channel_id, channel_id, counterparty_node_id); + } + } + fn handle_lsps_message( &self, msg: LSPSMessage, sender_node_id: &PublicKey, ) -> Result<(), lightning::ln::msgs::LightningError> { @@ -98,6 +319,14 @@ where LSPSMessage::LSPS0(msg) => { self.lsps0_message_handler.handle_message(msg, sender_node_id)?; } + LSPSMessage::LSPS2(msg) => match &self.lsps2_message_handler { + Some(lsps2_message_handler) => { + lsps2_message_handler.handle_message(msg, sender_node_id)?; + } + None => { + return Err(LightningError { err: format!("Received LSPS2 message without LSPS2 message handler configured. From node = {:?}", sender_node_id), action: ErrorAction::IgnoreAndLog(Level::Info)}); + } + }, } Ok(()) } @@ -108,9 +337,22 @@ where } } -impl CustomMessageReader for LiquidityManager +impl< + ES: Deref + Send + Sync + 'static + Clone, + Descriptor: SocketDescriptor + Send + Sync + 'static, + L: Deref + Send + Sync + 'static, + RM: Deref + Send + Sync + 'static, + CM: Deref + Send + Sync + 'static, + OM: Deref + Send + Sync + 'static, + NS: Deref + Send + Sync + 'static, + > CustomMessageReader for LiquidityManager where ES::Target: EntropySource, + L::Target: Logger, + RM::Target: RoutingMessageHandler, + CM::Target: ChannelMessageHandler, + OM::Target: OnionMessageHandler, + NS::Target: NodeSigner, { type CustomMessage = RawLSPSMessage; @@ -124,16 +366,32 @@ where } } -impl CustomMessageHandler for LiquidityManager +impl< + ES: Deref + Send + Sync + 'static + Clone, + Descriptor: SocketDescriptor + Send + Sync + 'static, + L: Deref + Send + Sync + 'static, + RM: Deref + Send + Sync + 'static, + CM: Deref + Send + Sync + 'static, + OM: Deref + Send + Sync + 'static, + NS: Deref + Send + Sync + 'static, + > CustomMessageHandler for LiquidityManager where ES::Target: EntropySource, + L::Target: Logger, + RM::Target: RoutingMessageHandler, + CM::Target: ChannelMessageHandler, + OM::Target: OnionMessageHandler, + NS::Target: NodeSigner, { fn handle_custom_message( &self, msg: Self::CustomMessage, sender_node_id: &PublicKey, ) -> Result<(), lightning::ln::msgs::LightningError> { - let mut request_id_to_method_map = self.request_id_to_method_map.lock().unwrap(); + let message = { + let mut request_id_to_method_map = self.request_id_to_method_map.lock().unwrap(); + LSPSMessage::from_str_with_id_map(&msg.payload, &mut request_id_to_method_map) + }; - match LSPSMessage::from_str_with_id_map(&msg.payload, &mut request_id_to_method_map) { + match message { Ok(msg) => self.handle_lsps_message(msg, sender_node_id), Err(_) => { self.enqueue_message(*sender_node_id, LSPSMessage::Invalid); diff --git a/src/transport/msgs.rs b/src/transport/msgs.rs index fcd166c..1d3ff96 100644 --- a/src/transport/msgs.rs +++ b/src/transport/msgs.rs @@ -1,9 +1,11 @@ +use crate::jit_channel; use lightning::impl_writeable_msg; use lightning::ln::wire; use serde::de; use serde::de::{MapAccess, Visitor}; use serde::ser::SerializeStruct; use serde::{Deserialize, Deserializer, Serialize}; +use serde_json::json; use std::collections::HashMap; use std::convert::TryFrom; use std::fmt; @@ -18,6 +20,7 @@ const JSONRPC_RESULT_FIELD_KEY: &str = "result"; const JSONRPC_ERROR_FIELD_KEY: &str = "error"; const JSONRPC_INVALID_MESSAGE_ERROR_CODE: i32 = -32700; const JSONRPC_INVALID_MESSAGE_ERROR_MESSAGE: &str = "parse error"; + const LSPS0_LISTPROTOCOLS_METHOD_NAME: &str = "lsps0.listprotocols"; pub const LSPS_MESSAGE_TYPE: u16 = 37913; @@ -35,7 +38,13 @@ impl wire::Type for RawLSPSMessage { } } -#[derive(Clone, Debug, PartialEq, Eq)] +#[derive(Clone, Debug, PartialEq, Eq, Hash)] +pub enum Prefix { + LSPS0, + LSPS2, +} + +#[derive(Clone, Debug, PartialEq, Eq, Hash)] pub struct RequestId(pub String); #[derive(Clone, Debug, PartialEq, Eq, Deserialize, Serialize)] @@ -86,6 +95,7 @@ impl TryFrom for LSPS0Message { match message { LSPSMessage::Invalid => Err(()), LSPSMessage::LSPS0(message) => Ok(message), + LSPSMessage::LSPS2(_) => Err(()), } } } @@ -100,6 +110,7 @@ impl From for LSPSMessage { pub enum LSPSMessage { Invalid, LSPS0(LSPS0Message), + LSPS2(jit_channel::msgs::Message), } impl LSPSMessage { @@ -116,6 +127,9 @@ impl LSPSMessage { LSPSMessage::LSPS0(LSPS0Message::Request(request_id, request)) => { Some((request_id.0.clone(), request.method().to_string())) } + LSPSMessage::LSPS2(jit_channel::msgs::Message::Request(request_id, request)) => { + Some((request_id.0.clone(), request.method().to_string())) + } _ => None, } } @@ -154,6 +168,43 @@ impl Serialize for LSPSMessage { } } } + LSPSMessage::LSPS2(jit_channel::msgs::Message::Request(request_id, request)) => { + jsonrpc_object.serialize_field(JSONRPC_ID_FIELD_KEY, &request_id.0)?; + jsonrpc_object.serialize_field(JSONRPC_METHOD_FIELD_KEY, request.method())?; + + match request { + jit_channel::msgs::Request::GetVersions(params) => { + jsonrpc_object.serialize_field(JSONRPC_PARAMS_FIELD_KEY, params)? + } + jit_channel::msgs::Request::GetInfo(params) => { + jsonrpc_object.serialize_field(JSONRPC_PARAMS_FIELD_KEY, params)? + } + jit_channel::msgs::Request::Buy(params) => { + jsonrpc_object.serialize_field(JSONRPC_PARAMS_FIELD_KEY, params)? + } + } + } + LSPSMessage::LSPS2(jit_channel::msgs::Message::Response(request_id, response)) => { + jsonrpc_object.serialize_field(JSONRPC_ID_FIELD_KEY, &request_id.0)?; + + match response { + jit_channel::msgs::Response::GetVersions(result) => { + jsonrpc_object.serialize_field(JSONRPC_RESULT_FIELD_KEY, result)? + } + jit_channel::msgs::Response::GetInfo(result) => { + jsonrpc_object.serialize_field(JSONRPC_RESULT_FIELD_KEY, result)? + } + jit_channel::msgs::Response::GetInfoError(error) => { + jsonrpc_object.serialize_field(JSONRPC_ERROR_FIELD_KEY, error)? + } + jit_channel::msgs::Response::Buy(result) => { + jsonrpc_object.serialize_field(JSONRPC_RESULT_FIELD_KEY, result)? + } + jit_channel::msgs::Response::BuyError(error) => { + jsonrpc_object.serialize_field(JSONRPC_ERROR_FIELD_KEY, error)? + } + } + } LSPSMessage::Invalid => { let error = ResponseError { code: JSONRPC_INVALID_MESSAGE_ERROR_CODE, @@ -224,6 +275,30 @@ impl<'de, 'a> Visitor<'de> for LSPSMessageVisitor<'a> { LSPS0Request::ListProtocols(ListProtocolsRequest {}), ))) } + jit_channel::msgs::LSPS2_GETVERSIONS_METHOD_NAME => { + let request = serde_json::from_value(params.unwrap_or(json!({}))) + .map_err(de::Error::custom)?; + Ok(LSPSMessage::LSPS2(jit_channel::msgs::Message::Request( + RequestId(id), + jit_channel::msgs::Request::GetVersions(request), + ))) + } + jit_channel::msgs::LSPS2_GETINFO_METHOD_NAME => { + let request = serde_json::from_value(params.unwrap_or(json!({}))) + .map_err(de::Error::custom)?; + Ok(LSPSMessage::LSPS2(jit_channel::msgs::Message::Request( + RequestId(id), + jit_channel::msgs::Request::GetInfo(request), + ))) + } + jit_channel::msgs::LSPS2_BUY_METHOD_NAME => { + let request = serde_json::from_value(params.unwrap_or(json!({}))) + .map_err(de::Error::custom)?; + Ok(LSPSMessage::LSPS2(jit_channel::msgs::Message::Request( + RequestId(id), + jit_channel::msgs::Request::Buy(request), + ))) + } _ => Err(de::Error::custom(format!( "Received request with unknown method: {}", method @@ -248,6 +323,52 @@ impl<'de, 'a> Visitor<'de> for LSPSMessageVisitor<'a> { Err(de::Error::custom("Received invalid JSON-RPC object: one of method, result, or error required")) } } + jit_channel::msgs::LSPS2_GETVERSIONS_METHOD_NAME => { + if let Some(result) = result { + let response = + serde_json::from_value(result).map_err(de::Error::custom)?; + Ok(LSPSMessage::LSPS2(jit_channel::msgs::Message::Response( + RequestId(id), + jit_channel::msgs::Response::GetVersions(response), + ))) + } else { + Err(de::Error::custom("Received invalid lsps2.getversions response.")) + } + } + jit_channel::msgs::LSPS2_GETINFO_METHOD_NAME => { + if let Some(error) = error { + Ok(LSPSMessage::LSPS2(jit_channel::msgs::Message::Response( + RequestId(id), + jit_channel::msgs::Response::GetInfoError(error), + ))) + } else if let Some(result) = result { + let response = + serde_json::from_value(result).map_err(de::Error::custom)?; + Ok(LSPSMessage::LSPS2(jit_channel::msgs::Message::Response( + RequestId(id), + jit_channel::msgs::Response::GetInfo(response), + ))) + } else { + Err(de::Error::custom("Received invalid JSON-RPC object: one of method, result, or error required")) + } + } + jit_channel::msgs::LSPS2_BUY_METHOD_NAME => { + if let Some(error) = error { + Ok(LSPSMessage::LSPS2(jit_channel::msgs::Message::Response( + RequestId(id), + jit_channel::msgs::Response::BuyError(error), + ))) + } else if let Some(result) = result { + let response = + serde_json::from_value(result).map_err(de::Error::custom)?; + Ok(LSPSMessage::LSPS2(jit_channel::msgs::Message::Response( + RequestId(id), + jit_channel::msgs::Response::Buy(response), + ))) + } else { + Err(de::Error::custom("Received invalid JSON-RPC object: one of method, result, or error required")) + } + } _ => Err(de::Error::custom(format!( "Received response for an unknown request method: {}", method diff --git a/src/utils.rs b/src/utils.rs index 067ce0b..da40131 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -67,3 +67,13 @@ pub fn parse_pubkey(pubkey_str: &str) -> Result { Ok(pubkey.unwrap()) } + +pub fn compute_opening_fee( + payment_size_msat: u64, opening_fee_min_fee_msat: u64, opening_fee_proportional: u64, +) -> Option { + let t1 = payment_size_msat.checked_mul(opening_fee_proportional)?; + let t2 = t1.checked_add(999999)?; + let t3 = t2.checked_div(1000000)?; + let t4 = std::cmp::max(t3, opening_fee_min_fee_msat); + Some(t4) +}