Skip to content

Commit 59c3b3e

Browse files
committed
fix timing issue of streaming
the stream request and data is handle asynchronously, if the data is handled when stream is not created yet, the data will be discarded and an error occur. Signed-off-by: Abel <fshb1988@gmail.com>
1 parent 152ac12 commit 59c3b3e

File tree

1 file changed

+17
-5
lines changed

1 file changed

+17
-5
lines changed

src/asynchronous/server.rs

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -381,12 +381,14 @@ impl ReaderDelegate for ServerReader {
381381
async fn handle_msg(&self, msg: GenMessage) {
382382
let handler_shutdown_waiter = self.handler_shutdown.subscribe();
383383
let context = self.context();
384+
let (wait_tx, wait_rx) = tokio::sync::oneshot::channel::<()>();
384385
spawn(async move {
385386
select! {
386-
_ = context.handle_msg(msg) => {}
387+
_ = context.handle_msg(msg, wait_tx) => {}
387388
_ = handler_shutdown_waiter.wait_shutdown() => {}
388389
}
389390
});
391+
wait_rx.await.unwrap_or_default();
390392
}
391393

392394
async fn handle_err(&self, header: MessageHeader, e: Error) {
@@ -424,7 +426,7 @@ impl HandlerContext {
424426
})
425427
.ok();
426428
}
427-
async fn handle_msg(&self, msg: GenMessage) {
429+
async fn handle_msg(&self, msg: GenMessage, wait_tx: tokio::sync::oneshot::Sender<()>) {
428430
let stream_id = msg.header.stream_id;
429431

430432
if (stream_id % 2) != 1 {
@@ -438,7 +440,7 @@ impl HandlerContext {
438440
}
439441

440442
match msg.header.type_ {
441-
MESSAGE_TYPE_REQUEST => match self.handle_request(msg).await {
443+
MESSAGE_TYPE_REQUEST => match self.handle_request(msg, wait_tx).await {
442444
Ok(opt_msg) => match opt_msg {
443445
Some(mut resp) => {
444446
// Server: check size before sending to client
@@ -471,6 +473,8 @@ impl HandlerContext {
471473
Err(status) => Self::respond_with_status(self.tx.clone(), stream_id, status).await,
472474
},
473475
MESSAGE_TYPE_DATA => {
476+
// no need to wait data message handling
477+
drop(wait_tx);
474478
// TODO(wllenyj): Compatible with golang behavior.
475479
if (msg.header.flags & FLAG_REMOTE_CLOSED) == FLAG_REMOTE_CLOSED
476480
&& !msg.payload.is_empty()
@@ -518,7 +522,11 @@ impl HandlerContext {
518522
}
519523
}
520524

521-
async fn handle_request(&self, msg: GenMessage) -> StdResult<Option<Response>, Status> {
525+
async fn handle_request(
526+
&self,
527+
msg: GenMessage,
528+
wait_tx: tokio::sync::oneshot::Sender<()>,
529+
) -> StdResult<Option<Response>, Status> {
522530
//TODO:
523531
//if header.stream_id <= self.last_stream_id {
524532
// return Err;
@@ -539,10 +547,11 @@ impl HandlerContext {
539547
})?;
540548

541549
if let Some(method) = srv.get_method(&req.method) {
550+
drop(wait_tx);
542551
return self.handle_method(method, req_msg).await;
543552
}
544553
if let Some(stream) = srv.get_stream(&req.method) {
545-
return self.handle_stream(stream, req_msg).await;
554+
return self.handle_stream(stream, req_msg, wait_tx).await;
546555
}
547556
Err(get_status(
548557
Code::UNIMPLEMENTED,
@@ -598,6 +607,7 @@ impl HandlerContext {
598607
&self,
599608
stream: Arc<dyn StreamHandler + Send + Sync>,
600609
req_msg: Message<Request>,
610+
wait_tx: tokio::sync::oneshot::Sender<()>,
601611
) -> StdResult<Option<Response>, Status> {
602612
let stream_id = req_msg.header.stream_id;
603613
let req = req_msg.payload;
@@ -609,6 +619,8 @@ impl HandlerContext {
609619

610620
let no_data = (req_msg.header.flags & FLAG_NO_DATA) == FLAG_NO_DATA;
611621

622+
drop(wait_tx);
623+
612624
let si = StreamInner::new(
613625
stream_id,
614626
self.tx.clone(),

0 commit comments

Comments
 (0)