Skip to content

Commit 3174ca2

Browse files
author
C. Andy Martin
committed
mitigate delays from rospy long-running callbacks (ros#1901)
Long-running callbacks in rospy can cause extreme amounts of buffering resulting in unnecessary delay, essentially ignoring the queue_size setting. This can already be somewhat mitigated by setting buff_size to be larger than the amount of data that could be buffered by a long running callback. However, setting buff_size to a correct value is not possible for the user of the API if the amount of time in the callback or the amount of data that would be transmitted is unknown. Greatly mitigate the delays in such cases by altering the structure of the receive logic. Instead of recv()ing up to buff_size data, then calling the callbacks on every message received, interleave calling recv() between each callback, enforcing queue_size as we go. Also, recv() all data currently available when calling recv() by calling recv() non-blocking after calling it blocking. While it is still possible to have stale data, even with a queue_size of 1, it is less likely, especially if the publisher of the data is on the same host. Even if not, the staleness of the data with a queue_size of 1 is now bounded by the runtime of the callback instead of by buff_size. This mitigation was chosen over a complete fix to the problem because a complete fix would involve a new thread to handle callbacks. While a new thread would allow recv() to be running all the time, even during the long callback, it is a more complex solution. Since rospy is going to be replaced in ROS2, this more tactical mitigation seems appropriate. This mitigates ros#1901
1 parent 0bf31e7 commit 3174ca2

File tree

1 file changed

+73
-16
lines changed

1 file changed

+73
-16
lines changed

clients/rospy/src/rospy/impl/tcpros_base.py

Lines changed: 73 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
from io import StringIO, BytesIO #Python 3.x
4343
python3 = 1
4444
import socket
45+
import errno
4546
import logging
4647

4748
import threading
@@ -88,7 +89,7 @@ def _is_use_tcp_keepalive():
8889
_use_tcp_keepalive = val if code == 1 else True
8990
return _use_tcp_keepalive
9091

91-
def recv_buff(sock, b, buff_size):
92+
def recv_buff(sock, b, buff_size, block=True):
9293
"""
9394
Read data from socket into buffer.
9495
@param sock: socket to read from
@@ -97,15 +98,44 @@ def recv_buff(sock, b, buff_size):
9798
@type b: StringIO
9899
@param buff_size: recv read size
99100
@type buff_size: int
101+
@param block: whether to block on first recv
102+
@type block: bool
100103
@return: number of bytes read
101104
@rtype: int
102105
"""
103-
d = sock.recv(buff_size)
104-
if d:
105-
b.write(d)
106-
return len(d)
107-
else: #bomb out
108-
raise TransportTerminated("unable to receive data from sender, check sender's logs for details")
106+
# Read all data available on the socket.
107+
# read_messages will enforce the queue_size
108+
# Block on the first read if block is set, then non-block on the rest to
109+
# read whatever is available.
110+
if block:
111+
sock.setblocking(True)
112+
else:
113+
sock.setblocking(False)
114+
bytes_received = 0
115+
try:
116+
while not is_shutdown():
117+
d = sock.recv(buff_size)
118+
if d:
119+
b.write(d)
120+
bytes_received += len(d)
121+
else:
122+
if bytes_received or not block:
123+
# Either we have received bytes and a subsequent recv
124+
# returned socket closed, or we were not blocking.
125+
break
126+
else:
127+
# No bytes received and blocking
128+
raise TransportTerminated("unable to receive data from sender, check sender's logs for details")
129+
sock.setblocking(False)
130+
except socket.timeout:
131+
pass
132+
except (OSError, socket.error) as e:
133+
# Handle blocking socket errors
134+
if e.args[0] == errno.EAGAIN or e.args[0] == errno.EWOULDBLOCK:
135+
pass
136+
else:
137+
raise e
138+
return bytes_received
109139

110140
class TCPServer(object):
111141
"""
@@ -720,9 +750,12 @@ def write_data(self, data):
720750
raise TransportTerminated(str(errno)+' '+msg)
721751
return True
722752

723-
def receive_once(self):
753+
def receive_once(self, block=True):
724754
"""
725-
block until messages are read off of socket
755+
block until at least one message is read off socket
756+
or read all available data off of socket (if block is False)
757+
@param block whether to block and read at least one message, or to just
758+
read what is available on the socket
726759
@return: list of newly received messages
727760
@rtype: [Msg]
728761
@raise TransportException: if unable to receive message due to error
@@ -734,12 +767,20 @@ def receive_once(self):
734767
msg_queue = []
735768
p = self.protocol
736769
try:
737-
sock.setblocking(1)
738770
while not msg_queue and not self.done and not is_shutdown():
771+
bytes_received = recv_buff(sock, b, p.buff_size, block)
772+
self.stat_bytes += bytes_received
739773
if b.tell() >= 4:
740-
p.read_messages(b, msg_queue, sock)
741-
if not msg_queue:
742-
self.stat_bytes += recv_buff(sock, b, p.buff_size)
774+
p.read_messages(b, msg_queue, sock)
775+
# The caller may just be checking if there is more data between
776+
# processing messages already queued for callbacks. In that
777+
# case it is normal for there to be no data and we are not
778+
# supposed to wait for it, so break the loop with no messages.
779+
# Only do this after calling read_messages, as there may be
780+
# leftover data from after reading the header in the buffer
781+
# that needs to be deserialized.
782+
if not block and not bytes_received:
783+
break
743784
self.stat_num_msg += len(msg_queue) #STATS
744785
# set the _connection_header field
745786
for m in msg_queue:
@@ -795,14 +836,30 @@ def receive_loop(self, msgs_callback):
795836
"""
796837
# - use assert here as this would be an internal error, aka bug
797838
logger.debug("receive_loop for [%s]", self.name)
839+
msgs = []
798840
try:
841+
# On first call after reading header, there may be messages in the
842+
# leftover data that was read. Go ahead and read any leftover messages
843+
# now non-blocking. If we did not, the receive_once below will call
844+
# blocking since the loop has no messages queued, but we might only
845+
# ever get one latched message that is in the leftovers, meaning we
846+
# would block forever.
847+
msgs += self.receive_once(False)
799848
while not self.done and not is_shutdown():
800849
try:
801850
if self.socket is not None:
802-
msgs = self.receive_once()
803-
if not self.done and not is_shutdown():
804-
msgs_callback(msgs, self)
851+
# Only block if there are no msgs left to process
852+
msgs += self.receive_once(len(msgs) == 0)
853+
# Throw away any unprocessed messages before queue_size
854+
# We only process one at a time to give us an
855+
# opportunity to stay caught up on recv'ing data.
856+
# Delays in recv can cause the queue to have stale data.
857+
if self.protocol.queue_size is not None:
858+
msgs = msgs[-self.protocol.queue_size:]
859+
if not self.done and not is_shutdown() and len(msgs) > 0:
860+
msgs_callback([msgs.pop(0)], self)
805861
else:
862+
msgs = []
806863
self._reconnect()
807864

808865
except TransportException as e:

0 commit comments

Comments
 (0)