Skip to content

Commit 65510aa

Browse files
author
C. Andy Martin
committed
fix delays from rospy long-running callbacks
Long-running callbacks in rospy can cause extreme amounts of buffering resulting in unnecessary delay, essentially ignoring the queue_size setting (ros#1901). This can already be somewhat mitigated by setting buff_size to be larger than the amount of data that could be buffered by a long running callback. However, setting buff_size to a correct value is not possible for the user of the API if the amount of time in the callback or the amount of data that would be transmitted is unknown. Also, even with a correct buff_size and a queue_size of 1, the received data may still be the oldest of all data transmitted while the callback was running. Fix the delays in such cases by running callbacks in a separate thread. The receive_loop then calls recv() concurrently with the long running callback, enforcing queue_size as new data is received. This fixes the latency in the data when queue_size is set, making the behavior similar to roscpp's. This fixes ros#1901.
1 parent c7c5bcb commit 65510aa

File tree

1 file changed

+43
-2
lines changed

1 file changed

+43
-2
lines changed

clients/rospy/src/rospy/impl/tcpros_base.py

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -782,6 +782,31 @@ def _reconnect(self):
782782

783783
time.sleep(interval)
784784

785+
def callback_loop(self, msgs_callback):
    """
    Dispatch queued inbound messages to the subscriber callback.

    Runs in its own thread until the connection is closed (``self.done``)
    or rospy is shutting down.  Messages are handed over from
    receive_loop() through ``self.msg_queue``, guarded by
    ``self.msg_queue_lock`` / ``self.msg_queue_condition``.  Running the
    (potentially long-running) callback here lets receive_loop() keep
    calling recv() concurrently and enforce queue_size on fresh data.

    @param msgs_callback: callback invoked as ``msgs_callback([msg], self)``
        for each dequeued message
    """
    while not self.done and not is_shutdown():
        try:
            with self.msg_queue_lock:
                # Data that was leftover from reading header may have made
                # messages immediately available (such as from a latched
                # topic). Go ahead and process anything we already have before
                # waiting.
                while self.msg_queue:
                    msg = self.msg_queue.pop(0)
                    # Be sure to not hold the message queue lock while calling
                    # the callback, it may take a while, and receive_loop()
                    # needs the lock to enqueue newly received messages.
                    self.msg_queue_lock.release()
                    try:
                        msgs_callback([msg], self)
                    finally:
                        # Re-acquire even if the callback raised; otherwise
                        # the enclosing 'with' would release an unheld lock,
                        # raising a RuntimeError that masks the original
                        # callback exception.
                        self.msg_queue_lock.acquire()
                self.msg_queue_condition.wait()
        except:
            # in many cases this will be a normal hangup, but log internally
            try:
                #1467 sometimes we get exceptions due to
                #interpreter shutdown, so blanket ignore those if
                #the reporting fails
                rospydebug("exception in callback loop for [%s], may be normal. Exception is %s", self.name, traceback.format_exc())
            except: pass
809+
785810
def receive_loop(self, msgs_callback):
786811
"""
787812
Receive messages until shutdown
@@ -790,13 +815,27 @@ def receive_loop(self, msgs_callback):
790815
"""
791816
# - use assert here as this would be an internal error, aka bug
792817
logger.debug("receive_loop for [%s]", self.name)
818+
# Start a callback thread to process the callbacks. This way the
819+
# receive loop can continue calling recv() while a long-running
820+
# callback is running.
793821
try:
822+
self.msg_queue = []
823+
self.msg_queue_lock = threading.Lock()
824+
self.msg_queue_condition = threading.Condition(self.msg_queue_lock)
825+
callback_thread = threading.Thread(
826+
target=self.callback_loop,
827+
args=(msgs_callback,))
828+
callback_thread.start()
794829
while not self.done and not is_shutdown():
795830
try:
796831
if self.socket is not None:
797832
msgs = self.receive_once()
798833
if not self.done and not is_shutdown():
799-
msgs_callback(msgs, self)
834+
with self.msg_queue_lock:
835+
self.msg_queue += msgs
836+
if self.protocol.queue_size is not None:
837+
self.msg_queue = self.msg_queue[-self.protocol.queue_size:]
838+
self.msg_queue_condition.notify()
800839
else:
801840
self._reconnect()
802841

@@ -827,7 +866,9 @@ def receive_loop(self, msgs_callback):
827866
#the reporting fails
828867
rospydebug("exception in receive loop for [%s], may be normal. Exception is %s",self.name, traceback.format_exc())
829868
except: pass
830-
869+
with self.msg_queue_lock:
870+
self.msg_queue_condition.notify()
871+
callback_thread.join()
831872
rospydebug("receive_loop[%s]: done condition met, exited loop"%self.name)
832873
finally:
833874
if not self.done:

0 commit comments

Comments
 (0)