Skip to content

Commit 745d2be

Browse files
authored
Merge pull request #222 from abel-von/fix-reliability
Fix the stream sending reliability
2 parents 152ac12 + 3355f7d commit 745d2be

File tree

4 files changed

+56
-16
lines changed

4 files changed

+56
-16
lines changed

src/asynchronous/client.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ use crate::r#async::stream::{
2727
};
2828
use crate::r#async::utils;
2929

30+
use super::stream::SendingMessage;
31+
3032
/// A ttrpc Client (async).
3133
#[derive(Clone)]
3234
pub struct Client {
@@ -78,7 +80,7 @@ impl Client {
7880
self.streams.lock().unwrap().insert(stream_id, tx);
7981

8082
self.req_tx
81-
.send(msg)
83+
.send(SendingMessage::new(msg))
8284
.await
8385
.map_err(|e| Error::Others(format!("Send packet to sender error {e:?}")))?;
8486

@@ -139,7 +141,7 @@ impl Client {
139141
// TODO: check return
140142
self.streams.lock().unwrap().insert(stream_id, tx);
141143
self.req_tx
142-
.send(msg)
144+
.send(SendingMessage::new(msg))
143145
.await
144146
.map_err(|e| Error::Others(format!("Send packet to sender error {e:?}")))?;
145147

@@ -204,7 +206,7 @@ struct ClientWriter {
204206

205207
#[async_trait]
206208
impl WriterDelegate for ClientWriter {
207-
async fn recv(&mut self) -> Option<GenMessage> {
209+
async fn recv(&mut self) -> Option<SendingMessage> {
208210
self.rx.recv().await
209211
}
210212

src/asynchronous/connection.rs

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ use tokio::{
1616
use crate::error::Error;
1717
use crate::proto::{GenMessage, GenMessageError, MessageHeader};
1818

19+
use super::stream::SendingMessage;
20+
1921
pub trait Builder {
2022
type Reader;
2123
type Writer;
@@ -25,7 +27,7 @@ pub trait Builder {
2527

2628
#[async_trait]
2729
pub trait WriterDelegate {
28-
async fn recv(&mut self) -> Option<GenMessage>;
30+
async fn recv(&mut self) -> Option<SendingMessage>;
2931
async fn disconnect(&self, msg: &GenMessage, e: Error);
3032
async fn exit(&self);
3133
}
@@ -58,12 +60,14 @@ where
5860
let (reader_delegate, mut writer_delegate) = builder.build();
5961

6062
let writer_task = tokio::spawn(async move {
61-
while let Some(msg) = writer_delegate.recv().await {
62-
trace!("write message: {:?}", msg);
63-
if let Err(e) = msg.write_to(&mut writer).await {
63+
while let Some(mut sending_msg) = writer_delegate.recv().await {
64+
trace!("write message: {:?}", sending_msg.msg);
65+
if let Err(e) = sending_msg.msg.write_to(&mut writer).await {
6466
error!("write_message got error: {:?}", e);
65-
writer_delegate.disconnect(&msg, e).await;
67+
sending_msg.send_result(Err(e.clone()));
68+
writer_delegate.disconnect(&sending_msg.msg, e).await;
6669
}
70+
sending_msg.send_result(Ok(()));
6771
}
6872
writer_delegate.exit().await;
6973
trace!("Writer task exit.");

src/asynchronous/server.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ use tokio::{
3131
#[cfg(any(target_os = "linux", target_os = "android"))]
3232
use tokio_vsock::VsockListener;
3333

34-
use crate::asynchronous::unix_incoming::UnixIncoming;
34+
use crate::asynchronous::{stream::SendingMessage, unix_incoming::UnixIncoming};
3535
use crate::common::{self, Domain};
3636
use crate::context;
3737
use crate::error::{get_status, Error, Result};
@@ -339,7 +339,7 @@ struct ServerWriter {
339339

340340
#[async_trait]
341341
impl WriterDelegate for ServerWriter {
342-
async fn recv(&mut self) -> Option<GenMessage> {
342+
async fn recv(&mut self) -> Option<SendingMessage> {
343343
self.rx.recv().await
344344
}
345345
async fn disconnect(&self, _msg: &GenMessage, _: Error) {}
@@ -462,7 +462,7 @@ impl HandlerContext {
462462
};
463463

464464
self.tx
465-
.send(msg)
465+
.send(SendingMessage::new(msg))
466466
.await
467467
.map_err(err_to_others_err!(e, "Send packet to sender error "))
468468
.ok();
@@ -652,7 +652,7 @@ impl HandlerContext {
652652
header: MessageHeader::new_response(stream_id, payload.len() as u32),
653653
payload,
654654
};
655-
tx.send(msg)
655+
tx.send(SendingMessage::new(msg))
656656
.await
657657
.map_err(err_to_others_err!(e, "Send packet to sender error "))
658658
}

src/asynchronous/stream.rs

Lines changed: 38 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,42 @@ use crate::proto::{
1717
MESSAGE_TYPE_DATA, MESSAGE_TYPE_RESPONSE,
1818
};
1919

20-
pub type MessageSender = mpsc::Sender<GenMessage>;
21-
pub type MessageReceiver = mpsc::Receiver<GenMessage>;
20+
pub type MessageSender = mpsc::Sender<SendingMessage>;
21+
pub type MessageReceiver = mpsc::Receiver<SendingMessage>;
2222

2323
pub type ResultSender = mpsc::Sender<Result<GenMessage>>;
2424
pub type ResultReceiver = mpsc::Receiver<Result<GenMessage>>;
2525

26+
#[derive(Debug)]
27+
pub struct SendingMessage {
28+
pub msg: GenMessage,
29+
pub result_chan: Option<tokio::sync::oneshot::Sender<Result<()>>>,
30+
}
31+
32+
impl SendingMessage {
33+
pub fn new(msg: GenMessage) -> Self {
34+
Self {
35+
msg,
36+
result_chan: None,
37+
}
38+
}
39+
pub fn new_with_result(
40+
msg: GenMessage,
41+
result_chan: tokio::sync::oneshot::Sender<Result<()>>,
42+
) -> Self {
43+
Self {
44+
msg,
45+
result_chan: Some(result_chan),
46+
}
47+
}
48+
49+
pub fn send_result(&mut self, result: Result<()>) {
50+
if let Some(result_ch) = self.result_chan.take() {
51+
result_ch.send(result).unwrap_or_default();
52+
}
53+
}
54+
}
55+
2656
#[derive(Debug)]
2757
pub struct ClientStream<Q, P> {
2858
tx: CSSender<Q>,
@@ -317,9 +347,13 @@ async fn _recv(rx: &mut ResultReceiver) -> Result<GenMessage> {
317347
}
318348

319349
async fn _send(tx: &MessageSender, msg: GenMessage) -> Result<()> {
320-
tx.send(msg)
350+
let (res_tx, res_rx) = tokio::sync::oneshot::channel();
351+
tx.send(SendingMessage::new_with_result(msg, res_tx))
352+
.await
353+
.map_err(|e| Error::Others(format!("Send data packet to sender error {:?}", e)))?;
354+
res_rx
321355
.await
322-
.map_err(|e| Error::Others(format!("Send data packet to sender error {e:?}")))
356+
.map_err(|e| Error::Others(format!("Failed to wait send result {:?}", e)))?
323357
}
324358

325359
#[derive(Clone, Copy, Debug, PartialEq, Eq)]

0 commit comments

Comments
 (0)