Skip to content

Commit d32a9f6

Browse files
author
C. Andy Martin
committed
mitigate delays from rospy long-running callbacks (ros#1901)
Long-running callbacks in rospy can cause extreme amounts of buffering resulting in unnecessary delay, essentially ignoring the queue_size setting. This can already by somewhat mitigated by setting buff_size to be larger than the amount of data that could be buffered by a long running callback. However, setting buff_size to a correct value is not possible for the user of the API if the amount of time in the callback or the amount of data that would be transmitted is unknown. Greatly mitigate the delays in such cases by altering the structure of the receive logic. Instead of recv()ing up to buff_size data, then calling the callbacks on every message received, interleave calling recv() between each callback, enforcing queue_size as we go. Also, recv() all data currently available when calling recv() by calling recv() non-blocking after calling it blocking. While it is still possible to have stale data, even with a queue_size of 1, it is less likely, especially if the publisher of the data is on the same host. Even if not, the staleness of the data with a queue_size of 1 is now bounded by the runtime of the callback instead of by buff_size. This mitigation was chosen over a complete fix to the problem because a complete fix would involve a new thread to handle callbacks. While a new thread would allow recv() to be running all the time, even during the long callback, it is a more complex solution. Since rospy is going to be replaced in ROS2, this more tactical mitigation seems appropriate. This mitigates ros#1901
1 parent 0bf31e7 commit d32a9f6

File tree

1 file changed

+64
-16
lines changed

1 file changed

+64
-16
lines changed

clients/rospy/src/rospy/impl/tcpros_base.py

Lines changed: 64 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
from io import StringIO, BytesIO #Python 3.x
4343
python3 = 1
4444
import socket
45+
import errno
4546
import logging
4647

4748
import threading
@@ -88,7 +89,7 @@ def _is_use_tcp_keepalive():
8889
_use_tcp_keepalive = val if code == 1 else True
8990
return _use_tcp_keepalive
9091

91-
def recv_buff(sock, b, buff_size):
92+
def recv_buff(sock, b, buff_size, block=True):
9293
"""
9394
Read data from socket into buffer.
9495
@param sock: socket to read from
@@ -97,15 +98,44 @@ def recv_buff(sock, b, buff_size):
9798
@type b: StringIO
9899
@param buff_size: recv read size
99100
@type buff_size: int
101+
@param block: whether to block on first recv
102+
@type block: bool
100103
@return: number of bytes read
101104
@rtype: int
102105
"""
103-
d = sock.recv(buff_size)
104-
if d:
105-
b.write(d)
106-
return len(d)
107-
else: #bomb out
108-
raise TransportTerminated("unable to receive data from sender, check sender's logs for details")
106+
# Read all data available on the socket.
107+
# read_messages will enforce the queue_size
108+
# Block on the first read if block is set, then non-block on the rest to
109+
# read whatever is available.
110+
if block:
111+
sock.settimeout(None)
112+
else:
113+
sock.settimeout(0.0)
114+
bytes_received = 0
115+
try:
116+
while not is_shutdown():
117+
d = sock.recv(buff_size)
118+
if d:
119+
b.write(d)
120+
bytes_received += len(d)
121+
else:
122+
if bytes_received or not block:
123+
# Either we have received bytes and a subsequent recv
124+
# returned socket closed, or we were not blocking.
125+
break
126+
else:
127+
# No bytes received and blocking
128+
raise TransportTerminated("unable to receive data from sender, check sender's logs for details")
129+
sock.settimeout(0.0)
130+
except socket.timeout:
131+
pass
132+
except (OSError, socket.error) as e:
133+
# Handle blocking socket errors
134+
if e.args[0] == errno.EAGAIN or e.args[0] == errno.EWOULDBLOCK:
135+
pass
136+
else:
137+
raise e
138+
return bytes_received
109139

110140
class TCPServer(object):
111141
"""
@@ -720,9 +750,12 @@ def write_data(self, data):
720750
raise TransportTerminated(str(errno)+' '+msg)
721751
return True
722752

723-
def receive_once(self):
753+
def receive_once(self, block=True):
724754
"""
725-
block until messages are read off of socket
755+
block until at least one message is read off socket
756+
or read all available data off of socket (if block is False)
757+
@param block whether to block and read at least one message, or to just
758+
read what is available on the socket
726759
@return: list of newly received messages
727760
@rtype: [Msg]
728761
@raise TransportException: if unable to receive message due to error
@@ -734,12 +767,18 @@ def receive_once(self):
734767
msg_queue = []
735768
p = self.protocol
736769
try:
737-
sock.setblocking(1)
770+
# On first call after reading header, there may be messages in the
771+
# leftover data that was read. Go ahead and read any leftover messages
772+
# now to keep from blocking forever in that case.
773+
if b.tell() >= 4:
774+
p.read_messages(b, msg_queue, sock)
738775
while not msg_queue and not self.done and not is_shutdown():
776+
bytes_received = recv_buff(sock, b, p.buff_size, block)
777+
self.stat_bytes += bytes_received
778+
if not block and not bytes_received:
779+
break
739780
if b.tell() >= 4:
740-
p.read_messages(b, msg_queue, sock)
741-
if not msg_queue:
742-
self.stat_bytes += recv_buff(sock, b, p.buff_size)
781+
p.read_messages(b, msg_queue, sock)
743782
self.stat_num_msg += len(msg_queue) #STATS
744783
# set the _connection_header field
745784
for m in msg_queue:
@@ -795,14 +834,23 @@ def receive_loop(self, msgs_callback):
795834
"""
796835
# - use assert here as this would be an internal error, aka bug
797836
logger.debug("receive_loop for [%s]", self.name)
837+
msgs = []
798838
try:
799839
while not self.done and not is_shutdown():
800840
try:
801841
if self.socket is not None:
802-
msgs = self.receive_once()
803-
if not self.done and not is_shutdown():
804-
msgs_callback(msgs, self)
842+
# Only block if there are no msgs left to process
843+
msgs += self.receive_once(len(msgs) == 0)
844+
# Throw away any unprocessed messages before queue_size
845+
# We only process one at a time to give us an
846+
# opportunity to stay caught up on recv'ing data.
847+
# Delays in recv can cause the queue to have stale data.
848+
if self.protocol.queue_size is not None:
849+
msgs = msgs[-self.protocol.queue_size:]
850+
if not self.done and not is_shutdown() and len(msgs) > 0:
851+
msgs_callback([msgs.pop(0)], self)
805852
else:
853+
msgs = []
806854
self._reconnect()
807855

808856
except TransportException as e:

0 commit comments

Comments
 (0)