@@ -12,7 +12,7 @@ use lightning::chain::Listen;
12
12
use lightning_block_sync:: http:: HttpEndpoint ;
13
13
use lightning_block_sync:: http:: JsonResponse ;
14
14
use lightning_block_sync:: poll:: ValidatedBlockHeader ;
15
- use lightning_block_sync:: rpc:: RpcClient ;
15
+ use lightning_block_sync:: rpc:: { RpcClient , RpcError } ;
16
16
use lightning_block_sync:: {
17
17
AsyncBlockSourceResult , BlockData , BlockHeaderData , BlockSource , Cache ,
18
18
} ;
@@ -24,10 +24,12 @@ use bitcoin::{BlockHash, FeeRate, Transaction, Txid};
24
24
use base64:: prelude:: { Engine , BASE64_STANDARD } ;
25
25
26
26
use std:: collections:: { HashMap , VecDeque } ;
27
+ use std:: sync:: atomic:: { AtomicU64 , Ordering } ;
27
28
use std:: sync:: Arc ;
28
29
29
30
pub struct BitcoindRpcClient {
30
31
rpc_client : Arc < RpcClient > ,
32
+ latest_mempool_timestamp : AtomicU64 ,
31
33
}
32
34
33
35
impl BitcoindRpcClient {
@@ -41,7 +43,9 @@ impl BitcoindRpcClient {
41
43
. expect ( "RpcClient::new is actually infallible" ) ,
42
44
) ;
43
45
44
- Self { rpc_client }
46
+ let latest_mempool_timestamp = AtomicU64 :: new ( 0 ) ;
47
+
48
+ Self { rpc_client, latest_mempool_timestamp }
45
49
}
46
50
47
51
pub ( crate ) async fn broadcast_transaction ( & self , tx : & Transaction ) -> std:: io:: Result < Txid > {
@@ -70,6 +74,99 @@ impl BitcoindRpcClient {
70
74
. await
71
75
. map ( |resp| resp. 0 )
72
76
}
77
+
78
+ pub ( crate ) async fn get_raw_transaction (
79
+ & self , txid : & Txid ,
80
+ ) -> std:: io:: Result < Option < Transaction > > {
81
+ let txid_hex = bitcoin:: consensus:: encode:: serialize_hex ( txid) ;
82
+ let txid_json = serde_json:: json!( txid_hex) ;
83
+ match self
84
+ . rpc_client
85
+ . call_method :: < GetRawTransactionResponse > ( "getrawtransaction" , & vec ! [ txid_json] )
86
+ . await
87
+ {
88
+ Ok ( resp) => Ok ( Some ( resp. 0 ) ) ,
89
+ Err ( e) => match e. into_inner ( ) {
90
+ Some ( inner) => {
91
+ let rpc_error_res: Result < Box < RpcError > , _ > = inner. downcast ( ) ;
92
+
93
+ match rpc_error_res {
94
+ Ok ( rpc_error) => {
95
+ // Check if it's the 'not found' error code.
96
+ if rpc_error. code == -5 {
97
+ Ok ( None )
98
+ } else {
99
+ Err ( std:: io:: Error :: new ( std:: io:: ErrorKind :: Other , rpc_error) )
100
+ }
101
+ } ,
102
+ Err ( _) => Err ( std:: io:: Error :: new (
103
+ std:: io:: ErrorKind :: Other ,
104
+ "Failed to process getrawtransaction response" ,
105
+ ) ) ,
106
+ }
107
+ } ,
108
+ None => Err ( std:: io:: Error :: new (
109
+ std:: io:: ErrorKind :: Other ,
110
+ "Failed to process getrawtransaction response" ,
111
+ ) ) ,
112
+ } ,
113
+ }
114
+ }
115
+
116
+ pub ( crate ) async fn get_raw_mempool ( & self ) -> std:: io:: Result < Vec < RawMempoolEntry > > {
117
+ let verbose_flag_json = serde_json:: json!( true ) ;
118
+ self . rpc_client
119
+ . call_method :: < GetRawMempoolResponse > ( "getrawmempool" , & vec ! [ verbose_flag_json] )
120
+ . await
121
+ . map ( |resp| resp. 0 )
122
+ }
123
+
124
+ /// Get mempool transactions, alongside their first-seen unix timestamps.
125
+ ///
126
+ /// This method is an adapted version of `bdk_bitcoind_rpc::Emitter::mempool`. It emits each
127
+ /// transaction only once, unless we cannot assume the transaction's ancestors are already
128
+ /// emitted.
129
+ pub ( crate ) async fn get_mempool_transactions_and_timestamp_at_height (
130
+ & self , best_processed_height : u32 ,
131
+ ) -> std:: io:: Result < Vec < ( Transaction , u64 ) > > {
132
+ let prev_mempool_time = self . latest_mempool_timestamp . load ( Ordering :: Relaxed ) ;
133
+ let mut latest_time = prev_mempool_time;
134
+
135
+ let mempool_entries = self . get_raw_mempool ( ) . await ?;
136
+ let mut txs_to_emit = Vec :: new ( ) ;
137
+
138
+ for entry in mempool_entries {
139
+ if entry. time > latest_time {
140
+ latest_time = entry. time ;
141
+ }
142
+
143
+ // Avoid emitting transactions that are already emitted if we can guarantee
144
+ // blocks containing ancestors are already emitted. The bitcoind rpc interface
145
+ // provides us with the block height that the tx is introduced to the mempool.
146
+ // If we have already emitted the block of height, we can assume that all
147
+ // ancestor txs have been processed by the receiver.
148
+ let ancestor_within_height = entry. height <= best_processed_height;
149
+ let is_already_emitted = entry. time <= prev_mempool_time;
150
+ if is_already_emitted && ancestor_within_height {
151
+ continue ;
152
+ }
153
+
154
+ match self . get_raw_transaction ( & entry. txid ) . await {
155
+ Ok ( Some ( tx) ) => {
156
+ txs_to_emit. push ( ( tx, entry. time ) ) ;
157
+ } ,
158
+ Ok ( None ) => {
159
+ continue ;
160
+ } ,
161
+ Err ( e) => return Err ( e) ,
162
+ } ;
163
+ }
164
+
165
+ if !txs_to_emit. is_empty ( ) {
166
+ self . latest_mempool_timestamp . store ( latest_time, Ordering :: Release ) ;
167
+ }
168
+ Ok ( txs_to_emit)
169
+ }
73
170
}
74
171
75
172
impl BlockSource for BitcoindRpcClient {
@@ -132,6 +229,91 @@ impl TryInto<MempoolMinFeeResponse> for JsonResponse {
132
229
}
133
230
}
134
231
232
+ pub struct GetRawTransactionResponse ( pub Transaction ) ;
233
+
234
+ impl TryInto < GetRawTransactionResponse > for JsonResponse {
235
+ type Error = std:: io:: Error ;
236
+ fn try_into ( self ) -> std:: io:: Result < GetRawTransactionResponse > {
237
+ let tx = self
238
+ . 0
239
+ . as_str ( )
240
+ . ok_or ( std:: io:: Error :: new (
241
+ std:: io:: ErrorKind :: Other ,
242
+ "Failed to parse getrawtransaction response" ,
243
+ ) )
244
+ . and_then ( |s| {
245
+ bitcoin:: consensus:: encode:: deserialize_hex ( s) . map_err ( |_| {
246
+ std:: io:: Error :: new (
247
+ std:: io:: ErrorKind :: Other ,
248
+ "Failed to parse getrawtransaction response" ,
249
+ )
250
+ } )
251
+ } ) ?;
252
+
253
+ Ok ( GetRawTransactionResponse ( tx) )
254
+ }
255
+ }
256
+
257
+ pub struct GetRawMempoolResponse ( Vec < RawMempoolEntry > ) ;
258
+
259
+ impl TryInto < GetRawMempoolResponse > for JsonResponse {
260
+ type Error = std:: io:: Error ;
261
+ fn try_into ( self ) -> std:: io:: Result < GetRawMempoolResponse > {
262
+ let mut mempool_transactions = Vec :: new ( ) ;
263
+ let res = self . 0 . as_object ( ) . ok_or ( std:: io:: Error :: new (
264
+ std:: io:: ErrorKind :: Other ,
265
+ "Failed to parse getrawmempool response" ,
266
+ ) ) ?;
267
+
268
+ for ( k, v) in res {
269
+ let txid = match bitcoin:: consensus:: encode:: deserialize_hex ( k) {
270
+ Ok ( txid) => txid,
271
+ Err ( _) => {
272
+ return Err ( std:: io:: Error :: new (
273
+ std:: io:: ErrorKind :: Other ,
274
+ "Failed to parse getrawmempool response" ,
275
+ ) ) ;
276
+ } ,
277
+ } ;
278
+
279
+ let time = match v[ "time" ] . as_u64 ( ) {
280
+ Some ( time) => time,
281
+ None => {
282
+ return Err ( std:: io:: Error :: new (
283
+ std:: io:: ErrorKind :: Other ,
284
+ "Failed to parse getrawmempool response" ,
285
+ ) ) ;
286
+ } ,
287
+ } ;
288
+
289
+ let height = match v[ "height" ] . as_u64 ( ) . and_then ( |h| h. try_into ( ) . ok ( ) ) {
290
+ Some ( height) => height,
291
+ None => {
292
+ return Err ( std:: io:: Error :: new (
293
+ std:: io:: ErrorKind :: Other ,
294
+ "Failed to parse getrawmempool response" ,
295
+ ) ) ;
296
+ } ,
297
+ } ;
298
+ let entry = RawMempoolEntry { txid, time, height } ;
299
+
300
+ mempool_transactions. push ( entry) ;
301
+ }
302
+
303
+ Ok ( GetRawMempoolResponse ( mempool_transactions) )
304
+ }
305
+ }
306
+
307
+ #[ derive( Debug , Clone ) ]
308
+ pub ( crate ) struct RawMempoolEntry {
309
+ /// The transaction id
310
+ txid : Txid ,
311
+ /// Local time transaction entered pool in seconds since 1 Jan 1970 GMT
312
+ time : u64 ,
313
+ /// Block height when transaction entered pool
314
+ height : u32 ,
315
+ }
316
+
135
317
#[ derive( Debug , Clone , Serialize ) ]
136
318
#[ serde( rename_all = "UPPERCASE" ) ]
137
319
pub ( crate ) enum FeeRateEstimationMode {
0 commit comments