Skip to content

Commit 5db5e32

Browse files
authored
[feature] #3355: Standardize block API (#3884)
Co-authored-by: Shanin Roman <shanin1000@yandex.ru> [fix] #3890: Fix validate topology on block sync Signed-off-by: Marin Veršić <marin.versic101@gmail.com>
1 parent d111396 commit 5db5e32

File tree

72 files changed

+2220
-2604
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

72 files changed

+2220
-2604
lines changed

Cargo.lock

Lines changed: 182 additions & 187 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

cli/src/event.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use warp::ws::WebSocket;
1414
use crate::stream::{self, Sink, Stream};
1515

1616
/// Type of Stream error
17-
pub type StreamError = stream::Error<<WebSocket as Stream<VersionedEventSubscriptionRequest>>::Err>;
17+
pub type StreamError = stream::Error<<WebSocket as Stream<EventSubscriptionRequest>>::Err>;
1818

1919
/// Type of error for `Consumer`
2020
#[derive(thiserror::Error, Debug)]
@@ -57,9 +57,7 @@ impl Consumer {
5757
/// Can fail due to timeout or without message at websocket or during decoding request
5858
#[iroha_futures::telemetry_future]
5959
pub async fn new(mut stream: WebSocket) -> Result<Self> {
60-
let subscription_request: VersionedEventSubscriptionRequest = stream.recv().await?;
61-
let EventSubscriptionRequest(filter) = subscription_request.into_v1();
62-
60+
let EventSubscriptionRequest(filter) = stream.recv().await?;
6361
Ok(Consumer { stream, filter })
6462
}
6563

@@ -74,7 +72,7 @@ impl Consumer {
7472
}
7573

7674
self.stream
77-
.send(VersionedEventMessage::from(EventMessage(event)))
75+
.send(EventMessage(event))
7876
.await
7977
.map_err(Into::into)
8078
}

cli/src/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -160,9 +160,9 @@ impl NetworkRelay {
160160

161161
match msg {
162162
SumeragiPacket(data) => {
163-
self.sumeragi.incoming_message(data.into_v1());
163+
self.sumeragi.incoming_message(*data);
164164
}
165-
BlockSync(data) => self.block_sync.message(data.into_v1()).await,
165+
BlockSync(data) => self.block_sync.message(*data).await,
166166
TransactionGossiper(data) => self.gossiper.gossip(*data).await,
167167
Health => {}
168168
}

cli/src/stream.rs

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use core::{result::Result, time::Duration};
66

77
use futures::{SinkExt, StreamExt};
88
use iroha_version::prelude::*;
9+
use parity_scale_codec::DecodeAll;
910

1011
#[cfg(test)]
1112
const TIMEOUT: Duration = Duration::from_millis(10_000);
@@ -34,7 +35,7 @@ where
3435
/// Unexpected non-binary message received
3536
NonBinaryMessage,
3637
/// Error during versioned message decoding
37-
IrohaVersion(#[from] iroha_version::error::Error),
38+
Decode(#[from] parity_scale_codec::Error),
3839
}
3940

4041
/// Represents message used by the stream
@@ -56,7 +57,7 @@ pub trait StreamMessage {
5657
#[async_trait::async_trait]
5758
pub trait Sink<S>: SinkExt<Self::Message, Error = Self::Err> + Unpin
5859
where
59-
S: EncodeVersioned + Send + Sync + 'static,
60+
S: Encode + Send + Sync + 'static,
6061
{
6162
/// Error type returned by the sink
6263
type Err: std::error::Error + Send + Sync + 'static;
@@ -68,10 +69,7 @@ where
6869
async fn send(&mut self, message: S) -> Result<(), Error<Self::Err>> {
6970
tokio::time::timeout(
7071
TIMEOUT,
71-
<Self as SinkExt<Self::Message>>::send(
72-
self,
73-
Self::Message::binary(message.encode_versioned()),
74-
),
72+
<Self as SinkExt<Self::Message>>::send(self, Self::Message::binary(message.encode())),
7573
)
7674
.await
7775
.map_err(|_err| Error::SendTimeout)?
@@ -81,7 +79,7 @@ where
8179

8280
/// Trait for reading custom messages from stream
8381
#[async_trait::async_trait]
84-
pub trait Stream<R: DecodeVersioned>:
82+
pub trait Stream<R: DecodeAll>:
8583
StreamExt<Item = std::result::Result<Self::Message, Self::Err>> + Unpin
8684
{
8785
/// Error type returned by the stream
@@ -106,9 +104,7 @@ pub trait Stream<R: DecodeVersioned>:
106104
return Err(Error::NonBinaryMessage);
107105
}
108106

109-
Ok(R::decode_all_versioned(
110-
subscription_request_message.as_bytes(),
111-
)?)
107+
Ok(R::decode_all(&mut subscription_request_message.as_bytes())?)
112108
}
113109
}
114110

@@ -133,14 +129,14 @@ impl StreamMessage for warp::ws::Message {
133129
#[async_trait::async_trait]
134130
impl<M> Sink<M> for warp::ws::WebSocket
135131
where
136-
M: EncodeVersioned + Send + Sync + 'static,
132+
M: Encode + Send + Sync + 'static,
137133
{
138134
type Err = warp::Error;
139135
type Message = warp::ws::Message;
140136
}
141137

142138
#[async_trait::async_trait]
143-
impl<M: DecodeVersioned> Stream<M> for warp::ws::WebSocket {
139+
impl<M: DecodeAll> Stream<M> for warp::ws::WebSocket {
144140
type Err = warp::Error;
145141
type Message = warp::ws::Message;
146142
}
@@ -152,14 +148,14 @@ mod ws_client {
152148
use super::*;
153149

154150
#[async_trait::async_trait]
155-
impl<M: DecodeVersioned> Stream<M> for WsClient {
151+
impl<M: DecodeAll> Stream<M> for WsClient {
156152
type Err = warp::test::WsError;
157153
type Message = warp::ws::Message;
158154
}
159155
#[async_trait::async_trait]
160156
impl<M> Sink<M> for WsClient
161157
where
162-
M: EncodeVersioned + Send + Sync + 'static,
158+
M: Encode + Send + Sync + 'static,
163159
{
164160
type Err = warp::test::WsError;
165161
type Message = warp::ws::Message;

cli/src/torii/routing.rs

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,8 @@ use iroha_core::{
2525
};
2626
use iroha_data_model::{
2727
block::{
28-
stream::{
29-
BlockMessage, BlockSubscriptionRequest, VersionedBlockMessage,
30-
VersionedBlockSubscriptionRequest,
31-
},
32-
VersionedCommittedBlock,
28+
stream::{BlockMessage, BlockSubscriptionRequest},
29+
VersionedSignedBlock,
3330
},
3431
http::{BatchedResponse, VersionedBatchedResponse},
3532
prelude::*,
@@ -278,8 +275,7 @@ async fn handle_post_configuration(
278275

279276
#[iroha_futures::telemetry_future]
280277
async fn handle_blocks_stream(kura: Arc<Kura>, mut stream: WebSocket) -> eyre::Result<()> {
281-
let subscription_request: VersionedBlockSubscriptionRequest = stream.recv().await?;
282-
let BlockSubscriptionRequest(mut from_height) = subscription_request.into_v1();
278+
let BlockSubscriptionRequest(mut from_height) = stream.recv().await?;
283279

284280
let mut interval = tokio::time::interval(std::time::Duration::from_millis(10));
285281
loop {
@@ -307,10 +303,8 @@ async fn handle_blocks_stream(kura: Arc<Kura>, mut stream: WebSocket) -> eyre::R
307303
_ = interval.tick() => {
308304
if let Some(block) = kura.get_block_by_height(from_height.get()) {
309305
stream
310-
// TODO: to avoid clone `VersionedBlockMessage` could be split into sending and receiving parts
311-
.send(VersionedBlockMessage::from(
312-
BlockMessage(VersionedCommittedBlock::clone(&block)),
313-
))
306+
// TODO: to avoid clone `BlockMessage` could be split into sending and receiving parts
307+
.send(BlockMessage(VersionedSignedBlock::clone(&block)))
314308
.await?;
315309
from_height = from_height.checked_add(1).expect("Maximum block height is achieved.");
316310
}

client/benches/tps/utils.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -132,14 +132,15 @@ impl Config {
132132
let block = blocks
133133
.next()
134134
.expect("The block is not yet in WSV. Need more sleep?");
135-
let block = block.as_v1();
136135
(
137136
block
137+
.payload()
138138
.transactions
139139
.iter()
140140
.filter(|tx| tx.error.is_none())
141141
.count(),
142142
block
143+
.payload()
143144
.transactions
144145
.iter()
145146
.filter(|tx| tx.error.is_some())
@@ -173,7 +174,7 @@ impl MeasurerUnit {
173174
let keypair = iroha_crypto::KeyPair::generate().expect("Failed to generate KeyPair.");
174175

175176
let account_id = account_id(self.name);
176-
let alice_id = <Account as Identifiable>::Id::from_str("alice@wonderland")?;
177+
let alice_id = AccountId::from_str("alice@wonderland")?;
177178
let asset_id = asset_id(self.name);
178179

179180
let register_me = RegisterBox::new(Account::new(
@@ -238,8 +239,7 @@ impl MeasurerUnit {
238239
let submitter = self.client.clone();
239240
let interval_us_per_tx = self.config.interval_us_per_tx;
240241
let instructions = self.instructions();
241-
let alice_id = <Account as Identifiable>::Id::from_str("alice@wonderland")
242-
.expect("Failed to parse account id");
242+
let alice_id = AccountId::from_str("alice@wonderland").expect("Failed to parse account id");
243243

244244
let mut nonce = NonZeroU32::new(1).expect("Valid");
245245

client/src/client.rs

Lines changed: 10 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use http_default::{AsyncWebSocketStream, WebSocketStream};
2121
use iroha_config::{client::Configuration, torii::uri, GetConfiguration, PostConfiguration};
2222
use iroha_crypto::{HashOf, KeyPair};
2323
use iroha_data_model::{
24-
block::VersionedCommittedBlock,
24+
block::VersionedSignedBlock,
2525
http::VersionedBatchedResponse,
2626
isi::Instruction,
2727
predicate::PredicateBox,
@@ -654,7 +654,7 @@ impl Client {
654654
PipelineStatus::Rejected(ref reason) => {
655655
return Err(reason.clone().into());
656656
}
657-
PipelineStatus::Committed => return Ok(hash.transmute()),
657+
PipelineStatus::Committed => return Ok(hash),
658658
}
659659
}
660660
}
@@ -1036,7 +1036,7 @@ impl Client {
10361036
pub fn listen_for_blocks(
10371037
&self,
10381038
height: NonZeroU64,
1039-
) -> Result<impl Iterator<Item = Result<VersionedCommittedBlock>>> {
1039+
) -> Result<impl Iterator<Item = Result<VersionedSignedBlock>>> {
10401040
blocks_api::BlockIterator::new(self.blocks_handler(height)?)
10411041
}
10421042

@@ -1448,10 +1448,7 @@ pub mod events_api {
14481448
url,
14491449
} = self;
14501450

1451-
let msg =
1452-
VersionedEventSubscriptionRequest::from(EventSubscriptionRequest::new(filter))
1453-
.encode_versioned();
1454-
1451+
let msg = EventSubscriptionRequest::new(filter).encode();
14551452
InitData::new(R::new(HttpMethod::GET, url).headers(headers), msg, Events)
14561453
}
14571454
}
@@ -1464,8 +1461,7 @@ pub mod events_api {
14641461
type Event = iroha_data_model::prelude::Event;
14651462

14661463
fn message(&self, message: Vec<u8>) -> Result<Self::Event> {
1467-
let event_socket_message =
1468-
VersionedEventMessage::decode_all_versioned(&message)?.into_v1();
1464+
let event_socket_message = EventMessage::decode_all(&mut message.as_slice())?;
14691465
Ok(event_socket_message.into())
14701466
}
14711467
}
@@ -1532,10 +1528,7 @@ mod blocks_api {
15321528
url,
15331529
} = self;
15341530

1535-
let msg =
1536-
VersionedBlockSubscriptionRequest::from(BlockSubscriptionRequest::new(height))
1537-
.encode_versioned();
1538-
1531+
let msg = BlockSubscriptionRequest::new(height).encode();
15391532
InitData::new(R::new(HttpMethod::GET, url).headers(headers), msg, Events)
15401533
}
15411534
}
@@ -1545,11 +1538,10 @@ mod blocks_api {
15451538
pub struct Events;
15461539

15471540
impl FlowEvents for Events {
1548-
type Event = iroha_data_model::block::VersionedCommittedBlock;
1541+
type Event = iroha_data_model::block::VersionedSignedBlock;
15491542

15501543
fn message(&self, message: Vec<u8>) -> Result<Self::Event> {
1551-
let block_msg = VersionedBlockMessage::decode_all_versioned(&message)?.into_v1();
1552-
Ok(block_msg.into())
1544+
Ok(BlockMessage::decode_all(&mut message.as_slice()).map(Into::into)?)
15531545
}
15541546
}
15551547
}
@@ -1610,7 +1602,7 @@ pub mod asset {
16101602
}
16111603

16121604
/// Construct a query to get an asset by its id
1613-
pub fn by_id(asset_id: impl Into<EvaluatesTo<<Asset as Identifiable>::Id>>) -> FindAssetById {
1605+
pub fn by_id(asset_id: impl Into<EvaluatesTo<AssetId>>) -> FindAssetById {
16141606
FindAssetById::new(asset_id)
16151607
}
16161608
}
@@ -1632,7 +1624,7 @@ pub mod block {
16321624

16331625
/// Construct a query to find block header by hash
16341626
pub fn header_by_hash(
1635-
hash: impl Into<EvaluatesTo<HashOf<VersionedCommittedBlock>>>,
1627+
hash: impl Into<EvaluatesTo<HashOf<VersionedSignedBlock>>>,
16361628
) -> FindBlockHeaderByHash {
16371629
FindBlockHeaderByHash::new(hash)
16381630
}

client/tests/integration/asset.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use super::Configuration;
1414

1515
#[test]
1616
fn client_register_asset_should_add_asset_once_but_not_twice() -> Result<()> {
17-
let (_rt, _peer, mut test_client) = <PeerBuilder>::new().with_port(10_620).start_with_runtime();
17+
let (_rt, _peer, test_client) = <PeerBuilder>::new().with_port(10_620).start_with_runtime();
1818
wait_for_genesis_committed(&[test_client.clone()], 0);
1919

2020
// Given
@@ -48,7 +48,7 @@ fn client_register_asset_should_add_asset_once_but_not_twice() -> Result<()> {
4848

4949
#[test]
5050
fn unregister_asset_should_remove_asset_from_account() -> Result<()> {
51-
let (_rt, _peer, mut test_client) = <PeerBuilder>::new().with_port(10_555).start_with_runtime();
51+
let (_rt, _peer, test_client) = <PeerBuilder>::new().with_port(10_555).start_with_runtime();
5252
wait_for_genesis_committed(&[test_client.clone()], 0);
5353

5454
// Given
@@ -87,7 +87,7 @@ fn unregister_asset_should_remove_asset_from_account() -> Result<()> {
8787

8888
#[test]
8989
fn client_add_asset_quantity_to_existing_asset_should_increase_asset_amount() -> Result<()> {
90-
let (_rt, _peer, mut test_client) = <PeerBuilder>::new().with_port(10_000).start_with_runtime();
90+
let (_rt, _peer, test_client) = <PeerBuilder>::new().with_port(10_000).start_with_runtime();
9191
wait_for_genesis_committed(&[test_client.clone()], 0);
9292

9393
// Given
@@ -120,7 +120,7 @@ fn client_add_asset_quantity_to_existing_asset_should_increase_asset_amount() ->
120120

121121
#[test]
122122
fn client_add_big_asset_quantity_to_existing_asset_should_increase_asset_amount() -> Result<()> {
123-
let (_rt, _peer, mut test_client) = <PeerBuilder>::new().with_port(10_510).start_with_runtime();
123+
let (_rt, _peer, test_client) = <PeerBuilder>::new().with_port(10_510).start_with_runtime();
124124
wait_for_genesis_committed(&[test_client.clone()], 0);
125125

126126
// Given
@@ -153,7 +153,7 @@ fn client_add_big_asset_quantity_to_existing_asset_should_increase_asset_amount(
153153

154154
#[test]
155155
fn client_add_asset_with_decimal_should_increase_asset_amount() -> Result<()> {
156-
let (_rt, _peer, mut test_client) = <PeerBuilder>::new().with_port(10_515).start_with_runtime();
156+
let (_rt, _peer, test_client) = <PeerBuilder>::new().with_port(10_515).start_with_runtime();
157157
wait_for_genesis_committed(&[test_client.clone()], 0);
158158

159159
// Given

0 commit comments

Comments
 (0)