Skip to content

Commit b86f2bb

Browse files
committed
errors if expecing headers notification but not subscribed
the opposite, erroring when subscribing multiple times, is not handled because clients could call multiple times to trigger the server to reply with the tip
1 parent d2dbab9 commit b86f2bb

File tree

2 files changed

+37
-8
lines changed

2 files changed

+37
-8
lines changed

src/raw_client.rs

Lines changed: 33 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ where
133133
last_id: AtomicUsize,
134134
waiting_map: Mutex<HashMap<usize, Sender<ChannelMessage>>>,
135135

136-
headers: Mutex<VecDeque<RawHeaderNotification>>,
136+
headers: Mutex<Option<VecDeque<RawHeaderNotification>>>,
137137
script_notifications: Mutex<HashMap<ScriptHash, VecDeque<ScriptStatus>>>,
138138

139139
#[cfg(feature = "debug-calls")]
@@ -154,7 +154,7 @@ where
154154
last_id: AtomicUsize::new(0),
155155
waiting_map: Mutex::new(HashMap::new()),
156156

157-
headers: Mutex::new(VecDeque::new()),
157+
headers: Mutex::new(None),
158158
script_notifications: Mutex::new(HashMap::new()),
159159

160160
#[cfg(feature = "debug-calls")]
@@ -648,11 +648,17 @@ impl<S: Read + Write> RawClient<S> {
648648

649649
fn handle_notification(&self, method: &str, result: serde_json::Value) -> Result<(), Error> {
650650
match method {
651-
"blockchain.headers.subscribe" => self.headers.lock()?.append(
652-
&mut serde_json::from_value::<Vec<RawHeaderNotification>>(result)?
653-
.into_iter()
654-
.collect(),
655-
),
651+
"blockchain.headers.subscribe" => {
652+
let mut queue = self.headers.lock()?;
653+
match queue.as_mut() {
654+
None => return Err(Error::NotSubscribedToHeaders),
655+
Some(queue) => queue.append(
656+
&mut serde_json::from_value::<Vec<RawHeaderNotification>>(result)?
657+
.into_iter()
658+
.collect(),
659+
),
660+
}
661+
}
656662
"blockchain.scripthash.subscribe" => {
657663
let unserialized: ScriptNotification = serde_json::from_value(result)?;
658664
let mut script_notifications = self.script_notifications.lock()?;
@@ -762,6 +768,11 @@ impl<T: Read + Write> ElectrumApi for RawClient<T> {
762768
}
763769

764770
fn block_headers_subscribe_raw(&self) -> Result<RawHeaderNotification, Error> {
771+
let mut headers = self.headers.lock()?;
772+
if headers.is_none() {
773+
*headers = Some(VecDeque::new());
774+
}
775+
765776
let req = Request::new_id(
766777
self.last_id.fetch_add(1, Ordering::SeqCst),
767778
"blockchain.headers.subscribe",
@@ -773,7 +784,11 @@ impl<T: Read + Write> ElectrumApi for RawClient<T> {
773784
}
774785

775786
fn block_headers_pop_raw(&self) -> Result<Option<RawHeaderNotification>, Error> {
776-
Ok(self.headers.lock()?.pop_front())
787+
let mut queue = self.headers.lock()?;
788+
match queue.as_mut() {
789+
None => Err(Error::NotSubscribedToHeaders),
790+
Some(queue) => Ok(queue.pop_front()),
791+
}
777792
}
778793

779794
fn block_header_raw(&self, height: usize) -> Result<Vec<u8>, Error> {
@@ -1333,6 +1348,16 @@ mod test {
13331348
assert!(resp.height >= 639000);
13341349
}
13351350

1351+
#[test]
1352+
fn test_block_headers_subscribe_pop() {
1353+
let client = RawClient::new(get_test_server(), None).unwrap();
1354+
let resp = client.block_headers_pop();
1355+
assert_eq!(format!("{:?}", resp), "Err(NotSubscribedToHeaders)");
1356+
client.block_headers_subscribe().unwrap();
1357+
let resp = client.block_headers_pop();
1358+
assert!(resp.is_ok());
1359+
}
1360+
13361361
#[test]
13371362
fn test_script_subscribe() {
13381363
use std::str::FromStr;

src/types.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -327,6 +327,9 @@ pub enum Error {
327327
#[cfg(feature = "use-openssl")]
328328
/// SSL Handshake failed with the server
329329
SslHandshakeError(openssl::ssl::HandshakeError<std::net::TcpStream>),
330+
331+
/// Expecting notification on headers but we are not subscribed
332+
NotSubscribedToHeaders,
330333
}
331334

332335
impl Display for Error {
@@ -364,6 +367,7 @@ impl Display for Error {
364367
Error::MissingDomain => f.write_str("Missing domain while it was explicitly asked to validate it"),
365368
Error::CouldntLockReader => f.write_str("Couldn't take a lock on the reader mutex. This means that there's already another reader thread is running"),
366369
Error::Mpsc => f.write_str("Broken IPC communication channel: the other thread probably has exited"),
370+
Error::NotSubscribedToHeaders => write!(f, "Expecting notification on headers but we are not subscribed"),
367371
}
368372
}
369373
}

0 commit comments

Comments
 (0)