Skip to content

Commit 8f2ff2f

Browse files
author
C. Andy Martin
committed
fix delays from rospy long-running callbacks
Long-running callbacks in rospy can cause extreme amounts of buffering resulting in unnecessary delay, essentially ignoring the queue_size setting (ros#1901). This can already be somewhat mitigated by setting buff_size to be larger than the amount of data that could be buffered by a long running callback. However, setting buff_size to a correct value is not possible for the user of the API if the amount of time in the callback or the amount of data that would be transmitted is unknown. Also, even with a correct buff_size and a queue_size of 1, the received data may still be the oldest of all data transmitted while the callback was running. Fix the delays in such cases by running callbacks in a separate thread. The receive_loop then calls recv() concurrently with the long running callback, enforcing queue_size as new data is received. This fixes the latency in the data when queue_size is set to be similar to roscpp. This fixes ros#1901
1 parent 0bf31e7 commit 8f2ff2f

File tree

1 file changed

+44
-2
lines changed

1 file changed

+44
-2
lines changed

clients/rospy/src/rospy/impl/tcpros_base.py

Lines changed: 44 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -787,6 +787,25 @@ def _reconnect(self):
787787

788788
time.sleep(interval)
789789

790+
def callback_loop(self, msgs_callback):
791+
while not self.done and not is_shutdown():
792+
try:
793+
with self.msg_queue_lock:
794+
# Data that was leftover from reading header may have made
795+
# messages immediately available (such as from a latched
796+
# topic). Go ahead and process anything we already have before
797+
# waiting.
798+
while self.msg_queue:
799+
msg = self.msg_queue.pop(0)
800+
# Be sure to not hold the message queue lock while calling
801+
# the callback, it may take a while.
802+
self.msg_queue_lock.release()
803+
msgs_callback([msg], self)
804+
self.msg_queue_lock.acquire()
805+
self.msg_queue_condition.wait()
806+
except:
807+
logerr("[%s] error in callback_loop: %s"%self.name, traceback.format_exc())
808+
790809
def receive_loop(self, msgs_callback):
791810
"""
792811
Receive messages until shutdown
@@ -795,13 +814,36 @@ def receive_loop(self, msgs_callback):
795814
"""
796815
# - use assert here as this would be an internal error, aka bug
797816
logger.debug("receive_loop for [%s]", self.name)
817+
# Start a callback thread to process the callbacks. This way the
818+
# receive loop can continue calling recv() while a long-running
819+
# callback is running.
798820
try:
821+
self.msg_queue = []
822+
# On first call after reading header, there may be messages in the
823+
# leftover data that was read. Go ahead and read any leftover
824+
# messages in read_buff. If we did not, the receive_once below may
825+
# block forever in recv() and never place the leftover data into
826+
# the message queue.
827+
self.protocol.read_messages(
828+
self.read_buff,
829+
self.msg_queue,
830+
self.socket)
831+
self.msg_queue_lock = threading.Lock()
832+
self.msg_queue_condition = threading.Condition(self.msg_queue_lock)
833+
callback_thread = threading.Thread(
834+
target=self.callback_loop,
835+
args=[msgs_callback])
836+
callback_thread.start()
799837
while not self.done and not is_shutdown():
800838
try:
801839
if self.socket is not None:
802840
msgs = self.receive_once()
803841
if not self.done and not is_shutdown():
804-
msgs_callback(msgs, self)
842+
with self.msg_queue_lock:
843+
self.msg_queue += msgs
844+
if self.protocol.queue_size is not None:
845+
self.msg_queue = self.msg_queue[-self.protocol.queue_size:]
846+
self.msg_queue_condition.notify()
805847
else:
806848
self._reconnect()
807849

@@ -832,7 +874,7 @@ def receive_loop(self, msgs_callback):
832874
#the reporting fails
833875
rospydebug("exception in receive loop for [%s], may be normal. Exception is %s",self.name, traceback.format_exc())
834876
except: pass
835-
877+
callback_thread.join()
836878
rospydebug("receive_loop[%s]: done condition met, exited loop"%self.name)
837879
finally:
838880
if not self.done:

0 commit comments

Comments
 (0)