Skip to content

Commit 6695b08

Browse files
Burning1020abel-von
andcommitted
fix timing issue of streaming
Co-authored-by: Abel Feng <fshb1988@gmail.com>
1 parent 5d1d5dc commit 6695b08

File tree

1 file changed

+18
-5
lines changed

1 file changed

+18
-5
lines changed

src/asynchronous/server.rs

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -371,12 +371,14 @@ impl ReaderDelegate for ServerReader {
371371
async fn handle_msg(&self, msg: GenMessage) {
372372
let handler_shutdown_waiter = self.handler_shutdown.subscribe();
373373
let context = self.context();
374+
let (wait_tx, wait_rx) = tokio::sync::oneshot::channel::<()>();
374375
spawn(async move {
375376
select! {
376-
_ = context.handle_msg(msg) => {}
377+
_ = context.handle_msg(msg, wait_tx) => {}
377378
_ = handler_shutdown_waiter.wait_shutdown() => {}
378379
}
379380
});
381+
wait_rx.await.unwrap_or_default();
380382
}
381383
}
382384

@@ -402,7 +404,7 @@ struct HandlerContext {
402404
}
403405

404406
impl HandlerContext {
405-
async fn handle_msg(&self, msg: GenMessage) {
407+
async fn handle_msg(&self, msg: GenMessage, wait_tx: tokio::sync::oneshot::Sender<()>) {
406408
let stream_id = msg.header.stream_id;
407409

408410
if (stream_id % 2) != 1 {
@@ -416,7 +418,7 @@ impl HandlerContext {
416418
}
417419

418420
match msg.header.type_ {
419-
MESSAGE_TYPE_REQUEST => match self.handle_request(msg).await {
421+
MESSAGE_TYPE_REQUEST => match self.handle_request(msg, wait_tx).await {
420422
Ok(opt_msg) => match opt_msg {
421423
Some(msg) => {
422424
Self::respond(self.tx.clone(), stream_id, msg)
@@ -444,6 +446,8 @@ impl HandlerContext {
444446
Err(status) => Self::respond_with_status(self.tx.clone(), stream_id, status).await,
445447
},
446448
MESSAGE_TYPE_DATA => {
449+
// no need to wait data message handling
450+
drop(wait_tx);
447451
// TODO(wllenyj): Compatible with golang behavior.
448452
if (msg.header.flags & FLAG_REMOTE_CLOSED) == FLAG_REMOTE_CLOSED
449453
&& !msg.payload.is_empty()
@@ -492,7 +496,11 @@ impl HandlerContext {
492496
}
493497
}
494498

495-
async fn handle_request(&self, msg: GenMessage) -> StdResult<Option<Response>, Status> {
499+
async fn handle_request(
500+
&self,
501+
msg: GenMessage,
502+
wait_tx: tokio::sync::oneshot::Sender<()>,
503+
) -> StdResult<Option<Response>, Status> {
496504
//TODO:
497505
//if header.stream_id <= self.last_stream_id {
498506
// return Err;
@@ -513,10 +521,11 @@ impl HandlerContext {
513521
})?;
514522

515523
if let Some(method) = srv.get_method(&req.method) {
524+
drop(wait_tx);
516525
return self.handle_method(method, req_msg).await;
517526
}
518527
if let Some(stream) = srv.get_stream(&req.method) {
519-
return self.handle_stream(stream, req_msg).await;
528+
return self.handle_stream(stream, req_msg, wait_tx).await;
520529
}
521530
Err(get_status(
522531
Code::UNIMPLEMENTED,
@@ -572,6 +581,7 @@ impl HandlerContext {
572581
&self,
573582
stream: Arc<dyn StreamHandler + Send + Sync>,
574583
req_msg: Message<Request>,
584+
wait_tx: tokio::sync::oneshot::Sender<()>,
575585
) -> StdResult<Option<Response>, Status> {
576586
let stream_id = req_msg.header.stream_id;
577587
let req = req_msg.payload;
@@ -583,6 +593,9 @@ impl HandlerContext {
583593

584594
let _remote_close = (req_msg.header.flags & FLAG_REMOTE_CLOSED) == FLAG_REMOTE_CLOSED;
585595
let _remote_open = (req_msg.header.flags & FLAG_REMOTE_OPEN) == FLAG_REMOTE_OPEN;
596+
597+
drop(wait_tx);
598+
586599
let si = StreamInner::new(
587600
stream_id,
588601
self.tx.clone(),

0 commit comments

Comments
 (0)