Skip to content

Commit db83ba8

Browse files
authored
Merge pull request #1 from Burning1020/v0.7.1-kuasar
cherry-pick containerd#220 containerd#222 containerd#220 from containerd/ttrpc-rust
2 parents 5d1d5dc + 564bb21 commit db83ba8

File tree

6 files changed

+87
-23
lines changed

6 files changed

+87
-23
lines changed

compiler/src/codegen.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -487,6 +487,12 @@ impl<'a> ServiceGen<'a> {
487487
.any(|method| !matches!(method.method_type().0, MethodType::Unary))
488488
}
489489

490+
fn has_unary_method(&self) -> bool {
491+
self.methods
492+
.iter()
493+
.any(|method| matches!(method.method_type().0, MethodType::Unary))
494+
}
495+
490496
fn write_client(&self, w: &mut CodeWriter) {
491497
if async_on(self.customize, "client") {
492498
self.write_async_client(w)
@@ -589,9 +595,14 @@ impl<'a> ServiceGen<'a> {
589595
);
590596

591597
let has_stream_method = self.has_stream_method();
598+
let has_unary_method = self.has_unary_method();
592599
w.pub_fn(&s, |w| {
593600
w.write_line("let mut ret = HashMap::new();");
594-
w.write_line("let mut methods = HashMap::new();");
601+
if has_unary_method {
602+
w.write_line("let mut methods = HashMap::new();");
603+
} else {
604+
w.write_line("let methods = HashMap::new();");
605+
}
595606
if has_stream_method {
596607
w.write_line("let mut streams = HashMap::new();");
597608
} else {

src/asynchronous/client.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ use crate::r#async::stream::{
2727
};
2828
use crate::r#async::utils;
2929

30+
use super::stream::SendingMessage;
31+
3032
/// A ttrpc Client (async).
3133
#[derive(Clone)]
3234
pub struct Client {
@@ -78,7 +80,7 @@ impl Client {
7880
self.streams.lock().unwrap().insert(stream_id, tx);
7981

8082
self.req_tx
81-
.send(msg)
83+
.send(SendingMessage::new(msg))
8284
.await
8385
.map_err(|e| Error::Others(format!("Send packet to sender error {:?}", e)))?;
8486

@@ -131,7 +133,7 @@ impl Client {
131133
// TODO: check return
132134
self.streams.lock().unwrap().insert(stream_id, tx);
133135
self.req_tx
134-
.send(msg)
136+
.send(SendingMessage::new(msg))
135137
.await
136138
.map_err(|e| Error::Others(format!("Send packet to sender error {:?}", e)))?;
137139

@@ -196,7 +198,7 @@ struct ClientWriter {
196198

197199
#[async_trait]
198200
impl WriterDelegate for ClientWriter {
199-
async fn recv(&mut self) -> Option<GenMessage> {
201+
async fn recv(&mut self) -> Option<SendingMessage> {
200202
self.rx.recv().await
201203
}
202204

src/asynchronous/connection.rs

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ use tokio::{
1616
use crate::error::Error;
1717
use crate::proto::GenMessage;
1818

19+
use super::stream::SendingMessage;
20+
1921
pub trait Builder {
2022
type Reader;
2123
type Writer;
@@ -25,7 +27,7 @@ pub trait Builder {
2527

2628
#[async_trait]
2729
pub trait WriterDelegate {
28-
async fn recv(&mut self) -> Option<GenMessage>;
30+
async fn recv(&mut self) -> Option<SendingMessage>;
2931
async fn disconnect(&self, msg: &GenMessage, e: Error);
3032
async fn exit(&self);
3133
}
@@ -57,12 +59,14 @@ where
5759
let (reader_delegate, mut writer_delegate) = builder.build();
5860

5961
let writer_task = tokio::spawn(async move {
60-
while let Some(msg) = writer_delegate.recv().await {
61-
trace!("write message: {:?}", msg);
62-
if let Err(e) = msg.write_to(&mut writer).await {
62+
while let Some(mut sending_msg) = writer_delegate.recv().await {
63+
trace!("write message: {:?}", sending_msg.msg);
64+
if let Err(e) = sending_msg.msg.write_to(&mut writer).await {
6365
error!("write_message got error: {:?}", e);
64-
writer_delegate.disconnect(&msg, e).await;
66+
sending_msg.send_result(Err(e.clone()));
67+
writer_delegate.disconnect(&sending_msg.msg, e).await;
6568
}
69+
sending_msg.send_result(Ok(()));
6670
}
6771
writer_delegate.exit().await;
6872
trace!("Writer task exit.");

src/asynchronous/server.rs

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ use tokio::{
3030
#[cfg(target_os = "linux")]
3131
use tokio_vsock::VsockListener;
3232

33-
use crate::asynchronous::unix_incoming::UnixIncoming;
33+
use crate::asynchronous::{stream::SendingMessage, unix_incoming::UnixIncoming};
3434
use crate::common::{self, Domain};
3535
use crate::context;
3636
use crate::error::{get_status, Error, Result};
@@ -329,7 +329,7 @@ struct ServerWriter {
329329

330330
#[async_trait]
331331
impl WriterDelegate for ServerWriter {
332-
async fn recv(&mut self) -> Option<GenMessage> {
332+
async fn recv(&mut self) -> Option<SendingMessage> {
333333
self.rx.recv().await
334334
}
335335
async fn disconnect(&self, _msg: &GenMessage, _: Error) {}
@@ -371,12 +371,14 @@ impl ReaderDelegate for ServerReader {
371371
async fn handle_msg(&self, msg: GenMessage) {
372372
let handler_shutdown_waiter = self.handler_shutdown.subscribe();
373373
let context = self.context();
374+
let (wait_tx, wait_rx) = tokio::sync::oneshot::channel::<()>();
374375
spawn(async move {
375376
select! {
376-
_ = context.handle_msg(msg) => {}
377+
_ = context.handle_msg(msg, wait_tx) => {}
377378
_ = handler_shutdown_waiter.wait_shutdown() => {}
378379
}
379380
});
381+
wait_rx.await.unwrap_or_default();
380382
}
381383
}
382384

@@ -402,7 +404,7 @@ struct HandlerContext {
402404
}
403405

404406
impl HandlerContext {
405-
async fn handle_msg(&self, msg: GenMessage) {
407+
async fn handle_msg(&self, msg: GenMessage, wait_tx: tokio::sync::oneshot::Sender<()>) {
406408
let stream_id = msg.header.stream_id;
407409

408410
if (stream_id % 2) != 1 {
@@ -416,7 +418,7 @@ impl HandlerContext {
416418
}
417419

418420
match msg.header.type_ {
419-
MESSAGE_TYPE_REQUEST => match self.handle_request(msg).await {
421+
MESSAGE_TYPE_REQUEST => match self.handle_request(msg, wait_tx).await {
420422
Ok(opt_msg) => match opt_msg {
421423
Some(msg) => {
422424
Self::respond(self.tx.clone(), stream_id, msg)
@@ -435,7 +437,7 @@ impl HandlerContext {
435437
};
436438

437439
self.tx
438-
.send(msg)
440+
.send(SendingMessage::new(msg))
439441
.await
440442
.map_err(err_to_others_err!(e, "Send packet to sender error "))
441443
.ok();
@@ -444,6 +446,8 @@ impl HandlerContext {
444446
Err(status) => Self::respond_with_status(self.tx.clone(), stream_id, status).await,
445447
},
446448
MESSAGE_TYPE_DATA => {
449+
// no need to wait data message handling
450+
drop(wait_tx);
447451
// TODO(wllenyj): Compatible with golang behavior.
448452
if (msg.header.flags & FLAG_REMOTE_CLOSED) == FLAG_REMOTE_CLOSED
449453
&& !msg.payload.is_empty()
@@ -492,7 +496,11 @@ impl HandlerContext {
492496
}
493497
}
494498

495-
async fn handle_request(&self, msg: GenMessage) -> StdResult<Option<Response>, Status> {
499+
async fn handle_request(
500+
&self,
501+
msg: GenMessage,
502+
wait_tx: tokio::sync::oneshot::Sender<()>,
503+
) -> StdResult<Option<Response>, Status> {
496504
//TODO:
497505
//if header.stream_id <= self.last_stream_id {
498506
// return Err;
@@ -513,10 +521,11 @@ impl HandlerContext {
513521
})?;
514522

515523
if let Some(method) = srv.get_method(&req.method) {
524+
drop(wait_tx);
516525
return self.handle_method(method, req_msg).await;
517526
}
518527
if let Some(stream) = srv.get_stream(&req.method) {
519-
return self.handle_stream(stream, req_msg).await;
528+
return self.handle_stream(stream, req_msg, wait_tx).await;
520529
}
521530
Err(get_status(
522531
Code::UNIMPLEMENTED,
@@ -572,6 +581,7 @@ impl HandlerContext {
572581
&self,
573582
stream: Arc<dyn StreamHandler + Send + Sync>,
574583
req_msg: Message<Request>,
584+
wait_tx: tokio::sync::oneshot::Sender<()>,
575585
) -> StdResult<Option<Response>, Status> {
576586
let stream_id = req_msg.header.stream_id;
577587
let req = req_msg.payload;
@@ -583,6 +593,9 @@ impl HandlerContext {
583593

584594
let _remote_close = (req_msg.header.flags & FLAG_REMOTE_CLOSED) == FLAG_REMOTE_CLOSED;
585595
let _remote_open = (req_msg.header.flags & FLAG_REMOTE_OPEN) == FLAG_REMOTE_OPEN;
596+
597+
drop(wait_tx);
598+
586599
let si = StreamInner::new(
587600
stream_id,
588601
self.tx.clone(),
@@ -631,7 +644,7 @@ impl HandlerContext {
631644
header: MessageHeader::new_response(stream_id, payload.len() as u32),
632645
payload,
633646
};
634-
tx.send(msg)
647+
tx.send(SendingMessage::new(msg))
635648
.await
636649
.map_err(err_to_others_err!(e, "Send packet to sender error "))
637650
}

src/asynchronous/stream.rs

Lines changed: 38 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,42 @@ use crate::proto::{
1717
MESSAGE_TYPE_DATA, MESSAGE_TYPE_RESPONSE,
1818
};
1919

20-
pub type MessageSender = mpsc::Sender<GenMessage>;
21-
pub type MessageReceiver = mpsc::Receiver<GenMessage>;
20+
pub type MessageSender = mpsc::Sender<SendingMessage>;
21+
pub type MessageReceiver = mpsc::Receiver<SendingMessage>;
2222

2323
pub type ResultSender = mpsc::Sender<Result<GenMessage>>;
2424
pub type ResultReceiver = mpsc::Receiver<Result<GenMessage>>;
2525

26+
#[derive(Debug)]
27+
pub struct SendingMessage {
28+
pub msg: GenMessage,
29+
pub result_chan: Option<tokio::sync::oneshot::Sender<Result<()>>>,
30+
}
31+
32+
impl SendingMessage {
33+
pub fn new(msg: GenMessage) -> Self {
34+
Self {
35+
msg,
36+
result_chan: None,
37+
}
38+
}
39+
pub fn new_with_result(
40+
msg: GenMessage,
41+
result_chan: tokio::sync::oneshot::Sender<Result<()>>,
42+
) -> Self {
43+
Self {
44+
msg,
45+
result_chan: Some(result_chan),
46+
}
47+
}
48+
49+
pub fn send_result(&mut self, result: Result<()>) {
50+
if let Some(result_ch) = self.result_chan.take() {
51+
result_ch.send(result).unwrap_or_default();
52+
}
53+
}
54+
}
55+
2656
#[derive(Debug)]
2757
pub struct ClientStream<Q, P> {
2858
tx: CSSender<Q>,
@@ -317,9 +347,13 @@ async fn _recv(rx: &mut ResultReceiver) -> Result<GenMessage> {
317347
}
318348

319349
async fn _send(tx: &MessageSender, msg: GenMessage) -> Result<()> {
320-
tx.send(msg)
350+
let (res_tx, res_rx) = tokio::sync::oneshot::channel();
351+
tx.send(SendingMessage::new_with_result(msg, res_tx))
352+
.await
353+
.map_err(|e| Error::Others(format!("Send data packet to sender error {:?}", e)))?;
354+
res_rx
321355
.await
322-
.map_err(|e| Error::Others(format!("Send data packet to sender error {:?}", e)))
356+
.map_err(|e| Error::Others(format!("Failed to wait send result {:?}", e)))?
323357
}
324358

325359
#[derive(Clone, Copy, Debug, PartialEq, Eq)]

ttrpc-codegen/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,4 +16,4 @@ readme = "README.md"
1616
protobuf-support = "3.1.0"
1717
protobuf = { version = "2.27.1" }
1818
protobuf-codegen = "3.1.0"
19-
ttrpc-compiler = "0.6.1"
19+
ttrpc-compiler = { path = "../ttrpc-compiler" }

0 commit comments

Comments
 (0)