Skip to content

Commit 9d3f13b

Browse files
add client cleanup
1 parent c30246e commit 9d3f13b

File tree

1 file changed

+24
-29
lines changed

1 file changed

+24
-29
lines changed

client/src/consensus.rs

Lines changed: 24 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use alto_types::{
33
Block, Finalization, Finalized, Kind, Notarization, Notarized, Nullification, Seed,
44
};
55
use futures::{channel::mpsc::unbounded, Stream, StreamExt};
6-
use tokio_tungstenite::{connect_async, tungstenite::Message};
6+
use tokio_tungstenite::{connect_async, tungstenite::Message as TMessage};
77

88
fn seed_upload_path(base: String) -> String {
99
format!("{}/seed", base)
@@ -37,28 +37,28 @@ fn finalization_get_path(base: String, query: &IndexQuery) -> String {
3737
format!("{}/finalization/{}", base, query.serialize())
3838
}
3939

40-
fn consensus_register_path(base: String) -> String {
40+
/// There is no block upload path. Blocks are uploaded as a byproduct of notarization
41+
/// and finalization uploads.
42+
fn block_get_path(base: String, query: &Query) -> String {
43+
format!("{}/block/{}", base, query.serialize())
44+
}
45+
46+
fn register_path(base: String) -> String {
4147
format!("{}/consensus/ws", base)
4248
}
4349

44-
pub enum BlockPayload {
50+
pub enum Payload {
4551
Finalized(Box<Finalized>),
4652
Block(Block),
4753
}
4854

49-
pub enum ConsensusMessage {
55+
pub enum Message {
5056
Seed(Seed),
5157
Nullification(Nullification),
5258
Notarization(Notarized),
5359
Finalization(Finalized),
5460
}
5561

56-
/// There is no block upload path. Blocks are uploaded as a byproduct of notarization
57-
/// and finalization uploads.
58-
fn block_get_path(base: String, query: &Query) -> String {
59-
format!("{}/block/{}", base, query.serialize())
60-
}
61-
6262
impl Client {
6363
pub async fn seed_upload(&self, seed: Seed) -> Result<(), Error> {
6464
let request = seed.serialize();
@@ -235,7 +235,7 @@ impl Client {
235235
Ok(result)
236236
}
237237

238-
pub async fn block_get(&self, query: Query) -> Result<BlockPayload, Error> {
238+
pub async fn block_get(&self, query: Query) -> Result<Payload, Error> {
239239
// Get the block
240240
let client = reqwest::Client::new();
241241
let result = client
@@ -253,32 +253,30 @@ impl Client {
253253
Query::Latest => {
254254
let result =
255255
Finalized::deserialize(Some(&self.public), &bytes).ok_or(Error::InvalidData)?;
256-
BlockPayload::Finalized(Box::new(result))
256+
Payload::Finalized(Box::new(result))
257257
}
258258
Query::Index(index) => {
259259
let result =
260260
Finalized::deserialize(Some(&self.public), &bytes).ok_or(Error::InvalidData)?;
261261
if result.block.height != index {
262262
return Err(Error::InvalidData);
263263
}
264-
BlockPayload::Finalized(Box::new(result))
264+
Payload::Finalized(Box::new(result))
265265
}
266266
Query::Digest(digest) => {
267267
let result = Block::deserialize(&bytes).ok_or(Error::InvalidData)?;
268268
if result.digest() != digest {
269269
return Err(Error::InvalidData);
270270
}
271-
BlockPayload::Block(result)
271+
Payload::Block(result)
272272
}
273273
};
274274
Ok(result)
275275
}
276276

277-
pub async fn consensus_register(
278-
&self,
279-
) -> Result<impl Stream<Item = Result<ConsensusMessage, Error>>, Error> {
277+
pub async fn register(&self) -> Result<impl Stream<Item = Result<Message, Error>>, Error> {
280278
// Connect to the websocket endpoint
281-
let (stream, _) = connect_async(consensus_register_path(self.ws_uri.clone()))
279+
let (stream, _) = connect_async(register_path(self.ws_uri.clone()))
282280
.await
283281
.map_err(Error::from)?;
284282
let (_, read) = stream.split();
@@ -289,7 +287,7 @@ impl Client {
289287
tokio::spawn(async move {
290288
read.for_each(|message| async {
291289
match message {
292-
Ok(Message::Binary(data)) => {
290+
Ok(TMessage::Binary(data)) => {
293291
// Get kind
294292
let kind = data[0];
295293
let Some(kind) = Kind::from_u8(kind) else {
@@ -302,16 +300,15 @@ impl Client {
302300
match kind {
303301
Kind::Seed => {
304302
if let Some(seed) = Seed::deserialize(Some(&public), data) {
305-
let _ = sender.unbounded_send(Ok(ConsensusMessage::Seed(seed)));
303+
let _ = sender.unbounded_send(Ok(Message::Seed(seed)));
306304
} else {
307305
let _ = sender.unbounded_send(Err(Error::InvalidData));
308306
}
309307
}
310308
Kind::Notarization => {
311309
if let Some(payload) = Notarized::deserialize(Some(&public), data) {
312-
let _ = sender.unbounded_send(Ok(
313-
ConsensusMessage::Notarization(payload),
314-
));
310+
let _ =
311+
sender.unbounded_send(Ok(Message::Notarization(payload)));
315312
} else {
316313
let _ = sender.unbounded_send(Err(Error::InvalidData));
317314
}
@@ -320,18 +317,16 @@ impl Client {
320317
if let Some(nullification) =
321318
Nullification::deserialize(Some(&public), data)
322319
{
323-
let _ = sender.unbounded_send(Ok(
324-
ConsensusMessage::Nullification(nullification),
325-
));
320+
let _ = sender
321+
.unbounded_send(Ok(Message::Nullification(nullification)));
326322
} else {
327323
let _ = sender.unbounded_send(Err(Error::InvalidData));
328324
}
329325
}
330326
Kind::Finalization => {
331327
if let Some(payload) = Finalized::deserialize(Some(&public), data) {
332-
let _ = sender.unbounded_send(Ok(
333-
ConsensusMessage::Finalization(payload),
334-
));
328+
let _ =
329+
sender.unbounded_send(Ok(Message::Finalization(payload)));
335330
} else {
336331
let _ = sender.unbounded_send(Err(Error::InvalidData));
337332
}

0 commit comments

Comments
 (0)