TcpStream closed, but I don't know why, help help help. #4279
-
Hello guys, I have an issue that confuses me very much, I have spent on it for several days. BACKGROUND: I receive the data from two streams from tokio TCP listener and create a bridge for the two streams with r2r(ros2) publisher and subscriber. This is a server code as follows: ros_node = Arc::new(Mutex::new(r2r::Node::create(r2r::Context::create().unwrap(), "server_node", "").unwrap())), let node_clone = self.ros_node.clone();
let handle = tokio::task::spawn(async move {
loop {
sleep(Duration::from_millis(1)).await;
node_clone.lock()
.unwrap()
.spin_once(std::time::Duration::from_millis(5));
}
});
tokio::task::spawn(async move {
self.process_stream().await
}); async fn process_stream(mut self) {
loop {
tokio::select! {
Ok((stream, _)) = self.tokio_tcp_listener.accept() => {
// wrapper the stream
let mut client_stream = FramedStream::from(stream);
if let Some(Ok(bytes)) = stream.next_timeout(MESSAGE_TIMEOUT).await {
// some parse the bytes code
let cur_stream_tag = bytes.current_stream_tag;
let peer_stream_tag = bytes.peer_stream_tag;
...........
let sub_topic = format!("/sub_{}", peer_stream_tag);
let pub_topic = format!("/pub_{}", cur_stream_tag);
// create r2r subscriber and publisher
let mut ros_sub = self .ros_node.lock()
.unwrap()
.subscribe::<ByteMultiArray>(&sub_topic).unwrap();
let ros_pub = self.ros_node
.lock()
.unwrap()
.create_publisher::<ByteMultiArray>(&pub_topic).unwrap();
tokio::task::spawn(async move {
// send some response info
client_stream.send(&msg_out).await.ok();
loop {
tokio::select! {
// listen stream
Some(stream_msg) = client_stream.next() => {
match stream_msg {
// got stream message and publish message to ros2(r2r)
Ok(client_msg) => {
ros_pub.publish(&ByteMultiArray {
data: client_msg.to_vec(),
..Default::default()
})
.ok();
log::debug!("<<<---PUB: publish OK");
}
Err(e) => {
log::error!(
"<<<---ERROR: Fetch stream exception, error: {:?}",
e
);
log::warn!("EXIT loop");
break;
}
}
}
Some(ros_msg) = ros_sub.next() => {
log::debug!("--->>>SUB: subscribe from topic: {}", sub_topic);
client_stream.send_bytes(Bytes::from(ros_msg.data)).await.ok();
log::debug!("--->>>STREAM: send stream message OK");
}
}
}
}
}
}
} I will get Anybody can help me analyze why and how to get the tokio stream close reason? |
Beta Was this translation helpful? Give feedback.
Replies: 1 comment 1 reply
-
Can you include the imports and the client code too, and try and minify this? It's hard for me to tell what is from ROS vs tokio vs other crates. |
Beta Was this translation helpful? Give feedback.
Can you include the imports and the client code too, and try and minify this? It's hard for me to tell what is from ROS vs tokio vs other crates.