Skip to content

Commit 61b2f37

Browse files
committed
Implement LiquiditySource CMH
We add a `LiquiditySource` object that is configurable via builder methods, currently allowing to source inbound liquidity from an LSPS2 service. In the next commit(s) we'll add corresponding API and event handling.
1 parent 86d1a1f commit 61b2f37

File tree

7 files changed

+306
-11
lines changed

7 files changed

+306
-11
lines changed

Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ lightning-persister = { version = "0.0.121" }
3535
lightning-background-processor = { version = "0.0.121", features = ["futures"] }
3636
lightning-rapid-gossip-sync = { version = "0.0.121" }
3737
lightning-transaction-sync = { version = "0.0.121", features = ["esplora-async-https", "time"] }
38+
lightning-liquidity = { version = "0.1.0-alpha", features = ["std"] }
3839

3940
#lightning = { git = "https://github.com/lightningdevkit/rust-lightning", branch="main", features = ["std"] }
4041
#lightning-invoice = { git = "https://github.com/lightningdevkit/rust-lightning", branch="main" }
@@ -43,6 +44,7 @@ lightning-transaction-sync = { version = "0.0.121", features = ["esplora-async-h
4344
#lightning-background-processor = { git = "https://github.com/lightningdevkit/rust-lightning", branch="main", features = ["futures"] }
4445
#lightning-rapid-gossip-sync = { git = "https://github.com/lightningdevkit/rust-lightning", branch="main" }
4546
#lightning-transaction-sync = { git = "https://github.com/lightningdevkit/rust-lightning", branch="main", features = ["esplora-async"] }
47+
#lightning-liquidity = { git = "https://github.com/lightningdevkit/lightning-liquidity", branch="main", features = ["std"] }
4648

4749
#lightning = { path = "../rust-lightning/lightning", features = ["std"] }
4850
#lightning-invoice = { path = "../rust-lightning/lightning-invoice" }
@@ -51,6 +53,7 @@ lightning-transaction-sync = { version = "0.0.121", features = ["esplora-async-h
5153
#lightning-background-processor = { path = "../rust-lightning/lightning-background-processor", features = ["futures"] }
5254
#lightning-rapid-gossip-sync = { path = "../rust-lightning/lightning-rapid-gossip-sync" }
5355
#lightning-transaction-sync = { path = "../rust-lightning/lightning-transaction-sync", features = ["esplora-async"] }
56+
#lightning-liquidity = { path = "../lightning-liquidity", features = ["std"] }
5457

5558
bdk = { version = "0.29.0", default-features = false, features = ["std", "async-interface", "use-esplora-async", "sqlite-bundled", "keys-bip39"]}
5659

bindings/ldk_node.udl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ interface Builder {
2828
void set_esplora_server(string esplora_server_url);
2929
void set_gossip_source_p2p();
3030
void set_gossip_source_rgs(string rgs_server_url);
31+
void set_liquidity_source_lsps2(SocketAddress address, PublicKey node_id, string? token);
3132
void set_storage_dir_path(string storage_dir_path);
3233
void set_network(Network network);
3334
[Throws=BuildError]

src/builder.rs

Lines changed: 100 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,9 @@ use crate::fee_estimator::OnchainFeeEstimator;
33
use crate::gossip::GossipSource;
44
use crate::io;
55
use crate::io::sqlite_store::SqliteStore;
6+
use crate::liquidity::LiquiditySource;
67
use crate::logger::{log_error, FilesystemLogger, Logger};
8+
use crate::message_handler::NodeCustomMessageHandler;
79
use crate::payment_store::PaymentStore;
810
use crate::peer_store::PeerStore;
911
use crate::sweep::OutputSweeper;
@@ -40,6 +42,9 @@ use lightning_persister::fs_store::FilesystemStore;
4042

4143
use lightning_transaction_sync::EsploraSyncClient;
4244

45+
use lightning_liquidity::lsps2::client::LSPS2ClientConfig;
46+
use lightning_liquidity::{LiquidityClientConfig, LiquidityManager};
47+
4348
#[cfg(any(vss, vss_test))]
4449
use crate::io::vss_store::VssStore;
4550
use bdk::bitcoin::secp256k1::Secp256k1;
@@ -49,6 +54,7 @@ use bdk::template::Bip84;
4954

5055
use bip39::Mnemonic;
5156

57+
use bitcoin::secp256k1::PublicKey;
5258
use bitcoin::{BlockHash, Network};
5359

5460
#[cfg(any(vss, vss_test))]
@@ -80,6 +86,18 @@ enum GossipSourceConfig {
8086
RapidGossipSync(String),
8187
}
8288

89+
#[derive(Debug, Clone)]
90+
struct LiquiditySourceConfig {
91+
// LSPS2 service's (address, node_id, token)
92+
lsps2_service: Option<(SocketAddress, PublicKey, Option<String>)>,
93+
}
94+
95+
impl Default for LiquiditySourceConfig {
96+
fn default() -> Self {
97+
Self { lsps2_service: None }
98+
}
99+
}
100+
83101
/// An error encountered during building a [`Node`].
84102
///
85103
/// [`Node`]: crate::Node
@@ -146,16 +164,14 @@ pub struct NodeBuilder {
146164
entropy_source_config: Option<EntropySourceConfig>,
147165
chain_data_source_config: Option<ChainDataSourceConfig>,
148166
gossip_source_config: Option<GossipSourceConfig>,
167+
liquidity_source_config: Option<LiquiditySourceConfig>,
149168
}
150169

151170
impl NodeBuilder {
152171
/// Creates a new builder instance with the default configuration.
153172
pub fn new() -> Self {
154173
let config = Config::default();
155-
let entropy_source_config = None;
156-
let chain_data_source_config = None;
157-
let gossip_source_config = None;
158-
Self { config, entropy_source_config, chain_data_source_config, gossip_source_config }
174+
Self::from_config(config)
159175
}
160176

161177
/// Creates a new builder instance from an [`Config`].
@@ -164,7 +180,14 @@ impl NodeBuilder {
164180
let entropy_source_config = None;
165181
let chain_data_source_config = None;
166182
let gossip_source_config = None;
167-
Self { config, entropy_source_config, chain_data_source_config, gossip_source_config }
183+
let liquidity_source_config = None;
184+
Self {
185+
config,
186+
entropy_source_config,
187+
chain_data_source_config,
188+
gossip_source_config,
189+
liquidity_source_config,
190+
}
168191
}
169192

170193
/// Configures the [`Node`] instance to source its wallet entropy from a seed file on disk.
@@ -218,6 +241,25 @@ impl NodeBuilder {
218241
self
219242
}
220243

244+
/// Configures the [`Node`] instance to source its inbound liquidity from the given
245+
/// [LSPS2](https://github.com/BitcoinAndLightningLayerSpecs/lsp/blob/main/LSPS2/README.md)
246+
/// service.
247+
///
248+
/// Will mark the LSP as trusted for 0-confirmation channels, see [`Config::trusted_peers_0conf`].
249+
///
250+
/// The given `token` will be used by the LSP to authenticate the user.
251+
pub fn set_liquidity_source_lsps2(
252+
&mut self, address: SocketAddress, node_id: PublicKey, token: Option<String>,
253+
) -> &mut Self {
254+
// Mark the LSP as trusted for 0conf
255+
self.config.trusted_peers_0conf.push(node_id.clone());
256+
257+
let liquidity_source_config =
258+
self.liquidity_source_config.get_or_insert(LiquiditySourceConfig::default());
259+
liquidity_source_config.lsps2_service = Some((address, node_id, token));
260+
self
261+
}
262+
221263
/// Sets the used storage directory path.
222264
pub fn set_storage_dir_path(&mut self, storage_dir_path: String) -> &mut Self {
223265
self.config.storage_dir_path = storage_dir_path;
@@ -318,6 +360,7 @@ impl NodeBuilder {
318360
config,
319361
self.chain_data_source_config.as_ref(),
320362
self.gossip_source_config.as_ref(),
363+
self.liquidity_source_config.as_ref(),
321364
seed_bytes,
322365
logger,
323366
vss_store,
@@ -340,6 +383,7 @@ impl NodeBuilder {
340383
config,
341384
self.chain_data_source_config.as_ref(),
342385
self.gossip_source_config.as_ref(),
386+
self.liquidity_source_config.as_ref(),
343387
seed_bytes,
344388
logger,
345389
kv_store,
@@ -413,6 +457,19 @@ impl ArcedNodeBuilder {
413457
self.inner.write().unwrap().set_gossip_source_rgs(rgs_server_url);
414458
}
415459

460+
/// Configures the [`Node`] instance to source its inbound liquidity from the given
461+
/// [LSPS2](https://github.com/BitcoinAndLightningLayerSpecs/lsp/blob/main/LSPS2/README.md)
462+
/// service.
463+
///
464+
/// Will mark the LSP as trusted for 0-confirmation channels, see [`Config::trusted_peers_0conf`].
465+
///
466+
/// The given `token` will be used by the LSP to authenticate the user.
467+
pub fn set_liquidity_source_lsps2(
468+
&self, address: SocketAddress, node_id: PublicKey, token: Option<String>,
469+
) {
470+
self.inner.write().unwrap().set_liquidity_source_lsps2(address, node_id, token);
471+
}
472+
416473
/// Sets the used storage directory path.
417474
pub fn set_storage_dir_path(&self, storage_dir_path: String) {
418475
self.inner.write().unwrap().set_storage_dir_path(storage_dir_path);
@@ -463,7 +520,8 @@ impl ArcedNodeBuilder {
463520
/// Builds a [`Node`] instance according to the options previously configured.
464521
fn build_with_store_internal<K: KVStore + Sync + Send + 'static>(
465522
config: Arc<Config>, chain_data_source_config: Option<&ChainDataSourceConfig>,
466-
gossip_source_config: Option<&GossipSourceConfig>, seed_bytes: [u8; 64],
523+
gossip_source_config: Option<&GossipSourceConfig>,
524+
liquidity_source_config: Option<&LiquiditySourceConfig>, seed_bytes: [u8; 64],
467525
logger: Arc<FilesystemLogger>, kv_store: Arc<K>,
468526
) -> Result<Node<K>, BuildError> {
469527
// Initialize the on-chain wallet and chain access
@@ -746,20 +804,51 @@ fn build_with_store_internal<K: KVStore + Sync + Send + 'static>(
746804
}
747805
};
748806

807+
let liquidity_source = liquidity_source_config.as_ref().and_then(|lsc| {
808+
lsc.lsps2_service.as_ref().map(|(address, node_id, token)| {
809+
let lsps2_client_config = Some(LSPS2ClientConfig {});
810+
let liquidity_client_config = Some(LiquidityClientConfig { lsps2_client_config });
811+
let liquidity_manager = Arc::new(LiquidityManager::new(
812+
Arc::clone(&keys_manager),
813+
Arc::clone(&channel_manager),
814+
Some(Arc::clone(&tx_sync)),
815+
None,
816+
None,
817+
liquidity_client_config,
818+
));
819+
Arc::new(LiquiditySource::new_lsps2(
820+
address.clone(),
821+
*node_id,
822+
token.clone(),
823+
Arc::clone(&channel_manager),
824+
Arc::clone(&keys_manager),
825+
liquidity_manager,
826+
Arc::clone(&config),
827+
Arc::clone(&logger),
828+
))
829+
})
830+
});
831+
832+
let custom_message_handler = if let Some(liquidity_source) = liquidity_source.as_ref() {
833+
Arc::new(NodeCustomMessageHandler::new_liquidity(Arc::clone(&liquidity_source)))
834+
} else {
835+
Arc::new(NodeCustomMessageHandler::new_ignoring())
836+
};
837+
749838
let msg_handler = match gossip_source.as_gossip_sync() {
750839
GossipSync::P2P(p2p_gossip_sync) => MessageHandler {
751840
chan_handler: Arc::clone(&channel_manager),
752841
route_handler: Arc::clone(&p2p_gossip_sync)
753842
as Arc<dyn RoutingMessageHandler + Sync + Send>,
754843
onion_message_handler: onion_messenger,
755-
custom_message_handler: IgnoringMessageHandler {},
844+
custom_message_handler,
756845
},
757846
GossipSync::Rapid(_) => MessageHandler {
758847
chan_handler: Arc::clone(&channel_manager),
759848
route_handler: Arc::new(IgnoringMessageHandler {})
760849
as Arc<dyn RoutingMessageHandler + Sync + Send>,
761850
onion_message_handler: onion_messenger,
762-
custom_message_handler: IgnoringMessageHandler {},
851+
custom_message_handler,
763852
},
764853
GossipSync::None => {
765854
unreachable!("We must always have a gossip sync!");
@@ -782,6 +871,8 @@ fn build_with_store_internal<K: KVStore + Sync + Send + 'static>(
782871
Arc::clone(&keys_manager),
783872
));
784873

874+
liquidity_source.as_ref().map(|l| l.set_peer_manager(Arc::clone(&peer_manager)));
875+
785876
// Init payment info storage
786877
let payment_store = match io::utils::read_payments(Arc::clone(&kv_store), Arc::clone(&logger)) {
787878
Ok(payments) => {
@@ -853,6 +944,7 @@ fn build_with_store_internal<K: KVStore + Sync + Send + 'static>(
853944
keys_manager,
854945
network_graph,
855946
gossip_source,
947+
liquidity_source,
856948
kv_store,
857949
logger,
858950
_router: router,

src/lib.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,9 @@ mod fee_estimator;
8282
mod gossip;
8383
mod hex_utils;
8484
pub mod io;
85+
mod liquidity;
8586
mod logger;
87+
mod message_handler;
8688
mod payment_store;
8789
mod peer_store;
8890
mod sweep;
@@ -116,6 +118,7 @@ pub use builder::NodeBuilder as Builder;
116118

117119
use event::{EventHandler, EventQueue};
118120
use gossip::GossipSource;
121+
use liquidity::LiquiditySource;
119122
use payment_store::PaymentStore;
120123
pub use payment_store::{PaymentDetails, PaymentDirection, PaymentStatus};
121124
use peer_store::{PeerInfo, PeerStore};
@@ -311,6 +314,7 @@ pub struct Node<K: KVStore + Sync + Send + 'static> {
311314
keys_manager: Arc<KeysManager>,
312315
network_graph: Arc<NetworkGraph>,
313316
gossip_source: Arc<GossipSource>,
317+
liquidity_source: Option<Arc<LiquiditySource<K, Arc<FilesystemLogger>>>>,
314318
kv_store: Arc<K>,
315319
logger: Arc<FilesystemLogger>,
316320
_router: Arc<Router>,
@@ -764,6 +768,21 @@ impl<K: KVStore + Sync + Send + 'static> Node<K> {
764768
});
765769
});
766770

771+
if let Some(liquidity_source) = self.liquidity_source.as_ref() {
772+
let mut stop_liquidity_handler = self.stop_receiver.clone();
773+
let liquidity_handler = Arc::clone(&liquidity_source);
774+
runtime.spawn(async move {
775+
loop {
776+
tokio::select! {
777+
_ = stop_liquidity_handler.changed() => {
778+
return;
779+
}
780+
_ = liquidity_handler.handle_next_event() => {}
781+
}
782+
}
783+
});
784+
}
785+
767786
*runtime_lock = Some(runtime);
768787

769788
log_info!(self.logger, "Startup complete.");

src/liquidity.rs

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
use crate::types::{ChannelManager, KeysManager, LiquidityManager, PeerManager};
2+
use crate::Config;
3+
4+
use lightning::ln::msgs::SocketAddress;
5+
use lightning::util::logger::Logger;
6+
use lightning::util::persist::KVStore;
7+
use lightning_liquidity::events::Event;
8+
use lightning_liquidity::lsps2::event::LSPS2ClientEvent;
9+
10+
use bitcoin::secp256k1::PublicKey;
11+
12+
use std::ops::Deref;
13+
use std::sync::Arc;
14+
15+
struct LSPS2Service {
16+
address: SocketAddress,
17+
node_id: PublicKey,
18+
token: Option<String>,
19+
}
20+
21+
pub(crate) struct LiquiditySource<K: KVStore + Sync + Send + 'static, L: Deref>
22+
where
23+
L::Target: Logger,
24+
{
25+
lsps2_service: Option<LSPS2Service>,
26+
channel_manager: Arc<ChannelManager<K>>,
27+
keys_manager: Arc<KeysManager>,
28+
liquidity_manager: Arc<LiquidityManager<K>>,
29+
config: Arc<Config>,
30+
logger: L,
31+
}
32+
33+
impl<K: KVStore + Sync + Send, L: Deref> LiquiditySource<K, L>
34+
where
35+
L::Target: Logger,
36+
{
37+
pub(crate) fn new_lsps2(
38+
address: SocketAddress, node_id: PublicKey, token: Option<String>,
39+
channel_manager: Arc<ChannelManager<K>>, keys_manager: Arc<KeysManager>,
40+
liquidity_manager: Arc<LiquidityManager<K>>, config: Arc<Config>, logger: L,
41+
) -> Self {
42+
let lsps2_service = Some(LSPS2Service { address, node_id, token });
43+
Self { lsps2_service, channel_manager, keys_manager, liquidity_manager, config, logger }
44+
}
45+
46+
pub(crate) fn set_peer_manager(&self, peer_manager: Arc<PeerManager<K>>) {
47+
let process_msgs_callback = move || peer_manager.process_events();
48+
self.liquidity_manager.set_process_msgs_callback(process_msgs_callback);
49+
}
50+
51+
pub(crate) fn liquidity_manager(&self) -> &LiquidityManager<K> {
52+
self.liquidity_manager.as_ref()
53+
}
54+
55+
pub(crate) async fn handle_next_event(&self) {
56+
match self.liquidity_manager.next_event_async().await {
57+
Event::LSPS2Client(LSPS2ClientEvent::OpeningParametersReady {
58+
counterparty_node_id: _,
59+
opening_fee_params_menu: _,
60+
min_payment_size_msat: _,
61+
max_payment_size_msat: _,
62+
}) => {}
63+
Event::LSPS2Client(LSPS2ClientEvent::InvoiceParametersReady {
64+
counterparty_node_id: _,
65+
intercept_scid: _,
66+
cltv_expiry_delta: _,
67+
payment_size_msat: _,
68+
user_channel_id: _,
69+
}) => {}
70+
_ => {}
71+
}
72+
}
73+
}

0 commit comments

Comments
 (0)