Skip to content

Commit 314ec95

Browse files
committed
async: add streaming support for the base component
Added protocol support for streaming. Added `StreamInner` struct for streaming operations and Streaming related errors. Signed-off-by: wllenyj <wllenyj@linux.alibaba.com>
1 parent 4cafce1 commit 314ec95

File tree

3 files changed

+211
-3
lines changed

3 files changed

+211
-3
lines changed

src/asynchronous/stream.rs

Lines changed: 182 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,193 @@
33
// SPDX-License-Identifier: Apache-2.0
44
//
55

6+
use std::collections::HashMap;
7+
use std::sync::atomic::{AtomicBool, Ordering};
8+
use std::sync::{Arc, Mutex};
9+
610
use tokio::sync::mpsc;
711

8-
use crate::error::Result;
9-
use crate::proto::GenMessage;
12+
use crate::error::{Error, Result};
13+
use crate::proto::{
14+
Code, Codec, GenMessage, MessageHeader, Response, FLAG_NO_DATA, FLAG_REMOTE_CLOSED,
15+
MESSAGE_TYPE_DATA, MESSAGE_TYPE_RESPONSE,
16+
};
1017

1118
pub type MessageSender = mpsc::Sender<GenMessage>;
1219
pub type MessageReceiver = mpsc::Receiver<GenMessage>;
1320

1421
pub type ResultSender = mpsc::Sender<Result<GenMessage>>;
1522
pub type ResultReceiver = mpsc::Receiver<Result<GenMessage>>;
23+
24+
async fn _recv(rx: &mut ResultReceiver) -> Result<GenMessage> {
25+
rx.recv()
26+
.await
27+
.unwrap_or_else(|| Err(Error::Others("Receive packet from recver error".to_string())))
28+
}
29+
30+
async fn _send(tx: &MessageSender, msg: GenMessage) -> Result<()> {
31+
tx.send(msg)
32+
.await
33+
.map_err(|e| Error::Others(format!("Send data packet to sender error {:?}", e)))
34+
}
35+
36+
#[derive(Clone, Copy, Debug, PartialEq)]
37+
pub enum Kind {
38+
Client,
39+
Server,
40+
}
41+
42+
#[derive(Debug)]
43+
pub struct StreamInner {
44+
sender: StreamSender,
45+
receiver: StreamReceiver,
46+
}
47+
48+
impl StreamInner {
49+
pub fn new(
50+
stream_id: u32,
51+
tx: MessageSender,
52+
rx: ResultReceiver,
53+
//waiter: shutdown::Waiter,
54+
sendable: bool,
55+
recveivable: bool,
56+
kind: Kind,
57+
streams: Arc<Mutex<HashMap<u32, ResultSender>>>,
58+
) -> Self {
59+
Self {
60+
sender: StreamSender {
61+
tx,
62+
stream_id,
63+
sendable,
64+
local_closed: Arc::new(AtomicBool::new(false)),
65+
kind,
66+
},
67+
receiver: StreamReceiver {
68+
rx,
69+
stream_id,
70+
recveivable,
71+
remote_closed: false,
72+
kind,
73+
streams,
74+
},
75+
}
76+
}
77+
78+
fn split(self) -> (StreamSender, StreamReceiver) {
79+
(self.sender, self.receiver)
80+
}
81+
82+
pub async fn send(&self, buf: Vec<u8>) -> Result<()> {
83+
self.sender.send(buf).await
84+
}
85+
86+
pub async fn close_send(&self) -> Result<()> {
87+
self.sender.close_send().await
88+
}
89+
90+
pub async fn recv(&mut self) -> Result<Vec<u8>> {
91+
self.receiver.recv().await
92+
}
93+
}
94+
95+
#[derive(Clone, Debug)]
96+
pub struct StreamSender {
97+
tx: MessageSender,
98+
stream_id: u32,
99+
sendable: bool,
100+
local_closed: Arc<AtomicBool>,
101+
kind: Kind,
102+
}
103+
104+
#[derive(Debug)]
105+
pub struct StreamReceiver {
106+
rx: ResultReceiver,
107+
stream_id: u32,
108+
recveivable: bool,
109+
remote_closed: bool,
110+
kind: Kind,
111+
streams: Arc<Mutex<HashMap<u32, ResultSender>>>,
112+
}
113+
114+
impl Drop for StreamReceiver {
115+
fn drop(&mut self) {
116+
self.streams.lock().unwrap().remove(&self.stream_id);
117+
}
118+
}
119+
120+
impl StreamSender {
121+
pub async fn send(&self, buf: Vec<u8>) -> Result<()> {
122+
debug_assert!(self.sendable);
123+
if self.local_closed.load(Ordering::Relaxed) {
124+
debug_assert_eq!(self.kind, Kind::Client);
125+
return Err(Error::LocalClosed);
126+
}
127+
let header = MessageHeader::new_data(self.stream_id, buf.len() as u32);
128+
let msg = GenMessage {
129+
header,
130+
payload: buf,
131+
};
132+
_send(&self.tx, msg).await?;
133+
134+
Ok(())
135+
}
136+
137+
pub async fn close_send(&self) -> Result<()> {
138+
debug_assert_eq!(self.kind, Kind::Client);
139+
debug_assert!(self.sendable);
140+
if self.local_closed.load(Ordering::Relaxed) {
141+
return Err(Error::LocalClosed);
142+
}
143+
let mut header = MessageHeader::new_data(self.stream_id, 0);
144+
header.set_flags(FLAG_REMOTE_CLOSED | FLAG_NO_DATA);
145+
let msg = GenMessage {
146+
header,
147+
payload: Vec::new(),
148+
};
149+
_send(&self.tx, msg).await?;
150+
self.local_closed.store(true, Ordering::Relaxed);
151+
Ok(())
152+
}
153+
}
154+
155+
impl StreamReceiver {
156+
pub async fn recv(&mut self) -> Result<Vec<u8>> {
157+
if self.remote_closed {
158+
return Err(Error::RemoteClosed);
159+
}
160+
let msg = _recv(&mut self.rx).await?;
161+
let payload = match msg.header.type_ {
162+
MESSAGE_TYPE_RESPONSE => {
163+
debug_assert_eq!(self.kind, Kind::Client);
164+
self.remote_closed = true;
165+
let resp = Response::decode(&msg.payload)
166+
.map_err(err_to_others_err!(e, "Decode message failed."))?;
167+
if let Some(status) = resp.status.as_ref() {
168+
if status.get_code() != Code::OK {
169+
return Err(Error::RpcStatus((*status).clone()));
170+
}
171+
}
172+
resp.payload
173+
}
174+
MESSAGE_TYPE_DATA => {
175+
if !self.recveivable {
176+
self.remote_closed = true;
177+
return Err(Error::Others(
178+
"received data from non-streaming server.".to_string(),
179+
));
180+
}
181+
if (msg.header.flags & FLAG_REMOTE_CLOSED) == FLAG_REMOTE_CLOSED {
182+
self.remote_closed = true;
183+
if (msg.header.flags & FLAG_NO_DATA) == FLAG_NO_DATA {
184+
return Err(Error::Eof);
185+
}
186+
}
187+
msg.payload
188+
}
189+
_ => {
190+
return Err(Error::Others("not support".to_string()));
191+
}
192+
};
193+
Ok(payload)
194+
}
195+
}

src/error.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,15 @@ pub enum Error {
3030
#[error("Nix error: {0}")]
3131
Nix(#[from] nix::Error),
3232

33+
#[error("ttrpc err: local stream closed")]
34+
LocalClosed,
35+
36+
#[error("ttrpc err: remote stream closed")]
37+
RemoteClosed,
38+
39+
#[error("eof")]
40+
Eof,
41+
3342
#[error("ttrpc err: {0}")]
3443
Others(String),
3544
}

src/proto.rs

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,11 @@ pub const MESSAGE_LENGTH_MAX: usize = 4 << 20;
2121

2222
pub const MESSAGE_TYPE_REQUEST: u8 = 0x1;
2323
pub const MESSAGE_TYPE_RESPONSE: u8 = 0x2;
24+
pub const MESSAGE_TYPE_DATA: u8 = 0x3;
25+
26+
pub const FLAG_REMOTE_CLOSED: u8 = 0x1;
27+
pub const FLAG_REMOTE_OPEN: u8 = 0x2;
28+
pub const FLAG_NO_DATA: u8 = 0x4;
2429

2530
/// Message header of ttrpc.
2631
#[derive(Default, Debug, Clone, Copy, PartialEq)]
@@ -57,6 +62,7 @@ impl From<MessageHeader> for Vec<u8> {
5762

5863
impl MessageHeader {
5964
/// Creates a request MessageHeader from stream_id and len.
65+
///
6066
/// Use the default message type MESSAGE_TYPE_REQUEST, and default flags 0.
6167
pub fn new_request(stream_id: u32, len: u32) -> Self {
6268
Self {
@@ -68,7 +74,8 @@ impl MessageHeader {
6874
}
6975

7076
/// Creates a response MessageHeader from stream_id and len.
71-
/// Use the default message type MESSAGE_TYPE_RESPONSE, and default flags 0.
77+
///
78+
/// Use the MESSAGE_TYPE_RESPONSE message type, and default flags 0.
7279
pub fn new_response(stream_id: u32, len: u32) -> Self {
7380
Self {
7481
length: len,
@@ -78,6 +85,18 @@ impl MessageHeader {
7885
}
7986
}
8087

88+
/// Creates a data MessageHeader from stream_id and len.
89+
///
90+
/// Use the MESSAGE_TYPE_DATA message type, and default flags 0.
91+
pub fn new_data(stream_id: u32, len: u32) -> Self {
92+
Self {
93+
length: len,
94+
stream_id,
95+
type_: MESSAGE_TYPE_DATA,
96+
flags: 0,
97+
}
98+
}
99+
81100
/// Set the stream_id of message using the given value.
82101
pub fn set_stream_id(&mut self, stream_id: u32) {
83102
self.stream_id = stream_id;

0 commit comments

Comments
 (0)