From 6721929f22be6243e8bcaa71de9d3372e4a3fbba Mon Sep 17 00:00:00 2001 From: marshallyale <19430240+marshallyale@users.noreply.github.com> Date: Wed, 22 Jan 2025 13:46:41 -0800 Subject: [PATCH 1/3] fix: fix batch ordering issue Fixes issue #75 Raw client waiting map was using the same channel for every request/response. When items were put back into the channel inside of _reader_thread the waiting receiver in recv would just take the next response without validating it on request id request. This fixes this by using unique channels for each request response inside of the waiting map. --- src/raw_client.rs | 30 +++++++++++++++--------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/src/raw_client.rs b/src/raw_client.rs index eb78185..28ffc61 100644 --- a/src/raw_client.rs +++ b/src/raw_client.rs @@ -3,7 +3,7 @@ //! This module contains the definition of the raw client that wraps the transport method use std::borrow::Borrow; -use std::collections::{BTreeMap, BTreeSet, HashMap, VecDeque}; +use std::collections::{BTreeMap, HashMap, VecDeque}; use std::io::{BufRead, BufReader, Read, Write}; use std::mem::drop; use std::net::{TcpStream, ToSocketAddrs}; @@ -539,11 +539,10 @@ impl RawClient { if let Some(until_message) = until_message { // If we are trying to start a reader thread but the corresponding sender is - // missing from the map, exit immediately. This can happen with batch calls, - // since the sender is shared for all the individual queries in a call. We - // might have already received a response for that id, but we don't know it - // yet. Exiting here forces the calling code to fallback to the sender-receiver - // method, and it should find a message there waiting for it. + // missing from the map, exit immediately. We might have already received a + // response for that id, but we don't know it yet. Exiting here forces the + // calling code to fallback to the sender-receiver method, and it should find + // a message there waiting for it. if self.waiting_map.lock()?.get(&until_message).is_none() { return Err(Error::CouldntLockReader); } @@ -762,12 +761,10 @@ impl ElectrumApi for RawClient { fn batch_call(&self, batch: &Batch) -> Result, Error> { let mut raw = Vec::new(); - let mut missing_responses = BTreeSet::new(); + let mut missing_responses = Vec::new(); let mut answers = BTreeMap::new(); - // Add our listener to the map before we send the request, Here we will clone the sender - // for every request id, so that we only have to monitor one receiver. - let (sender, receiver) = channel(); + // Add our listener to the map before we send the request for (method, params) in batch.iter() { let req = Request::new_id( @@ -775,9 +772,12 @@ impl ElectrumApi for RawClient { method, params.to_vec(), ); - missing_responses.insert(req.id); + // Add distinct channel to each request so when we remove our request id (and sender) from the waiting_map + // we can be sure that the response gets sent to the correct channel in self.recv + let (sender, receiver) = channel(); + missing_responses.push((req.id, receiver)); - self.waiting_map.lock()?.insert(req.id, sender.clone()); + self.waiting_map.lock()?.insert(req.id, sender); raw.append(&mut serde_json::to_vec(&req)?); raw.extend_from_slice(b"\n"); @@ -796,8 +796,8 @@ impl ElectrumApi for RawClient { self.increment_calls(); - for req_id in missing_responses.iter() { - match self.recv(&receiver, *req_id) { + for (req_id, receiver) in missing_responses.iter() { + match self.recv(receiver, *req_id) { Ok(mut resp) => answers.insert(req_id, resp["result"].take()), Err(e) => { // In case of error our sender could still be left in the map, depending on where @@ -805,7 +805,7 @@ impl ElectrumApi for RawClient { warn!("got error for req_id {}: {:?}", req_id, e); warn!("removing all waiting req of this batch"); let mut guard = self.waiting_map.lock()?; - for req_id in missing_responses.iter() { + for (req_id, _) in missing_responses.iter() { guard.remove(req_id); } return Err(e); From 0ee0a6ad7ac335847cefd19c41c5522023be0be0 Mon Sep 17 00:00:00 2001 From: marshallyale <19430240+marshallyale@users.noreply.github.com> Date: Sat, 15 Mar 2025 11:11:36 -0700 Subject: [PATCH 2/3] test: add batch response ordering test --- src/raw_client.rs | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/src/raw_client.rs b/src/raw_client.rs index 28ffc61..45af548 100644 --- a/src/raw_client.rs +++ b/src/raw_client.rs @@ -1190,6 +1190,26 @@ mod test { assert_eq!(resp.hash_function, Some("sha256".into())); assert_eq!(resp.pruning, None); } + + #[test] + #[ignore = "depends on a live server"] + fn test_batch_response_ordering() { + // The electrum.blockstream.info:50001 node always sends back ordered responses which will make this always pass. + // However, many servers do not, so we use one of those servers for this test. + let client = RawClient::new("exs.dyshek.org:50001", None).unwrap(); + let heights: Vec = vec![1, 4, 8, 12, 222, 6666, 12]; + let result_times = [ + 1231469665, 1231470988, 1231472743, 1231474888, 1231770653, 1236456633, 1231474888, + ]; + // Check ordering 10 times. This usually fails within 5 if ordering is incorrect. + for _ in 0..10 { + let results = client.batch_block_header(&heights).unwrap(); + for (index, result) in results.iter().enumerate() { + assert_eq!(result_times[index], result.time); + } + } + } + #[test] fn test_relay_fee() { let client = RawClient::new(get_test_server(), None).unwrap(); From a3488f4ff9007f0682f8e08cdedb6302498ac190 Mon Sep 17 00:00:00 2001 From: valued mammal Date: Fri, 28 Mar 2025 12:32:47 -0400 Subject: [PATCH 3/3] ci: bump actions/cache to v4 --- .github/workflows/cont_integration.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/cont_integration.yml b/.github/workflows/cont_integration.yml index 93aae51..dc332ee 100644 --- a/.github/workflows/cont_integration.yml +++ b/.github/workflows/cont_integration.yml @@ -17,7 +17,7 @@ jobs: - name: Checkout uses: actions/checkout@v4 - name: Cache - uses: actions/cache@v2 + uses: actions/cache@v4 with: path: | ~/.cargo/registry