Skip to content

Commit 007d9a6

Browse files
author
C. Andy Martin
committed
mitigate delays from rospy long-running callbacks (ros#1901)
Long-running callbacks in rospy can cause extreme amounts of buffering resulting in unnecessary delay, essentially ignoring the queue_size setting. This can already by somewhat mitigated by setting buff_size to be larger than the amount of data that could be buffered by a long running callback. However, setting buff_size to a correct value is not possible for the user of the API if the amount of time in the callback or the amount of data that would be transmitted is unknown. Greatly mitigate the delays in such cases by altering the structure of the receive logic. Instead of recv()ing up to buff_size data, then calling the callbacks on every message received, interleave calling recv() between each callback, enforcing queue_size as we go. Also, recv() all data currently available when calling recv() by calling recv() non-blocking after calling it blocking. While it is still possible to have stale data, even with a queue_size of 1, it is less likely, especially if the publisher of the data is on the same host. Even if not, the staleness of the data with a queue_size of 1 is now bounded by the runtime of the callback instead of by buff_size. This mitigation was chosen over a complete fix to the problem because a complete fix would involve a new thread to handle callbacks. While a new thread would allow recv() to be running all the time, even during the long callback, it is a more complex solution. Since rospy is going to be replaced in ROS2, this more tactical mitigation seems appropriate. This mitigates ros#1901
1 parent 0bf31e7 commit 007d9a6

File tree

1 file changed

+59
-18
lines changed

1 file changed

+59
-18
lines changed

clients/rospy/src/rospy/impl/tcpros_base.py

Lines changed: 59 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
from io import StringIO, BytesIO #Python 3.x
4343
python3 = 1
4444
import socket
45+
import errno
4546
import logging
4647

4748
import threading
@@ -88,7 +89,7 @@ def _is_use_tcp_keepalive():
8889
_use_tcp_keepalive = val if code == 1 else True
8990
return _use_tcp_keepalive
9091

91-
def recv_buff(sock, b, buff_size):
92+
def recv_buff(sock, b, buff_size, block=True):
9293
"""
9394
Read data from socket into buffer.
9495
@param sock: socket to read from
@@ -97,15 +98,39 @@ def recv_buff(sock, b, buff_size):
9798
@type b: StringIO
9899
@param buff_size: recv read size
99100
@type buff_size: int
101+
@param block: whether to block on first recv
102+
@type block: bool
100103
@return: number of bytes read
101104
@rtype: int
102105
"""
103-
d = sock.recv(buff_size)
104-
if d:
105-
b.write(d)
106-
return len(d)
107-
else: #bomb out
108-
raise TransportTerminated("unable to receive data from sender, check sender's logs for details")
106+
# Read all data available on the socket.
107+
# read_messages will enforce the queue_size
108+
# Block on the first read if block is set, then non-block on the rest to
109+
# read whatever is available.
110+
if block:
111+
sock.settimeout(None)
112+
else:
113+
sock.settimeout(0.0)
114+
bytes_received = 0
115+
try:
116+
while not is_shutdown():
117+
d = sock.recv(buff_size)
118+
if d:
119+
b.write(d)
120+
bytes_received += len(d)
121+
else: #bomb out
122+
rospy.loginfo("sock.recv returned " + repr(d))
123+
raise TransportTerminated("unable to receive data from sender, check sender's logs for details")
124+
sock.settimeout(0.0)
125+
except socket.timeout:
126+
pass
127+
except (OSError, socket.error) as e:
128+
# Handle blocking socket errors
129+
if e.args[0] == errno.EAGAIN or e.args[0] == errno.EWOULDBLOCK:
130+
pass
131+
else:
132+
raise e
133+
return bytes_received
109134

110135
class TCPServer(object):
111136
"""
@@ -720,9 +745,12 @@ def write_data(self, data):
720745
raise TransportTerminated(str(errno)+' '+msg)
721746
return True
722747

723-
def receive_once(self):
748+
def receive_once(self, block):
724749
"""
725-
block until messages are read off of socket
750+
block until at least one message is read off socket
751+
or read all available data off of socket (if block is False)
752+
@param block whether to block and read at least one message, or to just
753+
read what is available on the socket
726754
@return: list of newly received messages
727755
@rtype: [Msg]
728756
@raise TransportException: if unable to receive message due to error
@@ -734,13 +762,17 @@ def receive_once(self):
734762
msg_queue = []
735763
p = self.protocol
736764
try:
737-
sock.setblocking(1)
738-
while not msg_queue and not self.done and not is_shutdown():
765+
# On first call after reading header, there may be messages in the
766+
# leftover data that was read. Go ahead and read any leftover messages
767+
# now to keep from blocking forever in that case.
768+
if b.tell() >= 4:
769+
extra_b = b.tell()
770+
p.read_messages(b, msg_queue, sock)
771+
while block and not msg_queue and not self.done and not is_shutdown():
772+
self.stat_bytes += recv_buff(sock, b, p.buff_size, block)
739773
if b.tell() >= 4:
740-
p.read_messages(b, msg_queue, sock)
741-
if not msg_queue:
742-
self.stat_bytes += recv_buff(sock, b, p.buff_size)
743-
self.stat_num_msg += len(msg_queue) #STATS
774+
p.read_messages(b, msg_queue, sock)
775+
self.stat_num_msg += len(msg_queue) #STATS
744776
# set the _connection_header field
745777
for m in msg_queue:
746778
m._connection_header = self.header
@@ -795,14 +827,23 @@ def receive_loop(self, msgs_callback):
795827
"""
796828
# - use assert here as this would be an internal error, aka bug
797829
logger.debug("receive_loop for [%s]", self.name)
830+
msgs = []
798831
try:
799832
while not self.done and not is_shutdown():
800833
try:
801834
if self.socket is not None:
802-
msgs = self.receive_once()
803-
if not self.done and not is_shutdown():
804-
msgs_callback(msgs, self)
835+
# Only block if there are no msgs left to process
836+
msgs += self.receive_once(len(msgs) == 0)
837+
# Throw away any unprocessed messages before queue_size
838+
# We only process one at a time to give us an
839+
# opportunity to stay caught up on recv'ing data.
840+
# Delays in recv can cause the queue to have stale data.
841+
if self.protocol.queue_size is not None:
842+
msgs = msgs[-self.protocol.queue_size:]
843+
if not self.done and not is_shutdown() and len(msgs) > 0:
844+
msgs_callback([msgs.pop(0)], self)
805845
else:
846+
msgs = []
806847
self._reconnect()
807848

808849
except TransportException as e:

0 commit comments

Comments
 (0)