Skip to content

Commit 01a7b6c

Browse files
committed
Implement broadcast queue processing
1 parent e11d137 commit 01a7b6c

File tree

3 files changed

+62
-2
lines changed

3 files changed

+62
-2
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ tokio = { version = "1.37", default-features = false, features = [ "rt-multi-thr
7373
esplora-client = { version = "0.9", default-features = false }
7474
libc = "0.2"
7575
uniffi = { version = "0.26.0", features = ["build"], optional = true }
76+
serde_json = { version = "1.0.128", default-features = false, features = ["std"] }
7677

7778
[target.'cfg(vss)'.dependencies]
7879
vss-client = "0.3"

src/chain/bitcoind_rpc.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use lightning_block_sync::{
1616
AsyncBlockSourceResult, BlockData, BlockHeaderData, BlockSource, Cache,
1717
};
1818

19-
use bitcoin::BlockHash;
19+
use bitcoin::{BlockHash, Transaction, Txid};
2020

2121
use base64::prelude::{Engine, BASE64_STANDARD};
2222

@@ -40,6 +40,12 @@ impl BitcoindRpcClient {
4040

4141
Self { rpc_client }
4242
}
43+
44+
pub(crate) async fn broadcast_transaction(&self, tx: &Transaction) -> std::io::Result<Txid> {
45+
let tx_serialized = bitcoin::consensus::encode::serialize_hex(tx);
46+
let tx_json = serde_json::json!(tx_serialized);
47+
self.rpc_client.call_method::<Txid>("sendrawtransaction", &vec![tx_json]).await
48+
}
4349
}
4450

4551
impl BlockSource for BitcoindRpcClient {

src/chain/mod.rs

Lines changed: 54 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -780,7 +780,60 @@ impl ChainSource {
780780
}
781781
}
782782
},
783-
Self::BitcoindRpc { .. } => todo!(),
783+
Self::BitcoindRpc { bitcoind_rpc_client, tx_broadcaster, logger, .. } => {
784+
// While it's a bit unclear when we'd be able to lean on Bitcoin Core >v28
785+
// features, we should eventually switch to use `submitpackage` via the
786+
// `rust-bitcoind-json-rpc` crate rather than just broadcasting individual
787+
// transactions.
788+
let mut receiver = tx_broadcaster.get_broadcast_queue().await;
789+
while let Some(next_package) = receiver.recv().await {
790+
for tx in &next_package {
791+
let txid = tx.compute_txid();
792+
let timeout_fut = tokio::time::timeout(
793+
Duration::from_secs(TX_BROADCAST_TIMEOUT_SECS),
794+
bitcoind_rpc_client.broadcast_transaction(tx),
795+
);
796+
match timeout_fut.await {
797+
Ok(res) => match res {
798+
Ok(id) => {
799+
debug_assert_eq!(id, txid);
800+
log_trace!(
801+
logger,
802+
"Successfully broadcast transaction {}",
803+
txid
804+
);
805+
},
806+
Err(e) => {
807+
log_error!(
808+
logger,
809+
"Failed to broadcast transaction {}: {}",
810+
txid,
811+
e
812+
);
813+
log_trace!(
814+
logger,
815+
"Failed broadcast transaction bytes: {}",
816+
log_bytes!(tx.encode())
817+
);
818+
},
819+
},
820+
Err(e) => {
821+
log_error!(
822+
logger,
823+
"Failed to broadcast transaction due to timeout {}: {}",
824+
txid,
825+
e
826+
);
827+
log_trace!(
828+
logger,
829+
"Failed broadcast transaction bytes: {}",
830+
log_bytes!(tx.encode())
831+
);
832+
},
833+
}
834+
}
835+
}
836+
},
784837
}
785838
}
786839
}

0 commit comments

Comments
 (0)