Skip to content

Commit 93314b5

Browse files
committed
Copy the fixed version of init::synchronize_listeners
We previously discovered a minor bug that resulted in `synchronize_listeners` not calling the actual implementation of `Listen::block_connected`. While we will fix this upstream (lightningdevkit/rust-lightning#3354), we now copy over the fixed code in question to not be blocked on the next LDK release. We also slightly adjusted it to account for types/APIs that are only visible inside of `lightning-block-sync`. We intend to drop the newly added module as soon as we can upgrade to the next LDK release shipping the fixed version.
1 parent 8bf54fa commit 93314b5

File tree

2 files changed

+394
-2
lines changed

2 files changed

+394
-2
lines changed

src/chain/block_sync_init.rs

Lines changed: 391 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,391 @@
1+
// This file is Copyright its original authors, visible in version control history.
2+
//
3+
// This file is licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
4+
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
5+
// http://opensource.org/licenses/MIT>, at your option. You may not use this file except in
6+
// accordance with one or both of these licenses.
7+
8+
//! This module contains code that was copied from `lightning_block_sync` and slightly adjusted to account for
9+
//! crate-visible types/APIs.
10+
// TODO: Drop this version when upgrading to the LDK release shipping
11+
// https://github.com/lightningdevkit/rust-lightning/pull/3354.
12+
13+
use lightning::chain::Listen;
14+
15+
use lightning_block_sync::poll::{ChainPoller, Poll, Validate, ValidatedBlockHeader};
16+
use lightning_block_sync::{BlockData, BlockSource, BlockSourceError, BlockSourceResult, Cache};
17+
18+
use bitcoin::block::Header;
19+
use bitcoin::hash_types::BlockHash;
20+
use bitcoin::network::Network;
21+
22+
use lightning::chain;
23+
24+
use std::ops::Deref;
25+
26+
/// Returns a validated block header of the source's best chain tip.
27+
///
28+
/// Upon success, the returned header can be used to initialize [`SpvClient`]. Useful during a fresh
29+
/// start when there are no chain listeners to sync yet.
30+
///
31+
/// [`SpvClient`]: crate::SpvClient
32+
async fn validate_best_block_header<B: Deref>(
33+
block_source: B,
34+
) -> BlockSourceResult<ValidatedBlockHeader>
35+
where
36+
B::Target: BlockSource,
37+
{
38+
let (best_block_hash, best_block_height) = block_source.get_best_block().await?;
39+
block_source.get_header(&best_block_hash, best_block_height).await?.validate(best_block_hash)
40+
}
41+
42+
/// Performs a one-time sync of chain listeners using a single *trusted* block source, bringing each
43+
/// listener's view of the chain from its paired block hash to `block_source`'s best chain tip.
44+
///
45+
/// Upon success, the returned header can be used to initialize [`SpvClient`]. In the case of
46+
/// failure, each listener may be left at a different block hash than the one it was originally
47+
/// paired with.
48+
///
49+
/// Useful during startup to bring the [`ChannelManager`] and each [`ChannelMonitor`] in sync before
50+
/// switching to [`SpvClient`]. For example:
51+
///
52+
/// ```
53+
/// use bitcoin::hash_types::BlockHash;
54+
/// use bitcoin::network::Network;
55+
///
56+
/// use lightning::chain;
57+
/// use lightning::chain::Watch;
58+
/// use lightning::chain::chainmonitor;
59+
/// use lightning::chain::chainmonitor::ChainMonitor;
60+
/// use lightning::chain::channelmonitor::ChannelMonitor;
61+
/// use lightning::chain::chaininterface::BroadcasterInterface;
62+
/// use lightning::chain::chaininterface::FeeEstimator;
63+
/// use lightning::sign;
64+
/// use lightning::sign::{EntropySource, NodeSigner, SignerProvider};
65+
/// use lightning::ln::channelmanager::{ChannelManager, ChannelManagerReadArgs};
66+
/// use lightning::routing::router::Router;
67+
/// use lightning::util::config::UserConfig;
68+
/// use lightning::util::logger::Logger;
69+
/// use lightning::util::ser::ReadableArgs;
70+
///
71+
/// use lightning_block_sync::*;
72+
///
73+
/// use lightning::io::Cursor;
74+
///
75+
/// async fn init_sync<
76+
/// B: BlockSource,
77+
/// ES: EntropySource,
78+
/// NS: NodeSigner,
79+
/// SP: SignerProvider,
80+
/// T: BroadcasterInterface,
81+
/// F: FeeEstimator,
82+
/// R: Router,
83+
/// L: Logger,
84+
/// C: chain::Filter,
85+
/// P: chainmonitor::Persist<SP::EcdsaSigner>,
86+
/// >(
87+
/// block_source: &B,
88+
/// chain_monitor: &ChainMonitor<SP::EcdsaSigner, &C, &T, &F, &L, &P>,
89+
/// config: UserConfig,
90+
/// entropy_source: &ES,
91+
/// node_signer: &NS,
92+
/// signer_provider: &SP,
93+
/// tx_broadcaster: &T,
94+
/// fee_estimator: &F,
95+
/// router: &R,
96+
/// logger: &L,
97+
/// persister: &P,
98+
/// ) {
99+
/// // Read a serialized channel monitor paired with the block hash when it was persisted.
100+
/// let serialized_monitor = "...";
101+
/// let (monitor_block_hash, mut monitor) = <(BlockHash, ChannelMonitor<SP::EcdsaSigner>)>::read(
102+
/// &mut Cursor::new(&serialized_monitor), (entropy_source, signer_provider)).unwrap();
103+
///
104+
/// // Read the channel manager paired with the block hash when it was persisted.
105+
/// let serialized_manager = "...";
106+
/// let (manager_block_hash, mut manager) = {
107+
/// let read_args = ChannelManagerReadArgs::new(
108+
/// entropy_source,
109+
/// node_signer,
110+
/// signer_provider,
111+
/// fee_estimator,
112+
/// chain_monitor,
113+
/// tx_broadcaster,
114+
/// router,
115+
/// logger,
116+
/// config,
117+
/// vec![&mut monitor],
118+
/// );
119+
/// <(BlockHash, ChannelManager<&ChainMonitor<SP::EcdsaSigner, &C, &T, &F, &L, &P>, &T, &ES, &NS, &SP, &F, &R, &L>)>::read(
120+
/// &mut Cursor::new(&serialized_manager), read_args).unwrap()
121+
/// };
122+
///
123+
/// // Synchronize any channel monitors and the channel manager to be on the best block.
124+
/// let mut cache = UnboundedCache::new();
125+
/// let mut monitor_listener = (monitor, &*tx_broadcaster, &*fee_estimator, &*logger);
126+
/// let listeners = vec![
127+
/// (monitor_block_hash, &monitor_listener as &dyn chain::Listen),
128+
/// (manager_block_hash, &manager as &dyn chain::Listen),
129+
/// ];
130+
/// let chain_tip = init::synchronize_listeners(
131+
/// block_source, Network::Bitcoin, &mut cache, listeners).await.unwrap();
132+
///
133+
/// // Allow the chain monitor to watch any channels.
134+
/// let monitor = monitor_listener.0;
135+
/// chain_monitor.watch_channel(monitor.get_funding_txo().0, monitor);
136+
///
137+
/// // Create an SPV client to notify the chain monitor and channel manager of block events.
138+
/// let chain_poller = poll::ChainPoller::new(block_source, Network::Bitcoin);
139+
/// let mut chain_listener = (chain_monitor, &manager);
140+
/// let spv_client = SpvClient::new(chain_tip, chain_poller, &mut cache, &chain_listener);
141+
/// }
142+
/// ```
143+
///
144+
/// [`SpvClient`]: crate::SpvClient
145+
/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
146+
/// [`ChannelMonitor`]: lightning::chain::channelmonitor::ChannelMonitor
147+
pub(super) async fn synchronize_listeners<
148+
B: Deref + Sized + Send + Sync,
149+
C: Cache,
150+
L: chain::Listen + ?Sized,
151+
>(
152+
block_source: B, network: Network, header_cache: &mut C,
153+
mut chain_listeners: Vec<(BlockHash, &L)>,
154+
) -> BlockSourceResult<ValidatedBlockHeader>
155+
where
156+
B::Target: BlockSource,
157+
{
158+
let best_header = validate_best_block_header(&*block_source).await?;
159+
160+
// Fetch the header for the block hash paired with each listener.
161+
let mut chain_listeners_with_old_headers = Vec::new();
162+
for (old_block_hash, chain_listener) in chain_listeners.drain(..) {
163+
let old_header = match header_cache.look_up(&old_block_hash) {
164+
Some(header) => *header,
165+
None => {
166+
block_source.get_header(&old_block_hash, None).await?.validate(old_block_hash)?
167+
},
168+
};
169+
chain_listeners_with_old_headers.push((old_header, chain_listener))
170+
}
171+
172+
// Find differences and disconnect blocks for each listener individually.
173+
let mut chain_poller = ChainPoller::new(block_source, network);
174+
let mut chain_listeners_at_height = Vec::new();
175+
let mut most_common_ancestor = None;
176+
let mut most_connected_blocks = Vec::new();
177+
for (old_header, chain_listener) in chain_listeners_with_old_headers.drain(..) {
178+
// Disconnect any stale blocks, but keep them in the cache for the next iteration.
179+
let header_cache = &mut ReadOnlyCache(header_cache);
180+
let (common_ancestor, connected_blocks) = {
181+
let chain_listener = &DynamicChainListener(chain_listener);
182+
let mut chain_notifier = ChainNotifier { header_cache, chain_listener };
183+
let difference =
184+
chain_notifier.find_difference(best_header, &old_header, &mut chain_poller).await?;
185+
chain_notifier.disconnect_blocks(difference.disconnected_blocks);
186+
(difference.common_ancestor, difference.connected_blocks)
187+
};
188+
189+
// Keep track of the most common ancestor and all blocks connected across all listeners.
190+
chain_listeners_at_height.push((common_ancestor.height, chain_listener));
191+
if connected_blocks.len() > most_connected_blocks.len() {
192+
most_common_ancestor = Some(common_ancestor);
193+
most_connected_blocks = connected_blocks;
194+
}
195+
}
196+
197+
// Connect new blocks for all listeners at once to avoid re-fetching blocks.
198+
if let Some(common_ancestor) = most_common_ancestor {
199+
let chain_listener = &ChainListenerSet(chain_listeners_at_height);
200+
let mut chain_notifier = ChainNotifier { header_cache, chain_listener };
201+
chain_notifier
202+
.connect_blocks(common_ancestor, most_connected_blocks, &mut chain_poller)
203+
.await
204+
.map_err(|(e, _)| e)?;
205+
}
206+
207+
Ok(best_header)
208+
}
209+
210+
/// A wrapper to make a cache read-only.
211+
///
212+
/// Used to prevent losing headers that may be needed to disconnect blocks common to more than one
213+
/// listener.
214+
struct ReadOnlyCache<'a, C: Cache>(&'a mut C);
215+
216+
impl<'a, C: Cache> Cache for ReadOnlyCache<'a, C> {
217+
fn look_up(&self, block_hash: &BlockHash) -> Option<&ValidatedBlockHeader> {
218+
self.0.look_up(block_hash)
219+
}
220+
221+
fn block_connected(&mut self, _block_hash: BlockHash, _block_header: ValidatedBlockHeader) {
222+
unreachable!()
223+
}
224+
225+
fn block_disconnected(&mut self, _block_hash: &BlockHash) -> Option<ValidatedBlockHeader> {
226+
None
227+
}
228+
}
229+
230+
/// Wrapper for supporting dynamically sized chain listeners.
231+
struct DynamicChainListener<'a, L: chain::Listen + ?Sized>(&'a L);
232+
233+
impl<'a, L: chain::Listen + ?Sized> chain::Listen for DynamicChainListener<'a, L> {
234+
fn filtered_block_connected(
235+
&self, _header: &Header, _txdata: &chain::transaction::TransactionData, _height: u32,
236+
) {
237+
unreachable!()
238+
}
239+
240+
fn block_disconnected(&self, header: &Header, height: u32) {
241+
self.0.block_disconnected(header, height)
242+
}
243+
}
244+
245+
/// A set of dynamically sized chain listeners, each paired with a starting block height.
246+
struct ChainListenerSet<'a, L: chain::Listen + ?Sized>(Vec<(u32, &'a L)>);
247+
248+
impl<'a, L: chain::Listen + ?Sized> chain::Listen for ChainListenerSet<'a, L> {
249+
fn block_connected(&self, block: &bitcoin::Block, height: u32) {
250+
for (starting_height, chain_listener) in self.0.iter() {
251+
if height > *starting_height {
252+
chain_listener.block_connected(block, height);
253+
}
254+
}
255+
}
256+
257+
fn filtered_block_connected(
258+
&self, header: &Header, txdata: &chain::transaction::TransactionData, height: u32,
259+
) {
260+
for (starting_height, chain_listener) in self.0.iter() {
261+
if height > *starting_height {
262+
chain_listener.filtered_block_connected(header, txdata, height);
263+
}
264+
}
265+
}
266+
267+
fn block_disconnected(&self, _header: &Header, _height: u32) {
268+
unreachable!()
269+
}
270+
}
271+
272+
/// Notifies [listeners] of blocks that have been connected or disconnected from the chain.
273+
///
274+
/// [listeners]: lightning::chain::Listen
275+
struct ChainNotifier<'a, C: Cache, L: Deref>
276+
where
277+
L::Target: chain::Listen,
278+
{
279+
/// Cache for looking up headers before fetching from a block source.
280+
header_cache: &'a mut C,
281+
282+
/// Listener that will be notified of connected or disconnected blocks.
283+
chain_listener: L,
284+
}
285+
286+
/// Changes made to the chain between subsequent polls that transformed it from having one chain tip
287+
/// to another.
288+
///
289+
/// Blocks are given in height-descending order. Therefore, blocks are first disconnected in order
290+
/// before new blocks are connected in reverse order.
291+
struct ChainDifference {
292+
/// The most recent ancestor common between the chain tips.
293+
///
294+
/// If there are any disconnected blocks, this is where the chain forked.
295+
common_ancestor: ValidatedBlockHeader,
296+
297+
/// Blocks that were disconnected from the chain since the last poll.
298+
disconnected_blocks: Vec<ValidatedBlockHeader>,
299+
300+
/// Blocks that were connected to the chain since the last poll.
301+
connected_blocks: Vec<ValidatedBlockHeader>,
302+
}
303+
304+
impl<'a, C: Cache, L: Deref> ChainNotifier<'a, C, L>
305+
where
306+
L::Target: chain::Listen,
307+
{
308+
/// Returns the changes needed to produce the chain with `current_header` as its tip from the
309+
/// chain with `prev_header` as its tip.
310+
///
311+
/// Walks backwards from `current_header` and `prev_header`, finding the common ancestor.
312+
async fn find_difference<P: Poll>(
313+
&self, current_header: ValidatedBlockHeader, prev_header: &ValidatedBlockHeader,
314+
chain_poller: &mut P,
315+
) -> BlockSourceResult<ChainDifference> {
316+
let mut disconnected_blocks = Vec::new();
317+
let mut connected_blocks = Vec::new();
318+
let mut current = current_header;
319+
let mut previous = *prev_header;
320+
loop {
321+
// Found the common ancestor.
322+
if current.to_best_block().block_hash == previous.to_best_block().block_hash {
323+
break;
324+
}
325+
326+
// Walk back the chain, finding blocks needed to connect and disconnect. Only walk back
327+
// the header with the greater height, or both if equal heights.
328+
let current_height = current.height;
329+
let previous_height = previous.height;
330+
if current_height <= previous_height {
331+
disconnected_blocks.push(previous);
332+
previous = self.look_up_previous_header(chain_poller, &previous).await?;
333+
}
334+
if current_height >= previous_height {
335+
connected_blocks.push(current);
336+
current = self.look_up_previous_header(chain_poller, &current).await?;
337+
}
338+
}
339+
340+
let common_ancestor = current;
341+
Ok(ChainDifference { common_ancestor, disconnected_blocks, connected_blocks })
342+
}
343+
344+
/// Returns the previous header for the given header, either by looking it up in the cache or
345+
/// fetching it if not found.
346+
async fn look_up_previous_header<P: Poll>(
347+
&self, chain_poller: &mut P, header: &ValidatedBlockHeader,
348+
) -> BlockSourceResult<ValidatedBlockHeader> {
349+
match self.header_cache.look_up(&header.header.prev_blockhash) {
350+
Some(prev_header) => Ok(*prev_header),
351+
None => chain_poller.look_up_previous_header(header).await,
352+
}
353+
}
354+
355+
/// Notifies the chain listeners of disconnected blocks.
356+
fn disconnect_blocks(&mut self, mut disconnected_blocks: Vec<ValidatedBlockHeader>) {
357+
for header in disconnected_blocks.drain(..) {
358+
if let Some(cached_header) =
359+
self.header_cache.block_disconnected(&header.to_best_block().block_hash)
360+
{
361+
assert_eq!(cached_header, header);
362+
}
363+
self.chain_listener.block_disconnected(&header.header, header.height);
364+
}
365+
}
366+
367+
/// Notifies the chain listeners of connected blocks.
368+
async fn connect_blocks<P: Poll>(
369+
&mut self, mut new_tip: ValidatedBlockHeader,
370+
mut connected_blocks: Vec<ValidatedBlockHeader>, chain_poller: &mut P,
371+
) -> Result<(), (BlockSourceError, Option<ValidatedBlockHeader>)> {
372+
for header in connected_blocks.drain(..).rev() {
373+
let height = header.height;
374+
let block_data =
375+
chain_poller.fetch_block(&header).await.map_err(|e| (e, Some(new_tip)))?;
376+
match block_data.deref() {
377+
BlockData::FullBlock(block) => {
378+
self.chain_listener.block_connected(block, height);
379+
},
380+
BlockData::HeaderOnly(header) => {
381+
self.chain_listener.filtered_block_connected(header, &[], height);
382+
},
383+
}
384+
385+
self.header_cache.block_connected(header.to_best_block().block_hash, header);
386+
new_tip = header;
387+
}
388+
389+
Ok(())
390+
}
391+
}

0 commit comments

Comments
 (0)