Skip to content

Commit 880a9e1

Browse files
authored
Merge pull request #1748 from sktometometo/PR/add-constant-rate-throttle-node
Add ConstantRateThrottle Nodelet
2 parents 9e72b8a + 1b39e05 commit 880a9e1

File tree

8 files changed

+283
-0
lines changed

8 files changed

+283
-0
lines changed
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
# constant_rate_throttle_nodelet
2+
3+
![images/lightweight_throttle_nodelet_diagram.svg](images/lightweight_throttle_nodelet_diagram.svg)
4+
5+
## Description
6+
7+
This nodelet provides function like `lightweight_throttle_nodelet`, but support accurate publish rate.
8+
9+
The rate of throttled message is configurable by passing `~update rate` parameter on launching this nodelet.
10+
11+
## Subscribing Topic
12+
- `~input` (`AnyMsg`):
13+
14+
Input topic. This topic is throttled to low publish rate.
15+
16+
## Publishing Topic
17+
- `~output` (`AnyMsg`):
18+
19+
Throttled topic.
20+
Publish rate of throttled topic is configurable by setting `~update_rate` parameter.
21+
If `~update_rate` is higher than input topic rate, published message is used from buffer.
22+
23+
## Parameter
24+
- `~update_rate` (Double, default: `1.0`):
25+
26+
Publish rate of throttled message [Hz]. This parameter is updated only on launching this nodelet.

jsk_topic_tools/CMakeLists.txt

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22
cmake_minimum_required(VERSION 2.8.3)
33
project(jsk_topic_tools)
44

5+
add_compile_options(-std=c++11)
6+
57
# Use ccache if installed to make it fast to generate object files
68
if (CMAKE_VERSION VERSION_LESS 3.4)
79
find_program(CCACHE_FOUND ccache)
@@ -52,6 +54,7 @@ add_service_files(
5254
FILES List.srv Update.srv ChangeTopic.srv PassthroughDuration.srv
5355
)
5456
generate_dynamic_reconfigure_options(
57+
cfg/ConstantRateThrottle.cfg
5558
cfg/LightweightThrottle.cfg
5659
cfg/StealthRelay.cfg
5760
cfg/SynchronizedThrottle.cfg
@@ -114,6 +117,8 @@ macro(jsk_topic_tools_nodelet _nodelet_cpp _nodelet_class _single_nodelet_exec_n
114117
endmacro(jsk_topic_tools_nodelet _nodelet_cpp _nodelet_class _single_nodelet_exec_name)
115118

116119
# nodelet shared object
120+
jsk_topic_tools_nodelet(src/constant_rate_throttle_nodelet.cpp
121+
"jsk_topic_tools/ConstantRateThrottle" "constant_rate_throttle")
117122
jsk_topic_tools_nodelet(src/lightweight_throttle_nodelet.cpp
118123
"jsk_topic_tools/LightweightThrottle" "lightweight_throttle")
119124
jsk_topic_tools_nodelet(src/mux_nodelet.cpp
@@ -173,6 +178,7 @@ if (CATKIN_ENABLE_TESTING)
173178
# jsk_topic_tools_add_rostest(test/test_topic_buffer_fixed_rate.launch)
174179
# jsk_topic_tools_add_rostest(test/test_topic_buffer_fixed_rate_and_update_rate.launch)
175180
# jsk_topic_tools_add_rostest(test/test_topic_buffer_update_rate.launch)
181+
jsk_topic_tools_add_rostest(test/test_constant_rate_throttle.test)
176182
jsk_topic_tools_add_rostest(test/test_lightweight_throttle.test)
177183
jsk_topic_tools_add_rostest(test/test_topic_compare.test)
178184
jsk_topic_tools_add_rostest(test/test_hz_measure.test)
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
#!/usr/bin/env python
2+
3+
from dynamic_reconfigure.parameter_generator_catkin import *
4+
5+
PKG = "jsk_topic_tools"
6+
7+
gen = ParameterGenerator()
8+
9+
gen.add("update_rate", double_t, 0, "Desired publish rate of throttled messages", 1.0, 0.0, 1000.0)
10+
gen.add("duration_message_valid", double_t, 0, "Timeout duration of subscribed message", 1.0, 0.0, 1000.0)
11+
12+
exit(gen.generate(PKG, PKG, "ConstantRateThrottle"))
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
#ifndef CONSTANT_RATE_THROTTLE_NODELET_H_
2+
#define CONSTANT_RATE_THROTTLE_NODELET_H_
3+
4+
#include <boost/shared_ptr.hpp>
5+
#include <nodelet/nodelet.h>
6+
7+
#include <dynamic_reconfigure/server.h>
8+
#include <jsk_topic_tools/ConstantRateThrottleConfig.h>
9+
#include <topic_tools/shape_shifter.h>
10+
#include <mutex>
11+
12+
namespace jsk_topic_tools
13+
{
14+
class ConstantRateThrottle : public nodelet::Nodelet
15+
{
16+
public:
17+
typedef ConstantRateThrottleConfig Config;
18+
virtual void onInit();
19+
virtual void configCallback(Config& config, uint32_t level);
20+
virtual void publishMessage(const ros::TimerEvent&);
21+
virtual bool isLoopAlive();
22+
virtual void startPublishLoop(double loop_rate);
23+
virtual void stopPublishLoop();
24+
virtual void connectionCallback(const ros::SingleSubscriberPublisher& pub);
25+
virtual void inCallback(const boost::shared_ptr<topic_tools::ShapeShifter const>& msg);
26+
protected:
27+
typedef boost::shared_ptr<ros::Subscriber> SubscriberPtr;
28+
boost::shared_ptr<dynamic_reconfigure::Server<Config> > srv_;
29+
SubscriberPtr sub_;
30+
ros::Publisher pub_;
31+
ros::TransportHints th_;
32+
ros::NodeHandle pnh_;
33+
ros::Timer timer_publish_;
34+
bool timer_started_;
35+
36+
boost::mutex mutex_;
37+
38+
bool subscribing_;
39+
bool advertised_;
40+
41+
double update_rate_;
42+
ros::Duration duration_message_valid_;
43+
44+
ros::Time time_cached_;
45+
boost::shared_ptr<topic_tools::ShapeShifter> msg_cached_;
46+
};
47+
}
48+
49+
#endif

jsk_topic_tools/jsk_topic_tools_nodelet.xml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,14 @@
1818
as std_msgs/Float32.
1919
</description>
2020
</class>
21+
<class name="jsk_topic_tools/ConstantRateThrottle" type="ConstantRateThrottle"
22+
base_class_type="nodelet::Nodelet">
23+
<description>
24+
constant rate throttle nodelet.
25+
Publish ~output from ~input with the fixed rate specified by
26+
~update_rate parameter.
27+
</description>
28+
</class>
2129
<class name="jsk_topic_tools/LightweightThrottle" type="LightweightThrottle"
2230
base_class_type="nodelet::Nodelet">
2331
<description>
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
<launch>
2+
3+
<node pkg="rostopic" type="rostopic" name="input"
4+
args="pub /input sensor_msgs/Image 'data: [1, 2, 3]' -r 10">
5+
</node>
6+
7+
<node
8+
pkg="nodelet"
9+
type="nodelet"
10+
name="constant_rate_throttle"
11+
args="standalone jsk_topic_tools/ConstantRateThrottle"
12+
>
13+
<rosparam>
14+
update_rate: 7.0
15+
</rosparam>
16+
<remap from="~input" to="/input" />
17+
</node>
18+
19+
<node
20+
pkg="nodelet"
21+
type="nodelet"
22+
name="lightweight_throttle"
23+
args="standalone jsk_topic_tools/LightweightThrottle"
24+
>
25+
<rosparam>
26+
update_rate: 7.0
27+
</rosparam>
28+
<remap from="~input" to="/input" />
29+
</node>
30+
31+
</launch>
Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
#include <jsk_topic_tools/constant_rate_throttle_nodelet.h>
2+
3+
namespace jsk_topic_tools
4+
{
5+
void ConstantRateThrottle::onInit()
6+
{
7+
pnh_ = this->getPrivateNodeHandle();
8+
subscribing_ = false;
9+
advertised_ = false;
10+
timer_started_ = false;
11+
msg_cached_ = boost::shared_ptr<topic_tools::ShapeShifter>(new topic_tools::ShapeShifter());
12+
13+
srv_ = boost::make_shared<dynamic_reconfigure::Server<Config> >(pnh_);
14+
dynamic_reconfigure::Server<Config>::CallbackType f = boost::bind(&ConstantRateThrottle::configCallback, this, _1, _2);
15+
srv_->setCallback(f);
16+
17+
sub_.reset(new ros::Subscriber(
18+
pnh_.subscribe<topic_tools::ShapeShifter>(
19+
"input", 1,
20+
&ConstantRateThrottle::inCallback,
21+
this,
22+
th_)));
23+
}
24+
25+
void ConstantRateThrottle::configCallback(Config& config, uint32_t level)
26+
{
27+
boost::mutex::scoped_lock lock(mutex_);
28+
update_rate_ = config.update_rate;
29+
duration_message_valid_ = ros::Duration(config.duration_message_valid);
30+
if ( this->isLoopAlive() ) {
31+
this->stopPublishLoop();
32+
this->startPublishLoop(update_rate_);
33+
}
34+
}
35+
36+
void ConstantRateThrottle::connectionCallback(const ros::SingleSubscriberPublisher& pub)
37+
{
38+
boost::mutex::scoped_lock lock(mutex_);
39+
if (pub_.getNumSubscribers() > 0) {
40+
if (!subscribing_) {
41+
sub_.reset(new ros::Subscriber(
42+
pnh_.subscribe<topic_tools::ShapeShifter>(
43+
"input", 1,
44+
&ConstantRateThrottle::inCallback,
45+
this,
46+
th_)));
47+
subscribing_ = true;
48+
this->startPublishLoop(update_rate_);
49+
}
50+
}
51+
else { // No subscribers, nodelet can unsubscribe input topic
52+
if (subscribing_) {
53+
sub_->shutdown();
54+
subscribing_ = false;
55+
this->stopPublishLoop();
56+
}
57+
}
58+
}
59+
60+
void ConstantRateThrottle::inCallback(const boost::shared_ptr<topic_tools::ShapeShifter const>& msg)
61+
{
62+
boost::mutex::scoped_lock lock(mutex_);
63+
if (!advertised_) {
64+
sub_->shutdown();
65+
ros::SubscriberStatusCallback connect_cb = boost::bind(&ConstantRateThrottle::connectionCallback, this, _1);
66+
ros::AdvertiseOptions opts("output", 1,
67+
msg->getMD5Sum(),
68+
msg->getDataType(),
69+
msg->getMessageDefinition(),
70+
connect_cb,
71+
connect_cb);
72+
advertised_ = true;
73+
pub_ = pnh_.advertise(opts);
74+
}
75+
76+
*msg_cached_ = *msg;
77+
time_cached_ = ros::Time::now();
78+
}
79+
80+
void ConstantRateThrottle::publishMessage(const ros::TimerEvent&)
81+
{
82+
ros::Time current_time = ros::Time::now();
83+
if ( not msg_cached_ ) {
84+
ROS_WARN("No message is Cached .");
85+
} else if ( current_time - time_cached_ < duration_message_valid_ ) {
86+
pub_.publish(msg_cached_);
87+
} else {
88+
ROS_WARN("Cached message is too old.");
89+
}
90+
}
91+
92+
bool ConstantRateThrottle::isLoopAlive()
93+
{
94+
return timer_publish_.isValid() and timer_started_;
95+
}
96+
97+
void ConstantRateThrottle::startPublishLoop(double loop_rate)
98+
{
99+
if ( not timer_publish_.isValid() ) {
100+
timer_publish_ = pnh_.createTimer(
101+
ros::Duration(1.0/update_rate_),
102+
&ConstantRateThrottle::publishMessage,
103+
this
104+
);
105+
} else {
106+
timer_publish_.setPeriod(ros::Duration(1.0/update_rate_));
107+
timer_publish_.start();
108+
timer_started_ = false;
109+
}
110+
}
111+
112+
void ConstantRateThrottle::stopPublishLoop()
113+
{
114+
timer_publish_.stop();
115+
timer_started_ = false;
116+
}
117+
118+
}
119+
120+
121+
#include <pluginlib/class_list_macros.h>
122+
typedef jsk_topic_tools::ConstantRateThrottle ConstantRateThrottle;
123+
PLUGINLIB_EXPORT_CLASS(ConstantRateThrottle, nodelet::Nodelet)
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
<launch>
2+
<node pkg="nodelet"
3+
type="nodelet"
4+
name="nodelet_manager"
5+
args="manager" />
6+
<node pkg="nodelet"
7+
type="nodelet"
8+
name="throttle"
9+
clear_params="true"
10+
args="load jsk_topic_tools/ConstantRateThrottle /nodelet_manager">
11+
<remap from="~input" to="/original_topic" />
12+
<remap from="~output" to="/throttle_topic" />
13+
<param name="update_rate" value="11.0" />
14+
</node>
15+
<node pkg="rostopic"
16+
type="rostopic"
17+
name="rostopic_100Hz"
18+
args="pub -r 30 original_topic std_msgs/String foo" />
19+
20+
<param name="hztest_constant_rate_throttle/topic" value="throttle_topic" />
21+
<param name="hztest_constant_rate_throttle/hz" value="11.0" />
22+
<param name="hztest_constant_rate_throttle/hzerror" value="5.0" />
23+
<param name="hztest_constant_rate_throttle/test_duration" value="30.0" />
24+
<test test-name="hztest_constant_rate_throttle"
25+
pkg="rostest"
26+
type="hztest" name="hztest_constant_rate_throttle"
27+
/>
28+
</launch>

0 commit comments

Comments
 (0)