Skip to content

Commit 97d51eb

Browse files
committed
Retrieve and apply unconfirmed transactions from the mempool
.. to allow the on-chain wallet to detect what's inflight.
1 parent ee164a1 commit 97d51eb

File tree

3 files changed

+217
-5
lines changed

3 files changed

+217
-5
lines changed

src/chain/bitcoind_rpc.rs

Lines changed: 184 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use lightning::chain::Listen;
1212
use lightning_block_sync::http::HttpEndpoint;
1313
use lightning_block_sync::http::JsonResponse;
1414
use lightning_block_sync::poll::ValidatedBlockHeader;
15-
use lightning_block_sync::rpc::RpcClient;
15+
use lightning_block_sync::rpc::{RpcClient, RpcError};
1616
use lightning_block_sync::{
1717
AsyncBlockSourceResult, BlockData, BlockHeaderData, BlockSource, Cache,
1818
};
@@ -24,10 +24,12 @@ use bitcoin::{BlockHash, FeeRate, Transaction, Txid};
2424
use base64::prelude::{Engine, BASE64_STANDARD};
2525

2626
use std::collections::{HashMap, VecDeque};
27+
use std::sync::atomic::{AtomicU64, Ordering};
2728
use std::sync::Arc;
2829

2930
pub struct BitcoindRpcClient {
3031
rpc_client: Arc<RpcClient>,
32+
latest_mempool_timestamp: AtomicU64,
3133
}
3234

3335
impl BitcoindRpcClient {
@@ -41,7 +43,9 @@ impl BitcoindRpcClient {
4143
.expect("RpcClient::new is actually infallible"),
4244
);
4345

44-
Self { rpc_client }
46+
let latest_mempool_timestamp = AtomicU64::new(0);
47+
48+
Self { rpc_client, latest_mempool_timestamp }
4549
}
4650

4751
pub(crate) async fn broadcast_transaction(&self, tx: &Transaction) -> std::io::Result<Txid> {
@@ -70,6 +74,99 @@ impl BitcoindRpcClient {
7074
.await
7175
.map(|resp| resp.0)
7276
}
77+
78+
pub(crate) async fn get_raw_transaction(
79+
&self, txid: &Txid,
80+
) -> std::io::Result<Option<Transaction>> {
81+
let txid_hex = bitcoin::consensus::encode::serialize_hex(txid);
82+
let txid_json = serde_json::json!(txid_hex);
83+
match self
84+
.rpc_client
85+
.call_method::<GetRawTransactionResponse>("getrawtransaction", &vec![txid_json])
86+
.await
87+
{
88+
Ok(resp) => Ok(Some(resp.0)),
89+
Err(e) => match e.into_inner() {
90+
Some(inner) => {
91+
let rpc_error_res: Result<Box<RpcError>, _> = inner.downcast();
92+
93+
match rpc_error_res {
94+
Ok(rpc_error) => {
95+
// Check if it's the 'not found' error code.
96+
if rpc_error.code == -5 {
97+
Ok(None)
98+
} else {
99+
Err(std::io::Error::new(std::io::ErrorKind::Other, rpc_error))
100+
}
101+
},
102+
Err(_) => Err(std::io::Error::new(
103+
std::io::ErrorKind::Other,
104+
"Failed to process getrawtransaction response",
105+
)),
106+
}
107+
},
108+
None => Err(std::io::Error::new(
109+
std::io::ErrorKind::Other,
110+
"Failed to process getrawtransaction response",
111+
)),
112+
},
113+
}
114+
}
115+
116+
pub(crate) async fn get_raw_mempool(&self) -> std::io::Result<Vec<RawMempoolEntry>> {
117+
let verbose_flag_json = serde_json::json!(true);
118+
self.rpc_client
119+
.call_method::<GetRawMempoolResponse>("getrawmempool", &vec![verbose_flag_json])
120+
.await
121+
.map(|resp| resp.0)
122+
}
123+
124+
/// Get mempool transactions, alongside their first-seen unix timestamps.
125+
///
126+
/// This method is an adapted version of `bdk_bitcoind_rpc::Emitter::mempool`. It emits each
127+
/// transaction only once, unless we cannot assume the transaction's ancestors are already
128+
/// emitted.
129+
pub(crate) async fn get_mempool_transactions_and_timestamp_at_height(
130+
&self, best_processed_height: u32,
131+
) -> std::io::Result<Vec<(Transaction, u64)>> {
132+
let prev_mempool_time = self.latest_mempool_timestamp.load(Ordering::Relaxed);
133+
let mut latest_time = prev_mempool_time;
134+
135+
let mempool_entries = self.get_raw_mempool().await?;
136+
let mut txs_to_emit = Vec::new();
137+
138+
for entry in mempool_entries {
139+
if entry.time > latest_time {
140+
latest_time = entry.time;
141+
}
142+
143+
// Avoid emitting transactions that are already emitted if we can guarantee
144+
// blocks containing ancestors are already emitted. The bitcoind rpc interface
145+
// provides us with the block height that the tx is introduced to the mempool.
146+
// If we have already emitted the block of height, we can assume that all
147+
// ancestor txs have been processed by the receiver.
148+
let ancestor_within_height = entry.height <= best_processed_height;
149+
let is_already_emitted = entry.time <= prev_mempool_time;
150+
if is_already_emitted && ancestor_within_height {
151+
continue;
152+
}
153+
154+
match self.get_raw_transaction(&entry.txid).await {
155+
Ok(Some(tx)) => {
156+
txs_to_emit.push((tx, entry.time));
157+
},
158+
Ok(None) => {
159+
continue;
160+
},
161+
Err(e) => return Err(e),
162+
};
163+
}
164+
165+
if !txs_to_emit.is_empty() {
166+
self.latest_mempool_timestamp.store(latest_time, Ordering::Release);
167+
}
168+
Ok(txs_to_emit)
169+
}
73170
}
74171

75172
impl BlockSource for BitcoindRpcClient {
@@ -132,6 +229,91 @@ impl TryInto<MempoolMinFeeResponse> for JsonResponse {
132229
}
133230
}
134231

232+
pub struct GetRawTransactionResponse(pub Transaction);
233+
234+
impl TryInto<GetRawTransactionResponse> for JsonResponse {
235+
type Error = std::io::Error;
236+
fn try_into(self) -> std::io::Result<GetRawTransactionResponse> {
237+
let tx = self
238+
.0
239+
.as_str()
240+
.ok_or(std::io::Error::new(
241+
std::io::ErrorKind::Other,
242+
"Failed to parse getrawtransaction response",
243+
))
244+
.and_then(|s| {
245+
bitcoin::consensus::encode::deserialize_hex(s).map_err(|_| {
246+
std::io::Error::new(
247+
std::io::ErrorKind::Other,
248+
"Failed to parse getrawtransaction response",
249+
)
250+
})
251+
})?;
252+
253+
Ok(GetRawTransactionResponse(tx))
254+
}
255+
}
256+
257+
pub struct GetRawMempoolResponse(Vec<RawMempoolEntry>);
258+
259+
impl TryInto<GetRawMempoolResponse> for JsonResponse {
260+
type Error = std::io::Error;
261+
fn try_into(self) -> std::io::Result<GetRawMempoolResponse> {
262+
let mut mempool_transactions = Vec::new();
263+
let res = self.0.as_object().ok_or(std::io::Error::new(
264+
std::io::ErrorKind::Other,
265+
"Failed to parse getrawmempool response",
266+
))?;
267+
268+
for (k, v) in res {
269+
let txid = match bitcoin::consensus::encode::deserialize_hex(k) {
270+
Ok(txid) => txid,
271+
Err(_) => {
272+
return Err(std::io::Error::new(
273+
std::io::ErrorKind::Other,
274+
"Failed to parse getrawmempool response",
275+
));
276+
},
277+
};
278+
279+
let time = match v["time"].as_u64() {
280+
Some(time) => time,
281+
None => {
282+
return Err(std::io::Error::new(
283+
std::io::ErrorKind::Other,
284+
"Failed to parse getrawmempool response",
285+
));
286+
},
287+
};
288+
289+
let height = match v["height"].as_u64().and_then(|h| h.try_into().ok()) {
290+
Some(height) => height,
291+
None => {
292+
return Err(std::io::Error::new(
293+
std::io::ErrorKind::Other,
294+
"Failed to parse getrawmempool response",
295+
));
296+
},
297+
};
298+
let entry = RawMempoolEntry { txid, time, height };
299+
300+
mempool_transactions.push(entry);
301+
}
302+
303+
Ok(GetRawMempoolResponse(mempool_transactions))
304+
}
305+
}
306+
307+
#[derive(Debug, Clone)]
308+
pub(crate) struct RawMempoolEntry {
309+
/// The transaction id
310+
txid: Txid,
311+
/// Local time transaction entered pool in seconds since 1 Jan 1970 GMT
312+
time: u64,
313+
/// Block height when transaction entered pool
314+
height: u32,
315+
}
316+
135317
#[derive(Debug, Clone, Serialize)]
136318
#[serde(rename_all = "UPPERCASE")]
137319
pub(crate) enum FeeRateEstimationMode {

src/chain/mod.rs

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -368,9 +368,24 @@ impl ChainSource {
368368
return;
369369
}
370370
_ = chain_polling_interval.tick() => {
371-
let _ = spv_client.poll_best_tip().await.map_err(|e| {
372-
log_error!(logger, "Failed to poll for chain data: {:?}", e);
373-
});
371+
match spv_client.poll_best_tip().await {
372+
Ok(_) => (),
373+
Err(e) => {
374+
log_error!(logger, "Failed to poll for chain data: {:?}", e);
375+
continue;
376+
}
377+
}
378+
379+
let cur_height = channel_manager.current_best_block().height;
380+
match bitcoind_rpc_client.get_mempool_transactions_and_timestamp_at_height(cur_height).await {
381+
Ok(unconfirmed_txs) => {
382+
let _ = onchain_wallet.apply_unconfirmed_txs(unconfirmed_txs);
383+
}
384+
Err(e) => {
385+
log_error!(logger, "Failed to poll for mempool transactions: {:?}", e);
386+
continue;
387+
}
388+
}
374389
{
375390
let unix_time_secs_opt = SystemTime::now()
376391
.duration_since(UNIX_EPOCH)

src/wallet/mod.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,21 @@ where
109109
}
110110
}
111111

112+
pub(crate) fn apply_unconfirmed_txs(
113+
&self, unconfirmed_txs: Vec<(Transaction, u64)>,
114+
) -> Result<(), Error> {
115+
let mut locked_wallet = self.inner.lock().unwrap();
116+
locked_wallet.apply_unconfirmed_txs(unconfirmed_txs);
117+
118+
let mut locked_persister = self.persister.lock().unwrap();
119+
locked_wallet.persist(&mut locked_persister).map_err(|e| {
120+
log_error!(self.logger, "Failed to persist wallet: {}", e);
121+
Error::PersistenceFailed
122+
})?;
123+
124+
Ok(())
125+
}
126+
112127
pub(crate) fn create_funding_transaction(
113128
&self, output_script: ScriptBuf, amount: Amount, confirmation_target: ConfirmationTarget,
114129
locktime: LockTime,

0 commit comments

Comments
 (0)