Skip to content

Commit 935a17e

Browse files
committed
Implement broadcast queue processing
1 parent b756d79 commit 935a17e

File tree

3 files changed

+62
-2
lines changed

3 files changed

+62
-2
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ tokio = { version = "1.37", default-features = false, features = [ "rt-multi-thr
7373
esplora-client = { version = "0.9", default-features = false }
7474
libc = "0.2"
7575
uniffi = { version = "0.26.0", features = ["build"], optional = true }
76+
serde_json = { version = "1.0.128", default-features = false, features = ["std"] }
7677

7778
[target.'cfg(vss)'.dependencies]
7879
vss-client = "0.3"

src/chain/bitcoind_rpc.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use lightning_block_sync::{
1616
AsyncBlockSourceResult, BlockData, BlockHeaderData, BlockSource, Cache,
1717
};
1818

19-
use bitcoin::BlockHash;
19+
use bitcoin::{BlockHash, Transaction, Txid};
2020

2121
use base64::prelude::{Engine, BASE64_STANDARD};
2222

@@ -40,6 +40,12 @@ impl BitcoindRpcClient {
4040

4141
Self { rpc_client }
4242
}
43+
44+
pub(crate) async fn broadcast_transaction(&self, tx: &Transaction) -> std::io::Result<Txid> {
45+
let tx_serialized = bitcoin::consensus::encode::serialize_hex(tx);
46+
let tx_json = serde_json::json!(tx_serialized);
47+
self.rpc_client.call_method::<Txid>("sendrawtransaction", &vec![tx_json]).await
48+
}
4349
}
4450

4551
impl BlockSource for BitcoindRpcClient {

src/chain/mod.rs

Lines changed: 54 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -882,7 +882,60 @@ impl ChainSource {
882882
}
883883
}
884884
},
885-
Self::BitcoindRpc { .. } => todo!(),
885+
Self::BitcoindRpc { bitcoind_rpc_client, tx_broadcaster, logger, .. } => {
886+
// While it's a bit unclear when we'd be able to lean on Bitcoin Core >v28
887+
// features, we should eventually switch to use `submitpackage` via the
888+
// `rust-bitcoind-json-rpc` crate rather than just broadcasting individual
889+
// transactions.
890+
let mut receiver = tx_broadcaster.get_broadcast_queue().await;
891+
while let Some(next_package) = receiver.recv().await {
892+
for tx in &next_package {
893+
let txid = tx.compute_txid();
894+
let timeout_fut = tokio::time::timeout(
895+
Duration::from_secs(TX_BROADCAST_TIMEOUT_SECS),
896+
bitcoind_rpc_client.broadcast_transaction(tx),
897+
);
898+
match timeout_fut.await {
899+
Ok(res) => match res {
900+
Ok(id) => {
901+
debug_assert_eq!(id, txid);
902+
log_trace!(
903+
logger,
904+
"Successfully broadcast transaction {}",
905+
txid
906+
);
907+
},
908+
Err(e) => {
909+
log_error!(
910+
logger,
911+
"Failed to broadcast transaction {}: {}",
912+
txid,
913+
e
914+
);
915+
log_trace!(
916+
logger,
917+
"Failed broadcast transaction bytes: {}",
918+
log_bytes!(tx.encode())
919+
);
920+
},
921+
},
922+
Err(e) => {
923+
log_error!(
924+
logger,
925+
"Failed to broadcast transaction due to timeout {}: {}",
926+
txid,
927+
e
928+
);
929+
log_trace!(
930+
logger,
931+
"Failed broadcast transaction bytes: {}",
932+
log_bytes!(tx.encode())
933+
);
934+
},
935+
}
936+
}
937+
}
938+
},
886939
}
887940
}
888941
}

0 commit comments

Comments
 (0)