Skip to content

Commit 72f32b0

Browse files
committed
Implement broadcast queue processing
1 parent 344e5e5 commit 72f32b0

File tree

3 files changed

+62
-2
lines changed

3 files changed

+62
-2
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ tokio = { version = "1.37", default-features = false, features = [ "rt-multi-thr
7373
esplora-client = { version = "0.9", default-features = false }
7474
libc = "0.2"
7575
uniffi = { version = "0.26.0", features = ["build"], optional = true }
76+
serde_json = { version = "1.0.128", default-features = false, features = ["std"] }
7677

7778
[target.'cfg(vss)'.dependencies]
7879
vss-client = "0.3"

src/chain/bitcoind_rpc.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use lightning_block_sync::{
1616
AsyncBlockSourceResult, BlockData, BlockHeaderData, BlockSource, Cache,
1717
};
1818

19-
use bitcoin::BlockHash;
19+
use bitcoin::{BlockHash, Transaction, Txid};
2020

2121
use base64::prelude::{Engine, BASE64_STANDARD};
2222

@@ -40,6 +40,12 @@ impl BitcoindRpcClient {
4040

4141
Self { rpc_client }
4242
}
43+
44+
pub(crate) async fn broadcast_transaction(&self, tx: &Transaction) -> std::io::Result<Txid> {
45+
let tx_serialized = bitcoin::consensus::encode::serialize_hex(tx);
46+
let tx_json = serde_json::json!(tx_serialized);
47+
self.rpc_client.call_method::<Txid>("sendrawtransaction", &vec![tx_json]).await
48+
}
4349
}
4450

4551
impl BlockSource for BitcoindRpcClient {

src/chain/mod.rs

Lines changed: 54 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -781,7 +781,60 @@ impl ChainSource {
781781
}
782782
}
783783
},
784-
Self::BitcoindRpc { .. } => todo!(),
784+
Self::BitcoindRpc { bitcoind_rpc_client, tx_broadcaster, logger, .. } => {
785+
// While it's a bit unclear when we'd be able to lean on Bitcoin Core >v28
786+
// features, we should eventually switch to use `submitpackage` via the
787+
// `rust-bitcoind-json-rpc` crate rather than just broadcasting individual
788+
// transactions.
789+
let mut receiver = tx_broadcaster.get_broadcast_queue().await;
790+
while let Some(next_package) = receiver.recv().await {
791+
for tx in &next_package {
792+
let txid = tx.compute_txid();
793+
let timeout_fut = tokio::time::timeout(
794+
Duration::from_secs(TX_BROADCAST_TIMEOUT_SECS),
795+
bitcoind_rpc_client.broadcast_transaction(tx),
796+
);
797+
match timeout_fut.await {
798+
Ok(res) => match res {
799+
Ok(id) => {
800+
debug_assert_eq!(id, txid);
801+
log_trace!(
802+
logger,
803+
"Successfully broadcast transaction {}",
804+
txid
805+
);
806+
},
807+
Err(e) => {
808+
log_error!(
809+
logger,
810+
"Failed to broadcast transaction {}: {}",
811+
txid,
812+
e
813+
);
814+
log_trace!(
815+
logger,
816+
"Failed broadcast transaction bytes: {}",
817+
log_bytes!(tx.encode())
818+
);
819+
},
820+
},
821+
Err(e) => {
822+
log_error!(
823+
logger,
824+
"Failed to broadcast transaction due to timeout {}: {}",
825+
txid,
826+
e
827+
);
828+
log_trace!(
829+
logger,
830+
"Failed broadcast transaction bytes: {}",
831+
log_bytes!(tx.encode())
832+
);
833+
},
834+
}
835+
}
836+
}
837+
},
785838
}
786839
}
787840
}

0 commit comments

Comments
 (0)