Skip to content

Commit ceb9dea

Browse files
author
C. Andy Martin
committed
fix delays from rospy long-running callbacks
Long-running callbacks in rospy can cause extreme amounts of buffering resulting in unnecessary delay, essentially ignoring the queue_size setting (ros#1901). This can already be somewhat mitigated by setting buff_size to be larger than the amount of data that could be buffered by a long running callback. However, setting buff_size to a correct value is not possible for the user of the API if the amount of time in the callback or the amount of data that would be transmitted is unknown. Also, even with a correct buff_size and a queue_size of 1, the received data may still be the oldest of all data transmitted while the callback was running. Fix the delays in such cases by running callbacks in a separate thread. The receive_loop then calls recv() concurrently with the long running callback, enforcing queue_size as new data is received. This fixes the latency in the data when queue_size is set to be similar to roscpp. This fixes ros#1901
1 parent 0bf31e7 commit ceb9dea

File tree

1 file changed

+45
-2
lines changed

1 file changed

+45
-2
lines changed

clients/rospy/src/rospy/impl/tcpros_base.py

Lines changed: 45 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -787,6 +787,26 @@ def _reconnect(self):
787787

788788
time.sleep(interval)
789789

790+
def callback_loop(self, msgs_callback):
791+
while not self.done and not is_shutdown():
792+
try:
793+
with self.msg_queue_lock:
794+
# Data that was leftover from reading header may have made
795+
# messages immediately available (such as from a latched
796+
# topic). Go ahead and process anything we already have before
797+
# waiting.
798+
while self.msg_queue:
799+
msg = self.msg_queue.pop(0)
800+
# Be sure to not hold the message queue lock while calling
801+
# the callback, it may take a while.
802+
self.msg_queue_lock.release()
803+
msgs_callback([msg], self)
804+
self.msg_queue_lock.acquire()
805+
self.msg_queue_condition.wait()
806+
except:
807+
logerr("[%s] error in callback_loop: %s" % (self.name,
808+
traceback.format_exc()))
809+
790810
def receive_loop(self, msgs_callback):
791811
"""
792812
Receive messages until shutdown
@@ -795,13 +815,36 @@ def receive_loop(self, msgs_callback):
795815
"""
796816
# - use assert here as this would be an internal error, aka bug
797817
logger.debug("receive_loop for [%s]", self.name)
818+
# Start a callback thread to process the callbacks. This way the
819+
# receive loop can continue calling recv() while a long-running
820+
# callback is running.
798821
try:
822+
self.msg_queue = []
823+
# On first call after reading header, there may be messages in the
824+
# leftover data that was read. Go ahead and read any leftover
825+
# messages in read_buff. If we did not, the receive_once below may
826+
# block forever in recv() and never place the leftover data into
827+
# the message queue.
828+
self.protocol.read_messages(
829+
self.read_buff,
830+
self.msg_queue,
831+
self.socket)
832+
self.msg_queue_lock = threading.Lock()
833+
self.msg_queue_condition = threading.Condition(self.msg_queue_lock)
834+
callback_thread = threading.Thread(
835+
target=self.callback_loop,
836+
args=(msgs_callback,))
837+
callback_thread.start()
799838
while not self.done and not is_shutdown():
800839
try:
801840
if self.socket is not None:
802841
msgs = self.receive_once()
803842
if not self.done and not is_shutdown():
804-
msgs_callback(msgs, self)
843+
with self.msg_queue_lock:
844+
self.msg_queue += msgs
845+
if self.protocol.queue_size is not None:
846+
self.msg_queue = self.msg_queue[-self.protocol.queue_size:]
847+
self.msg_queue_condition.notify()
805848
else:
806849
self._reconnect()
807850

@@ -832,7 +875,7 @@ def receive_loop(self, msgs_callback):
832875
#the reporting fails
833876
rospydebug("exception in receive loop for [%s], may be normal. Exception is %s",self.name, traceback.format_exc())
834877
except: pass
835-
878+
callback_thread.join()
836879
rospydebug("receive_loop[%s]: done condition met, exited loop"%self.name)
837880
finally:
838881
if not self.done:

0 commit comments

Comments
 (0)