Skip to content

Commit 98c2d89

Browse files
committed
Move broadcast queue processing to ChainSource
1 parent 4ad2172 commit 98c2d89

File tree

4 files changed

+114
-130
lines changed

4 files changed

+114
-130
lines changed

src/builder.rs

Lines changed: 22 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
// http://opensource.org/licenses/MIT>, at your option. You may not use this file except in
66
// accordance with one or both of these licenses.
77

8-
use crate::chain::{ChainSource, DEFAULT_ESPLORA_CLIENT_TIMEOUT_SECS, DEFAULT_ESPLORA_SERVER_URL};
8+
use crate::chain::{ChainSource, DEFAULT_ESPLORA_SERVER_URL};
99
use crate::config::{default_user_config, Config, WALLET_KEYS_SEED_LEN};
1010

1111
use crate::connection::ConnectionManager;
@@ -583,60 +583,36 @@ fn build_with_store_internal(
583583
})?,
584584
};
585585

586-
let (wallet, chain_source, tx_broadcaster, fee_estimator) = match chain_data_source_config {
587-
Some(ChainDataSourceConfig::Esplora(server_url)) => {
588-
let mut client_builder = esplora_client::Builder::new(&server_url.clone());
589-
client_builder = client_builder.timeout(DEFAULT_ESPLORA_CLIENT_TIMEOUT_SECS);
590-
let esplora_client = client_builder.build_async().unwrap();
591-
let tx_broadcaster =
592-
Arc::new(TransactionBroadcaster::new(esplora_client.clone(), Arc::clone(&logger)));
593-
let fee_estimator = Arc::new(OnchainFeeEstimator::new());
594-
595-
let wallet = Arc::new(Wallet::new(
596-
bdk_wallet,
597-
wallet_persister,
598-
Arc::clone(&tx_broadcaster),
599-
Arc::clone(&fee_estimator),
600-
Arc::clone(&logger),
601-
));
602-
603-
let chain_source = Arc::new(ChainSource::new_esplora(
604-
server_url.clone(),
605-
Arc::clone(&wallet),
606-
Arc::clone(&fee_estimator),
607-
Arc::clone(&config),
608-
Arc::clone(&logger),
609-
));
586+
let tx_broadcaster = Arc::new(TransactionBroadcaster::new(Arc::clone(&logger)));
587+
let fee_estimator = Arc::new(OnchainFeeEstimator::new());
588+
let wallet = Arc::new(Wallet::new(
589+
bdk_wallet,
590+
wallet_persister,
591+
Arc::clone(&tx_broadcaster),
592+
Arc::clone(&fee_estimator),
593+
Arc::clone(&logger),
594+
));
610595

611-
(wallet, chain_source, tx_broadcaster, fee_estimator)
612-
},
596+
let chain_source = match chain_data_source_config {
597+
Some(ChainDataSourceConfig::Esplora(server_url)) => Arc::new(ChainSource::new_esplora(
598+
server_url.clone(),
599+
Arc::clone(&wallet),
600+
Arc::clone(&fee_estimator),
601+
Arc::clone(&tx_broadcaster),
602+
Arc::clone(&config),
603+
Arc::clone(&logger),
604+
)),
613605
None => {
614606
// Default to Esplora client.
615607
let server_url = DEFAULT_ESPLORA_SERVER_URL.to_string();
616-
let mut client_builder = esplora_client::Builder::new(&server_url);
617-
client_builder = client_builder.timeout(DEFAULT_ESPLORA_CLIENT_TIMEOUT_SECS);
618-
let esplora_client = client_builder.build_async().unwrap();
619-
let tx_broadcaster =
620-
Arc::new(TransactionBroadcaster::new(esplora_client.clone(), Arc::clone(&logger)));
621-
let fee_estimator = Arc::new(OnchainFeeEstimator::new());
622-
623-
let wallet = Arc::new(Wallet::new(
624-
bdk_wallet,
625-
wallet_persister,
626-
Arc::clone(&tx_broadcaster),
627-
Arc::clone(&fee_estimator),
628-
Arc::clone(&logger),
629-
));
630-
631-
let chain_source = Arc::new(ChainSource::new_esplora(
608+
Arc::new(ChainSource::new_esplora(
632609
server_url.clone(),
633610
Arc::clone(&wallet),
634611
Arc::clone(&fee_estimator),
612+
Arc::clone(&tx_broadcaster),
635613
Arc::clone(&config),
636614
Arc::clone(&logger),
637-
));
638-
639-
(wallet, chain_source, tx_broadcaster, fee_estimator)
615+
))
640616
},
641617
};
642618

src/chain/mod.rs

Lines changed: 84 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,17 +7,18 @@
77

88
use crate::config::{
99
Config, BDK_CLIENT_CONCURRENCY, BDK_CLIENT_STOP_GAP, BDK_WALLET_SYNC_TIMEOUT_SECS,
10-
FEE_RATE_CACHE_UPDATE_TIMEOUT_SECS, LDK_WALLET_SYNC_TIMEOUT_SECS,
10+
FEE_RATE_CACHE_UPDATE_TIMEOUT_SECS, LDK_WALLET_SYNC_TIMEOUT_SECS, TX_BROADCAST_TIMEOUT_SECS,
1111
};
1212
use crate::fee_estimator::{
1313
apply_post_estimation_adjustments, get_all_conf_targets, get_num_block_defaults_for_target,
1414
OnchainFeeEstimator,
1515
};
16-
use crate::logger::{log_error, log_info, log_trace, FilesystemLogger, Logger};
17-
use crate::types::Wallet;
16+
use crate::logger::{log_bytes, log_error, log_info, log_trace, FilesystemLogger, Logger};
17+
use crate::types::{Broadcaster, Wallet};
1818
use crate::Error;
1919

2020
use lightning::chain::{Confirm, Filter};
21+
use lightning::util::ser::Writeable;
2122

2223
use lightning_transaction_sync::EsploraSyncClient;
2324

@@ -98,6 +99,7 @@ pub(crate) enum ChainSource {
9899
tx_sync: Arc<EsploraSyncClient<Arc<FilesystemLogger>>>,
99100
lightning_wallet_sync_status: Mutex<WalletSyncStatus>,
100101
fee_estimator: Arc<OnchainFeeEstimator>,
102+
tx_broadcaster: Arc<Broadcaster>,
101103
config: Arc<Config>,
102104
logger: Arc<FilesystemLogger>,
103105
},
@@ -106,9 +108,9 @@ pub(crate) enum ChainSource {
106108
impl ChainSource {
107109
pub(crate) fn new_esplora(
108110
server_url: String, onchain_wallet: Arc<Wallet>, fee_estimator: Arc<OnchainFeeEstimator>,
109-
config: Arc<Config>, logger: Arc<FilesystemLogger>,
111+
tx_broadcaster: Arc<Broadcaster>, config: Arc<Config>, logger: Arc<FilesystemLogger>,
110112
) -> Self {
111-
let mut client_builder = esplora_client::Builder::new(&server_url.clone());
113+
let mut client_builder = esplora_client::Builder::new(&server_url);
112114
client_builder = client_builder.timeout(DEFAULT_ESPLORA_CLIENT_TIMEOUT_SECS);
113115
let esplora_client = client_builder.build_async().unwrap();
114116
let tx_sync =
@@ -122,6 +124,7 @@ impl ChainSource {
122124
tx_sync,
123125
lightning_wallet_sync_status,
124126
fee_estimator,
127+
tx_broadcaster,
125128
config,
126129
logger,
127130
}
@@ -298,6 +301,82 @@ impl ChainSource {
298301
},
299302
}
300303
}
304+
305+
pub(crate) async fn process_broadcast_queue(&self) {
306+
match self {
307+
Self::Esplora { esplora_client, tx_broadcaster, logger, .. } => {
308+
let mut receiver = tx_broadcaster.get_broadcast_queue().await;
309+
while let Some(next_package) = receiver.recv().await {
310+
for tx in &next_package {
311+
let txid = tx.compute_txid();
312+
let timeout_fut = tokio::time::timeout(
313+
Duration::from_secs(TX_BROADCAST_TIMEOUT_SECS),
314+
esplora_client.broadcast(tx),
315+
);
316+
match timeout_fut.await {
317+
Ok(res) => match res {
318+
Ok(()) => {
319+
log_trace!(
320+
logger,
321+
"Successfully broadcast transaction {}",
322+
txid
323+
);
324+
},
325+
Err(e) => match e {
326+
esplora_client::Error::Reqwest(err) => {
327+
if err.status() == reqwest::StatusCode::from_u16(400).ok() {
328+
// Ignore 400, as this just means bitcoind already knows the
329+
// transaction.
330+
// FIXME: We can further differentiate here based on the error
331+
// message which will be available with rust-esplora-client 0.7 and
332+
// later.
333+
} else {
334+
log_error!(
335+
logger,
336+
"Failed to broadcast due to HTTP connection error: {}",
337+
err
338+
);
339+
}
340+
log_trace!(
341+
logger,
342+
"Failed broadcast transaction bytes: {}",
343+
log_bytes!(tx.encode())
344+
);
345+
},
346+
_ => {
347+
log_error!(
348+
logger,
349+
"Failed to broadcast transaction {}: {}",
350+
txid,
351+
e
352+
);
353+
log_trace!(
354+
logger,
355+
"Failed broadcast transaction bytes: {}",
356+
log_bytes!(tx.encode())
357+
);
358+
},
359+
},
360+
},
361+
Err(e) => {
362+
log_error!(
363+
logger,
364+
"Failed to broadcast transaction due to timeout {}: {}",
365+
txid,
366+
e
367+
);
368+
log_trace!(
369+
logger,
370+
"Failed broadcast transaction bytes: {}",
371+
log_bytes!(tx.encode())
372+
);
373+
},
374+
}
375+
}
376+
}
377+
},
378+
}
379+
}
301380
}
302381

303382
impl Filter for ChainSource {

src/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -669,7 +669,7 @@ impl Node {
669669
}
670670

671671
let mut stop_tx_bcast = self.stop_sender.subscribe();
672-
let tx_bcaster = Arc::clone(&self.tx_broadcaster);
672+
let chain_source = Arc::clone(&self.chain_source);
673673
let tx_bcast_logger = Arc::clone(&self.logger);
674674
runtime.spawn(async move {
675675
// Every second we try to clear our broadcasting queue.
@@ -685,7 +685,7 @@ impl Node {
685685
return;
686686
}
687687
_ = interval.tick() => {
688-
tx_bcaster.process_queue().await;
688+
chain_source.process_broadcast_queue().await;
689689
}
690690
}
691691
}

src/tx_broadcaster.rs

Lines changed: 6 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -5,22 +5,16 @@
55
// http://opensource.org/licenses/MIT>, at your option. You may not use this file except in
66
// accordance with one or both of these licenses.
77

8-
use crate::config::TX_BROADCAST_TIMEOUT_SECS;
9-
use crate::logger::{log_bytes, log_error, log_trace, Logger};
8+
use crate::logger::{log_error, Logger};
109

1110
use lightning::chain::chaininterface::BroadcasterInterface;
12-
use lightning::util::ser::Writeable;
13-
14-
use esplora_client::AsyncClient as EsploraClient;
1511

1612
use bitcoin::Transaction;
1713

18-
use reqwest::StatusCode;
1914
use tokio::sync::mpsc;
20-
use tokio::sync::Mutex;
15+
use tokio::sync::{Mutex, MutexGuard};
2116

2217
use std::ops::Deref;
23-
use std::time::Duration;
2418

2519
const BCAST_PACKAGE_QUEUE_SIZE: usize = 50;
2620

@@ -30,85 +24,20 @@ where
3024
{
3125
queue_sender: mpsc::Sender<Vec<Transaction>>,
3226
queue_receiver: Mutex<mpsc::Receiver<Vec<Transaction>>>,
33-
esplora_client: EsploraClient,
3427
logger: L,
3528
}
3629

3730
impl<L: Deref> TransactionBroadcaster<L>
3831
where
3932
L::Target: Logger,
4033
{
41-
pub(crate) fn new(esplora_client: EsploraClient, logger: L) -> Self {
34+
pub(crate) fn new(logger: L) -> Self {
4235
let (queue_sender, queue_receiver) = mpsc::channel(BCAST_PACKAGE_QUEUE_SIZE);
43-
Self { queue_sender, queue_receiver: Mutex::new(queue_receiver), esplora_client, logger }
36+
Self { queue_sender, queue_receiver: Mutex::new(queue_receiver), logger }
4437
}
4538

46-
pub(crate) async fn process_queue(&self) {
47-
let mut receiver = self.queue_receiver.lock().await;
48-
while let Some(next_package) = receiver.recv().await {
49-
for tx in &next_package {
50-
let txid = tx.compute_txid();
51-
let timeout_fut = tokio::time::timeout(
52-
Duration::from_secs(TX_BROADCAST_TIMEOUT_SECS),
53-
self.esplora_client.broadcast(tx),
54-
);
55-
match timeout_fut.await {
56-
Ok(res) => match res {
57-
Ok(()) => {
58-
log_trace!(self.logger, "Successfully broadcast transaction {}", txid);
59-
},
60-
Err(e) => match e {
61-
esplora_client::Error::Reqwest(err) => {
62-
if err.status() == StatusCode::from_u16(400).ok() {
63-
// Ignore 400, as this just means bitcoind already knows the
64-
// transaction.
65-
// FIXME: We can further differentiate here based on the error
66-
// message which will be available with rust-esplora-client 0.7 and
67-
// later.
68-
} else {
69-
log_error!(
70-
self.logger,
71-
"Failed to broadcast due to HTTP connection error: {}",
72-
err
73-
);
74-
}
75-
log_trace!(
76-
self.logger,
77-
"Failed broadcast transaction bytes: {}",
78-
log_bytes!(tx.encode())
79-
);
80-
},
81-
_ => {
82-
log_error!(
83-
self.logger,
84-
"Failed to broadcast transaction {}: {}",
85-
txid,
86-
e
87-
);
88-
log_trace!(
89-
self.logger,
90-
"Failed broadcast transaction bytes: {}",
91-
log_bytes!(tx.encode())
92-
);
93-
},
94-
},
95-
},
96-
Err(e) => {
97-
log_error!(
98-
self.logger,
99-
"Failed to broadcast transaction due to timeout {}: {}",
100-
txid,
101-
e
102-
);
103-
log_trace!(
104-
self.logger,
105-
"Failed broadcast transaction bytes: {}",
106-
log_bytes!(tx.encode())
107-
);
108-
},
109-
}
110-
}
111-
}
39+
pub(crate) async fn get_broadcast_queue(&self) -> MutexGuard<mpsc::Receiver<Vec<Transaction>>> {
40+
self.queue_receiver.lock().await
11241
}
11342
}
11443

0 commit comments

Comments
 (0)