diff --git a/.github/workflows/cont_integration.yml b/.github/workflows/cont_integration.yml index 93aae51..dc332ee 100644 --- a/.github/workflows/cont_integration.yml +++ b/.github/workflows/cont_integration.yml @@ -17,7 +17,7 @@ jobs: - name: Checkout uses: actions/checkout@v4 - name: Cache - uses: actions/cache@v2 + uses: actions/cache@v4 with: path: | ~/.cargo/registry diff --git a/src/raw_client.rs b/src/raw_client.rs index eb78185..45af548 100644 --- a/src/raw_client.rs +++ b/src/raw_client.rs @@ -3,7 +3,7 @@ //! This module contains the definition of the raw client that wraps the transport method use std::borrow::Borrow; -use std::collections::{BTreeMap, BTreeSet, HashMap, VecDeque}; +use std::collections::{BTreeMap, HashMap, VecDeque}; use std::io::{BufRead, BufReader, Read, Write}; use std::mem::drop; use std::net::{TcpStream, ToSocketAddrs}; @@ -539,11 +539,10 @@ impl RawClient { if let Some(until_message) = until_message { // If we are trying to start a reader thread but the corresponding sender is - // missing from the map, exit immediately. This can happen with batch calls, - // since the sender is shared for all the individual queries in a call. We - // might have already received a response for that id, but we don't know it - // yet. Exiting here forces the calling code to fallback to the sender-receiver - // method, and it should find a message there waiting for it. + // missing from the map, exit immediately. We might have already received a + // response for that id, but we don't know it yet. Exiting here forces the + // calling code to fallback to the sender-receiver method, and it should find + // a message there waiting for it. if self.waiting_map.lock()?.get(&until_message).is_none() { return Err(Error::CouldntLockReader); } @@ -762,12 +761,10 @@ impl ElectrumApi for RawClient { fn batch_call(&self, batch: &Batch) -> Result, Error> { let mut raw = Vec::new(); - let mut missing_responses = BTreeSet::new(); + let mut missing_responses = Vec::new(); let mut answers = BTreeMap::new(); - // Add our listener to the map before we send the request, Here we will clone the sender - // for every request id, so that we only have to monitor one receiver. - let (sender, receiver) = channel(); + // Add our listener to the map before we send the request for (method, params) in batch.iter() { let req = Request::new_id( @@ -775,9 +772,12 @@ impl ElectrumApi for RawClient { method, params.to_vec(), ); - missing_responses.insert(req.id); + // Add distinct channel to each request so when we remove our request id (and sender) from the waiting_map + // we can be sure that the response gets sent to the correct channel in self.recv + let (sender, receiver) = channel(); + missing_responses.push((req.id, receiver)); - self.waiting_map.lock()?.insert(req.id, sender.clone()); + self.waiting_map.lock()?.insert(req.id, sender); raw.append(&mut serde_json::to_vec(&req)?); raw.extend_from_slice(b"\n"); @@ -796,8 +796,8 @@ impl ElectrumApi for RawClient { self.increment_calls(); - for req_id in missing_responses.iter() { - match self.recv(&receiver, *req_id) { + for (req_id, receiver) in missing_responses.iter() { + match self.recv(receiver, *req_id) { Ok(mut resp) => answers.insert(req_id, resp["result"].take()), Err(e) => { // In case of error our sender could still be left in the map, depending on where @@ -805,7 +805,7 @@ impl ElectrumApi for RawClient { warn!("got error for req_id {}: {:?}", req_id, e); warn!("removing all waiting req of this batch"); let mut guard = self.waiting_map.lock()?; - for req_id in missing_responses.iter() { + for (req_id, _) in missing_responses.iter() { guard.remove(req_id); } return Err(e); @@ -1190,6 +1190,26 @@ mod test { assert_eq!(resp.hash_function, Some("sha256".into())); assert_eq!(resp.pruning, None); } + + #[test] + #[ignore = "depends on a live server"] + fn test_batch_response_ordering() { + // The electrum.blockstream.info:50001 node always sends back ordered responses which will make this always pass. + // However, many servers do not, so we use one of those servers for this test. + let client = RawClient::new("exs.dyshek.org:50001", None).unwrap(); + let heights: Vec = vec![1, 4, 8, 12, 222, 6666, 12]; + let result_times = [ + 1231469665, 1231470988, 1231472743, 1231474888, 1231770653, 1236456633, 1231474888, + ]; + // Check ordering 10 times. This usually fails within 5 if ordering is incorrect. + for _ in 0..10 { + let results = client.batch_block_header(&heights).unwrap(); + for (index, result) in results.iter().enumerate() { + assert_eq!(result_times[index], result.time); + } + } + } + #[test] fn test_relay_fee() { let client = RawClient::new(get_test_server(), None).unwrap();