@@ -42,10 +42,15 @@ async fn on_connect(
4242    Ok ( ( ) ) 
4343} 
4444
45- async  fn  on_send ( message :  Message ,  parts :  Arc < RwLock < WebSocketParts > > )  -> Result < Message >  { 
46-     let  parts = parts. read ( ) . await ; 
47-     let  uid = parts. extensions ( ) . get :: < usize > ( ) . unwrap ( ) ; 
45+ async  fn  on_send ( message :  Message ,  _parts :  Arc < RwLock < WebSocketParts > > )  -> Result < Message >  { 
4846    println ! ( "on_send: {:?}" ,  message) ; 
47+     Ok ( message) 
48+ } 
49+ 
50+ async  fn  on_receive ( message :  Message ,  parts :  Arc < RwLock < WebSocketParts > > )  -> Result < ( ) >  { 
51+     let  parts = parts. read ( ) . await ; 
52+     let  my_id = parts. extensions ( ) . get :: < usize > ( ) . unwrap ( ) ; 
53+     println ! ( "on_receive: {:?}" ,  message) ; 
4954    let  msg = if  let  Ok ( s)  = message. to_str ( )  { 
5055        s
5156    }  else  { 
@@ -54,14 +59,7 @@ async fn on_send(message: Message, parts: Arc<RwLock<WebSocketParts>>) -> Result
5459            msg :  "invalid message" . to_string ( ) , 
5560        } ) ; 
5661    } ; 
57-     let  message = Message :: text ( format ! ( "<User#{uid}>: {msg}" ) ) ; 
58-     Ok ( message) 
59- } 
60- 
61- async  fn  on_receive ( message :  Message ,  parts :  Arc < RwLock < WebSocketParts > > )  -> Result < ( ) >  { 
62-     let  parts = parts. read ( ) . await ; 
63-     let  my_id = parts. extensions ( ) . get :: < usize > ( ) . unwrap ( ) ; 
64-     println ! ( "on_receive: {:?}" ,  message) ; 
62+     let  message = Message :: text ( format ! ( "<User#{my_id}>: {msg}" ) ) ; 
6563    for  ( uid,  tx)  in  ONLINE_USERS . read ( ) . await . iter ( )  { 
6664        if  my_id != uid { 
6765            if  let  Err ( _disconnected)  = tx. send ( message. clone ( ) )  { } 
@@ -86,7 +84,7 @@ async fn handle_socket(ws: WebSocket) {
8684    let  ( tx,  mut  rx)  = mpsc:: unbounded_channel ( ) ; 
8785    on_connect ( parts. clone ( ) ,  tx. clone ( ) ) . await . unwrap ( ) ; 
8886    let  sender_parts = parts. clone ( ) ; 
89-     //  let receiver_parts = parts;
87+     let  receiver_parts = parts; 
9088
9189    let  fut = async  move  { 
9290        while  let  Some ( message)  = rx. recv ( ) . await  { 
@@ -101,11 +99,16 @@ async fn handle_socket(ws: WebSocket) {
10199    let  fut = async  move  { 
102100        while  let  Some ( message)  = user_ws_rx. next ( ) . await  { 
103101            if  let  Ok ( message)  = message { 
104-                 on_receive ( message,  parts. clone ( ) ) . await . unwrap ( ) ; 
102+                 if  message. is_close ( )  { 
103+                     break ; 
104+                 } 
105+                 if  on_receive ( message,  receiver_parts. clone ( ) ) . await . is_err ( )  { 
106+                     break ; 
107+                 } 
105108            } 
106109        } 
107110
108-         on_close ( parts ) . await ; 
111+         on_close ( receiver_parts ) . await ; 
109112    } ; 
110113    tokio:: task:: spawn ( fut) ; 
111114} 
0 commit comments