Skip to content

Commit 4987dc3

Browse files
author
C. Andy Martin
committed
fix delays from rospy long-running callbacks
Long-running callbacks in rospy can cause extreme amounts of buffering resulting in unnecessary delay, essentially ignoring the queue_size setting (ros#1901). This can already be somewhat mitigated by setting buff_size to be larger than the amount of data that could be buffered by a long running callback. However, setting buff_size to a correct value is not possible for the user of the API if the amount of time in the callback or the amount of data that would be transmitted is unknown. Also, even with a correct buff_size and a queue_size of 1, the received data may still be the oldest of all data transmitted while the callback was running. Fix the delays in such cases by running callbacks in a separate thread. The receive_loop then calls recv() concurrently with the long running callback, enforcing queue_size as new data is received. This fixes the latency in the data when queue_size is set to be similar to roscpp. This fixes ros#1901
1 parent d2e236d commit 4987dc3

File tree

1 file changed

+43
-2
lines changed

1 file changed

+43
-2
lines changed

clients/rospy/src/rospy/impl/tcpros_base.py

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -787,6 +787,31 @@ def _reconnect(self):
787787

788788
time.sleep(interval)
789789

790+
def callback_loop(self, msgs_callback):
791+
while not self.done and not is_shutdown():
792+
try:
793+
with self.msg_queue_lock:
794+
# Data that was leftover from reading header may have made
795+
# messages immediately available (such as from a latched
796+
# topic). Go ahead and process anything we already have before
797+
# waiting.
798+
while self.msg_queue:
799+
msg = self.msg_queue.pop(0)
800+
# Be sure to not hold the message queue lock while calling
801+
# the callback, it may take a while.
802+
self.msg_queue_lock.release()
803+
msgs_callback([msg], self)
804+
self.msg_queue_lock.acquire()
805+
self.msg_queue_condition.wait()
806+
except:
807+
# in many cases this will be a normal hangup, but log internally
808+
try:
809+
#1467 sometimes we get exceptions due to
810+
#interpreter shutdown, so blanket ignore those if
811+
#the reporting fails
812+
rospydebug("exception in callback loop for [%s], may be normal. Exception is %s",self.name, traceback.format_exc())
813+
except: pass
814+
790815
def receive_loop(self, msgs_callback):
791816
"""
792817
Receive messages until shutdown
@@ -795,13 +820,27 @@ def receive_loop(self, msgs_callback):
795820
"""
796821
# - use assert here as this would be an internal error, aka bug
797822
logger.debug("receive_loop for [%s]", self.name)
823+
# Start a callback thread to process the callbacks. This way the
824+
# receive loop can continue calling recv() while a long-running
825+
# callback is running.
798826
try:
827+
self.msg_queue = []
828+
self.msg_queue_lock = threading.Lock()
829+
self.msg_queue_condition = threading.Condition(self.msg_queue_lock)
830+
callback_thread = threading.Thread(
831+
target=self.callback_loop,
832+
args=(msgs_callback,))
833+
callback_thread.start()
799834
while not self.done and not is_shutdown():
800835
try:
801836
if self.socket is not None:
802837
msgs = self.receive_once()
803838
if not self.done and not is_shutdown():
804-
msgs_callback(msgs, self)
839+
with self.msg_queue_lock:
840+
self.msg_queue += msgs
841+
if self.protocol.queue_size is not None:
842+
self.msg_queue = self.msg_queue[-self.protocol.queue_size:]
843+
self.msg_queue_condition.notify()
805844
else:
806845
self._reconnect()
807846

@@ -832,7 +871,9 @@ def receive_loop(self, msgs_callback):
832871
#the reporting fails
833872
rospydebug("exception in receive loop for [%s], may be normal. Exception is %s",self.name, traceback.format_exc())
834873
except: pass
835-
874+
with self.msg_queue_lock:
875+
self.msg_queue_condition.notify()
876+
callback_thread.join()
836877
rospydebug("receive_loop[%s]: done condition met, exited loop"%self.name)
837878
finally:
838879
if not self.done:

0 commit comments

Comments
 (0)