Skip to content

Commit 7e6f9c1

Browse files
authored
Merge pull request #465 from tnull/2025-02-fix-mempool-access
bitcoind RPC: Make mempool syncing more efficient
2 parents 9f8d30a + 150ea3d commit 7e6f9c1

File tree

2 files changed

+104
-16
lines changed

2 files changed

+104
-16
lines changed

src/chain/bitcoind_rpc.rs

Lines changed: 77 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ use std::sync::Arc;
3030
pub struct BitcoindRpcClient {
3131
rpc_client: Arc<RpcClient>,
3232
latest_mempool_timestamp: AtomicU64,
33+
mempool_entries_cache: tokio::sync::Mutex<HashMap<Txid, MempoolEntry>>,
34+
mempool_txs_cache: tokio::sync::Mutex<HashMap<Txid, (Transaction, u64)>>,
3335
}
3436

3537
impl BitcoindRpcClient {
@@ -42,7 +44,9 @@ impl BitcoindRpcClient {
4244

4345
let latest_mempool_timestamp = AtomicU64::new(0);
4446

45-
Self { rpc_client, latest_mempool_timestamp }
47+
let mempool_entries_cache = tokio::sync::Mutex::new(HashMap::new());
48+
let mempool_txs_cache = tokio::sync::Mutex::new(HashMap::new());
49+
Self { rpc_client, latest_mempool_timestamp, mempool_entries_cache, mempool_txs_cache }
4650
}
4751

4852
pub(crate) fn rpc_client(&self) -> Arc<RpcClient> {
@@ -122,23 +126,68 @@ impl BitcoindRpcClient {
122126
.map(|resp| resp.0)
123127
}
124128

125-
pub(crate) async fn get_mempool_entry(&self, txid: Txid) -> std::io::Result<MempoolEntry> {
129+
pub(crate) async fn get_mempool_entry(
130+
&self, txid: Txid,
131+
) -> std::io::Result<Option<MempoolEntry>> {
126132
let txid_hex = bitcoin::consensus::encode::serialize_hex(&txid);
127133
let txid_json = serde_json::json!(txid_hex);
128-
self.rpc_client
134+
match self
135+
.rpc_client
129136
.call_method::<GetMempoolEntryResponse>("getmempoolentry", &[txid_json])
130137
.await
131-
.map(|resp| MempoolEntry { txid, height: resp.height, time: resp.time })
138+
{
139+
Ok(resp) => Ok(Some(MempoolEntry { txid, height: resp.height, time: resp.time })),
140+
Err(e) => match e.into_inner() {
141+
Some(inner) => {
142+
let rpc_error_res: Result<Box<RpcError>, _> = inner.downcast();
143+
144+
match rpc_error_res {
145+
Ok(rpc_error) => {
146+
// Check if it's the 'not found' error code.
147+
if rpc_error.code == -5 {
148+
Ok(None)
149+
} else {
150+
Err(std::io::Error::new(std::io::ErrorKind::Other, rpc_error))
151+
}
152+
},
153+
Err(_) => Err(std::io::Error::new(
154+
std::io::ErrorKind::Other,
155+
"Failed to process getmempoolentry response",
156+
)),
157+
}
158+
},
159+
None => Err(std::io::Error::new(
160+
std::io::ErrorKind::Other,
161+
"Failed to process getmempoolentry response",
162+
)),
163+
},
164+
}
132165
}
133166

134-
pub(crate) async fn get_mempool_entries(&self) -> std::io::Result<Vec<MempoolEntry>> {
167+
pub(crate) async fn update_mempool_entries_cache(&self) -> std::io::Result<()> {
135168
let mempool_txids = self.get_raw_mempool().await?;
136-
let mut mempool_entries = Vec::with_capacity(mempool_txids.len());
169+
170+
let mut mempool_entries_cache = self.mempool_entries_cache.lock().await;
171+
mempool_entries_cache.retain(|txid, _| mempool_txids.contains(txid));
172+
173+
if let Some(difference) = mempool_txids.len().checked_sub(mempool_entries_cache.capacity())
174+
{
175+
mempool_entries_cache.reserve(difference)
176+
}
177+
137178
for txid in mempool_txids {
138-
let entry = self.get_mempool_entry(txid).await?;
139-
mempool_entries.push(entry);
179+
if mempool_entries_cache.contains_key(&txid) {
180+
continue;
181+
}
182+
183+
if let Some(entry) = self.get_mempool_entry(txid).await? {
184+
mempool_entries_cache.insert(txid, entry.clone());
185+
}
140186
}
141-
Ok(mempool_entries)
187+
188+
mempool_entries_cache.shrink_to_fit();
189+
190+
Ok(())
142191
}
143192

144193
/// Get mempool transactions, alongside their first-seen unix timestamps.
@@ -152,10 +201,20 @@ impl BitcoindRpcClient {
152201
let prev_mempool_time = self.latest_mempool_timestamp.load(Ordering::Relaxed);
153202
let mut latest_time = prev_mempool_time;
154203

155-
let mempool_entries = self.get_mempool_entries().await?;
156-
let mut txs_to_emit = Vec::new();
204+
self.update_mempool_entries_cache().await?;
205+
206+
let mempool_entries_cache = self.mempool_entries_cache.lock().await;
207+
let mut mempool_txs_cache = self.mempool_txs_cache.lock().await;
208+
mempool_txs_cache.retain(|txid, _| mempool_entries_cache.contains_key(txid));
157209

158-
for entry in mempool_entries {
210+
if let Some(difference) =
211+
mempool_entries_cache.len().checked_sub(mempool_txs_cache.capacity())
212+
{
213+
mempool_txs_cache.reserve(difference)
214+
}
215+
216+
let mut txs_to_emit = Vec::with_capacity(mempool_entries_cache.len());
217+
for (txid, entry) in mempool_entries_cache.iter() {
159218
if entry.time > latest_time {
160219
latest_time = entry.time;
161220
}
@@ -171,8 +230,14 @@ impl BitcoindRpcClient {
171230
continue;
172231
}
173232

233+
if let Some((cached_tx, cached_time)) = mempool_txs_cache.get(txid) {
234+
txs_to_emit.push((cached_tx.clone(), *cached_time));
235+
continue;
236+
}
237+
174238
match self.get_raw_transaction(&entry.txid).await {
175239
Ok(Some(tx)) => {
240+
mempool_txs_cache.insert(entry.txid, (tx.clone(), entry.time));
176241
txs_to_emit.push((tx, entry.time));
177242
},
178243
Ok(None) => {

src/chain/mod.rs

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -317,8 +317,14 @@ impl ChainSource {
317317
));
318318
}
319319

320+
log_info!(
321+
logger,
322+
"Starting initial synchronization of chain listeners. This might take a while..",
323+
);
324+
320325
loop {
321326
let mut locked_header_cache = header_cache.lock().await;
327+
let now = SystemTime::now();
322328
match synchronize_listeners(
323329
bitcoind_rpc_client.as_ref(),
324330
config.network,
@@ -329,6 +335,11 @@ impl ChainSource {
329335
{
330336
Ok(chain_tip) => {
331337
{
338+
log_info!(
339+
logger,
340+
"Finished synchronizing listeners in {}ms",
341+
now.elapsed().unwrap().as_millis()
342+
);
332343
*latest_chain_tip.write().unwrap() = Some(chain_tip);
333344
let unix_time_secs_opt = SystemTime::now()
334345
.duration_since(UNIX_EPOCH)
@@ -374,6 +385,8 @@ impl ChainSource {
374385
fee_rate_update_interval
375386
.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
376387

388+
log_info!(logger, "Starting continuous polling for chain updates.");
389+
377390
// Start the polling loop.
378391
loop {
379392
tokio::select! {
@@ -692,13 +705,15 @@ impl ChainSource {
692705
&mut *locked_header_cache,
693706
&chain_listener,
694707
);
695-
let mut chain_polling_interval =
696-
tokio::time::interval(Duration::from_secs(CHAIN_POLLING_INTERVAL_SECS));
697-
chain_polling_interval
698-
.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
699708

709+
let now = SystemTime::now();
700710
match spv_client.poll_best_tip().await {
701711
Ok((ChainTip::Better(tip), true)) => {
712+
log_trace!(
713+
logger,
714+
"Finished polling best tip in {}ms",
715+
now.elapsed().unwrap().as_millis()
716+
);
702717
*latest_chain_tip.write().unwrap() = Some(tip);
703718
},
704719
Ok(_) => {},
@@ -711,11 +726,19 @@ impl ChainSource {
711726
}
712727

713728
let cur_height = channel_manager.current_best_block().height;
729+
730+
let now = SystemTime::now();
714731
match bitcoind_rpc_client
715732
.get_mempool_transactions_and_timestamp_at_height(cur_height)
716733
.await
717734
{
718735
Ok(unconfirmed_txs) => {
736+
log_trace!(
737+
logger,
738+
"Finished polling mempool of size {} in {}ms",
739+
unconfirmed_txs.len(),
740+
now.elapsed().unwrap().as_millis()
741+
);
719742
let _ = onchain_wallet.apply_unconfirmed_txs(unconfirmed_txs);
720743
},
721744
Err(e) => {

0 commit comments

Comments
 (0)