Skip to content

Commit 3355f7d

Browse files
committed
add channel to get send result
Currently the send() method of stream implemented by send the value to an unbounded channel, so even the connection is closed for a long time, the send function still return succeed. This commit adds a channel to the message so that we can wait until the message is truely written to the connection. Signed-off-by: Abel Feng <fshb1988@gmail.com>
1 parent 152ac12 commit 3355f7d

File tree

4 files changed

+56
-16
lines changed

4 files changed

+56
-16
lines changed

src/asynchronous/client.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ use crate::r#async::stream::{
2727
};
2828
use crate::r#async::utils;
2929

30+
use super::stream::SendingMessage;
31+
3032
/// A ttrpc Client (async).
3133
#[derive(Clone)]
3234
pub struct Client {
@@ -78,7 +80,7 @@ impl Client {
7880
self.streams.lock().unwrap().insert(stream_id, tx);
7981

8082
self.req_tx
81-
.send(msg)
83+
.send(SendingMessage::new(msg))
8284
.await
8385
.map_err(|e| Error::Others(format!("Send packet to sender error {e:?}")))?;
8486

@@ -139,7 +141,7 @@ impl Client {
139141
// TODO: check return
140142
self.streams.lock().unwrap().insert(stream_id, tx);
141143
self.req_tx
142-
.send(msg)
144+
.send(SendingMessage::new(msg))
143145
.await
144146
.map_err(|e| Error::Others(format!("Send packet to sender error {e:?}")))?;
145147

@@ -204,7 +206,7 @@ struct ClientWriter {
204206

205207
#[async_trait]
206208
impl WriterDelegate for ClientWriter {
207-
async fn recv(&mut self) -> Option<GenMessage> {
209+
async fn recv(&mut self) -> Option<SendingMessage> {
208210
self.rx.recv().await
209211
}
210212

src/asynchronous/connection.rs

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ use tokio::{
1616
use crate::error::Error;
1717
use crate::proto::{GenMessage, GenMessageError, MessageHeader};
1818

19+
use super::stream::SendingMessage;
20+
1921
pub trait Builder {
2022
type Reader;
2123
type Writer;
@@ -25,7 +27,7 @@ pub trait Builder {
2527

2628
#[async_trait]
2729
pub trait WriterDelegate {
28-
async fn recv(&mut self) -> Option<GenMessage>;
30+
async fn recv(&mut self) -> Option<SendingMessage>;
2931
async fn disconnect(&self, msg: &GenMessage, e: Error);
3032
async fn exit(&self);
3133
}
@@ -58,12 +60,14 @@ where
5860
let (reader_delegate, mut writer_delegate) = builder.build();
5961

6062
let writer_task = tokio::spawn(async move {
61-
while let Some(msg) = writer_delegate.recv().await {
62-
trace!("write message: {:?}", msg);
63-
if let Err(e) = msg.write_to(&mut writer).await {
63+
while let Some(mut sending_msg) = writer_delegate.recv().await {
64+
trace!("write message: {:?}", sending_msg.msg);
65+
if let Err(e) = sending_msg.msg.write_to(&mut writer).await {
6466
error!("write_message got error: {:?}", e);
65-
writer_delegate.disconnect(&msg, e).await;
67+
sending_msg.send_result(Err(e.clone()));
68+
writer_delegate.disconnect(&sending_msg.msg, e).await;
6669
}
70+
sending_msg.send_result(Ok(()));
6771
}
6872
writer_delegate.exit().await;
6973
trace!("Writer task exit.");

src/asynchronous/server.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ use tokio::{
3131
#[cfg(any(target_os = "linux", target_os = "android"))]
3232
use tokio_vsock::VsockListener;
3333

34-
use crate::asynchronous::unix_incoming::UnixIncoming;
34+
use crate::asynchronous::{stream::SendingMessage, unix_incoming::UnixIncoming};
3535
use crate::common::{self, Domain};
3636
use crate::context;
3737
use crate::error::{get_status, Error, Result};
@@ -339,7 +339,7 @@ struct ServerWriter {
339339

340340
#[async_trait]
341341
impl WriterDelegate for ServerWriter {
342-
async fn recv(&mut self) -> Option<GenMessage> {
342+
async fn recv(&mut self) -> Option<SendingMessage> {
343343
self.rx.recv().await
344344
}
345345
async fn disconnect(&self, _msg: &GenMessage, _: Error) {}
@@ -462,7 +462,7 @@ impl HandlerContext {
462462
};
463463

464464
self.tx
465-
.send(msg)
465+
.send(SendingMessage::new(msg))
466466
.await
467467
.map_err(err_to_others_err!(e, "Send packet to sender error "))
468468
.ok();
@@ -652,7 +652,7 @@ impl HandlerContext {
652652
header: MessageHeader::new_response(stream_id, payload.len() as u32),
653653
payload,
654654
};
655-
tx.send(msg)
655+
tx.send(SendingMessage::new(msg))
656656
.await
657657
.map_err(err_to_others_err!(e, "Send packet to sender error "))
658658
}

src/asynchronous/stream.rs

Lines changed: 38 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,42 @@ use crate::proto::{
1717
MESSAGE_TYPE_DATA, MESSAGE_TYPE_RESPONSE,
1818
};
1919

20-
pub type MessageSender = mpsc::Sender<GenMessage>;
21-
pub type MessageReceiver = mpsc::Receiver<GenMessage>;
20+
pub type MessageSender = mpsc::Sender<SendingMessage>;
21+
pub type MessageReceiver = mpsc::Receiver<SendingMessage>;
2222

2323
pub type ResultSender = mpsc::Sender<Result<GenMessage>>;
2424
pub type ResultReceiver = mpsc::Receiver<Result<GenMessage>>;
2525

26+
#[derive(Debug)]
27+
pub struct SendingMessage {
28+
pub msg: GenMessage,
29+
pub result_chan: Option<tokio::sync::oneshot::Sender<Result<()>>>,
30+
}
31+
32+
impl SendingMessage {
33+
pub fn new(msg: GenMessage) -> Self {
34+
Self {
35+
msg,
36+
result_chan: None,
37+
}
38+
}
39+
pub fn new_with_result(
40+
msg: GenMessage,
41+
result_chan: tokio::sync::oneshot::Sender<Result<()>>,
42+
) -> Self {
43+
Self {
44+
msg,
45+
result_chan: Some(result_chan),
46+
}
47+
}
48+
49+
pub fn send_result(&mut self, result: Result<()>) {
50+
if let Some(result_ch) = self.result_chan.take() {
51+
result_ch.send(result).unwrap_or_default();
52+
}
53+
}
54+
}
55+
2656
#[derive(Debug)]
2757
pub struct ClientStream<Q, P> {
2858
tx: CSSender<Q>,
@@ -317,9 +347,13 @@ async fn _recv(rx: &mut ResultReceiver) -> Result<GenMessage> {
317347
}
318348

319349
async fn _send(tx: &MessageSender, msg: GenMessage) -> Result<()> {
320-
tx.send(msg)
350+
let (res_tx, res_rx) = tokio::sync::oneshot::channel();
351+
tx.send(SendingMessage::new_with_result(msg, res_tx))
352+
.await
353+
.map_err(|e| Error::Others(format!("Send data packet to sender error {:?}", e)))?;
354+
res_rx
321355
.await
322-
.map_err(|e| Error::Others(format!("Send data packet to sender error {e:?}")))
356+
.map_err(|e| Error::Others(format!("Failed to wait send result {:?}", e)))?
323357
}
324358

325359
#[derive(Clone, Copy, Debug, PartialEq, Eq)]

0 commit comments

Comments
 (0)