Skip to content

Commit 20db6ab

Browse files
authored
Merge pull request #220 from abel-von/fix-timing
fix timing issue of streaming
2 parents d43d84f + 59c3b3e commit 20db6ab

File tree

1 file changed

+17
-5
lines changed

1 file changed

+17
-5
lines changed

src/asynchronous/server.rs

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -381,12 +381,14 @@ impl ReaderDelegate for ServerReader {
381381
async fn handle_msg(&self, msg: GenMessage) {
382382
let handler_shutdown_waiter = self.handler_shutdown.subscribe();
383383
let context = self.context();
384+
let (wait_tx, wait_rx) = tokio::sync::oneshot::channel::<()>();
384385
spawn(async move {
385386
select! {
386-
_ = context.handle_msg(msg) => {}
387+
_ = context.handle_msg(msg, wait_tx) => {}
387388
_ = handler_shutdown_waiter.wait_shutdown() => {}
388389
}
389390
});
391+
wait_rx.await.unwrap_or_default();
390392
}
391393

392394
async fn handle_err(&self, header: MessageHeader, e: Error) {
@@ -424,7 +426,7 @@ impl HandlerContext {
424426
})
425427
.ok();
426428
}
427-
async fn handle_msg(&self, msg: GenMessage) {
429+
async fn handle_msg(&self, msg: GenMessage, wait_tx: tokio::sync::oneshot::Sender<()>) {
428430
let stream_id = msg.header.stream_id;
429431

430432
if (stream_id % 2) != 1 {
@@ -438,7 +440,7 @@ impl HandlerContext {
438440
}
439441

440442
match msg.header.type_ {
441-
MESSAGE_TYPE_REQUEST => match self.handle_request(msg).await {
443+
MESSAGE_TYPE_REQUEST => match self.handle_request(msg, wait_tx).await {
442444
Ok(opt_msg) => match opt_msg {
443445
Some(mut resp) => {
444446
// Server: check size before sending to client
@@ -471,6 +473,8 @@ impl HandlerContext {
471473
Err(status) => Self::respond_with_status(self.tx.clone(), stream_id, status).await,
472474
},
473475
MESSAGE_TYPE_DATA => {
476+
// no need to wait data message handling
477+
drop(wait_tx);
474478
// TODO(wllenyj): Compatible with golang behavior.
475479
if (msg.header.flags & FLAG_REMOTE_CLOSED) == FLAG_REMOTE_CLOSED
476480
&& !msg.payload.is_empty()
@@ -518,7 +522,11 @@ impl HandlerContext {
518522
}
519523
}
520524

521-
async fn handle_request(&self, msg: GenMessage) -> StdResult<Option<Response>, Status> {
525+
async fn handle_request(
526+
&self,
527+
msg: GenMessage,
528+
wait_tx: tokio::sync::oneshot::Sender<()>,
529+
) -> StdResult<Option<Response>, Status> {
522530
//TODO:
523531
//if header.stream_id <= self.last_stream_id {
524532
// return Err;
@@ -539,10 +547,11 @@ impl HandlerContext {
539547
})?;
540548

541549
if let Some(method) = srv.get_method(&req.method) {
550+
drop(wait_tx);
542551
return self.handle_method(method, req_msg).await;
543552
}
544553
if let Some(stream) = srv.get_stream(&req.method) {
545-
return self.handle_stream(stream, req_msg).await;
554+
return self.handle_stream(stream, req_msg, wait_tx).await;
546555
}
547556
Err(get_status(
548557
Code::UNIMPLEMENTED,
@@ -598,6 +607,7 @@ impl HandlerContext {
598607
&self,
599608
stream: Arc<dyn StreamHandler + Send + Sync>,
600609
req_msg: Message<Request>,
610+
wait_tx: tokio::sync::oneshot::Sender<()>,
601611
) -> StdResult<Option<Response>, Status> {
602612
let stream_id = req_msg.header.stream_id;
603613
let req = req_msg.payload;
@@ -609,6 +619,8 @@ impl HandlerContext {
609619

610620
let no_data = (req_msg.header.flags & FLAG_NO_DATA) == FLAG_NO_DATA;
611621

622+
drop(wait_tx);
623+
612624
let si = StreamInner::new(
613625
stream_id,
614626
self.tx.clone(),

0 commit comments

Comments
 (0)