Skip to content

Commit 127a8a0

Browse files
ROS2 changes
1 parent d42b2e7 commit 127a8a0

File tree

9 files changed

+175
-78
lines changed

9 files changed

+175
-78
lines changed

ros_tcp_endpoint/client.py

Lines changed: 6 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,15 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15+
import rclpy
1516
import struct
16-
import socket
17-
import rospy
18-
from io import BytesIO
1917

2018
import threading
2119
import json
2220

21+
from rclpy.serialization import deserialize_message
22+
from rclpy.serialization import serialize_message
23+
2324
from .exceptions import TopicOrServiceNameDoesNotExistError
2425

2526

@@ -100,9 +101,7 @@ def read_message(self, conn):
100101
data = ClientThread.recvall(conn, full_message_size)
101102

102103
if full_message_size > 0 and not data:
103-
self.tcp_server.logerr(
104-
"No data for a message size of {}, breaking!".format(full_message_size)
105-
)
104+
self.logerr("No data for a message size of {}, breaking!".format(full_message_size))
106105
return
107106

108107
destination = destination.rstrip("\x00")
@@ -124,15 +123,7 @@ def serialize_message(destination, message):
124123
length = len(dest_bytes)
125124
dest_info = struct.pack("<I%ss" % length, length, dest_bytes)
126125

127-
serial_response = BytesIO()
128-
message.serialize(serial_response)
129-
130-
# Per documention, https://docs.python.org/3.8/library/io.html#io.IOBase.seek,
131-
# seek to end of stream for length
132-
# SEEK_SET or 0 - start of the stream (the default); offset should be zero or positive
133-
# SEEK_CUR or 1 - current stream position; offset may be negative
134-
# SEEK_END or 2 - end of the stream; offset is usually negative
135-
response_len = serial_response.seek(0, 2)
126+
serial_response = serialize_message(message)
136127

137128
msg_length = struct.pack("<I", len(serial_response))
138129
serialized_message = dest_info + msg_length + serial_response

ros_tcp_endpoint/communication.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,27 +12,30 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15+
import rclpy
16+
from rclpy.node import Node
1517

16-
class RosSender:
18+
19+
class RosSender(Node):
1720
"""
1821
Base class for ROS communication where data is sent to the ROS network.
1922
"""
2023

2124
def __init__(self, node_name):
22-
self.node_name = node_name
25+
super().__init__(node_name)
2326
pass
2427

2528
def send(self, *args):
2629
raise NotImplementedError
2730

2831

29-
class RosReceiver:
32+
class RosReceiver(Node):
3033
"""
3134
Base class for ROS communication where data is being sent outside of the ROS network.
3235
"""
3336

3437
def __init__(self, node_name):
35-
self.node_name = node_name
38+
super().__init__(node_name)
3639
pass
3740

3841
def send(self, *args):

ros_tcp_endpoint/default_server_endpoint.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,20 @@
11
#!/usr/bin/env python
22

3-
import rospy
3+
import rclpy
44

55
from ros_tcp_endpoint import TcpServer
66

77

88
def main(args=None):
9+
rclpy.init(args=args)
910
tcp_server = TcpServer("UnityEndpoint")
1011

11-
# Start the Server Endpoint
12-
rospy.init_node(ros_node_name, anonymous=True)
1312
tcp_server.start()
14-
rospy.spin()
13+
14+
tcp_server.setup_executor()
15+
16+
tcp_server.destroy_nodes()
17+
rclpy.shutdown()
1518

1619

1720
if __name__ == "__main__":

ros_tcp_endpoint/publisher.py

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,10 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15-
import rospy
15+
import rclpy
16+
import re
17+
18+
from rclpy.serialization import deserialize_message
1619

1720
from .communication import RosSender
1821

@@ -35,7 +38,7 @@ def __init__(self, topic, message_class, queue_size=10, latch=False):
3538
node_name = f"{strippedTopic}_RosPublisher"
3639
RosSender.__init__(self, node_name)
3740
self.msg = message_class()
38-
self.pub = rospy.Publisher(topic, message_class, queue_size=queue_size, latch=latch)
41+
self.pub = self.create_publisher(message_class, topic, queue_size)
3942

4043
def send(self, data):
4144
"""
@@ -48,8 +51,10 @@ def send(self, data):
4851
Returns:
4952
None: Explicitly return None so behaviour can be
5053
"""
51-
self.msg.deserialize(data)
52-
self.pub.publish(self.msg)
54+
# message_type = type(self.msg)
55+
# message = deserialize_message(data, message_type)
56+
57+
self.pub.publish(data)
5358

5459
return None
5560

@@ -59,4 +64,5 @@ def unregister(self):
5964
Returns:
6065
6166
"""
62-
self.pub.unregister()
67+
self.destroy_publisher(self.pub)
68+
self.destroy_node()

ros_tcp_endpoint/server.py

Lines changed: 68 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,18 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15-
import rospy
15+
import rclpy
1616
import socket
17-
import logging
1817
import json
1918
import sys
2019
import threading
2120
import importlib
2221

22+
from rclpy.node import Node
23+
from rclpy.parameter import Parameter
24+
from rclpy.executors import MultiThreadedExecutor
25+
from rclpy.serialization import deserialize_message
26+
2327
from .tcp_sender import UnityTcpSender
2428
from .client import ClientThread
2529
from .subscriber import RosSubscriber
@@ -28,7 +32,7 @@
2832
from .unity_service import UnityService
2933

3034

31-
class TcpServer:
35+
class TcpServer(Node):
3236
"""
3337
Initializes ROS node and TCP server.
3438
"""
@@ -42,17 +46,22 @@ def __init__(self, node_name, buffer_size=1024, connections=10, tcp_ip=None, tcp
4246
buffer_size: The read buffer size used when reading from a socket
4347
connections: Max number of queued connections. See Python Socket documentation
4448
"""
49+
super().__init__(node_name)
50+
51+
self.declare_parameter("ROS_IP", "0.0.0.0")
52+
self.declare_parameter("ROS_TCP_PORT", 10000)
53+
4554
if tcp_ip:
4655
self.loginfo("Using ROS_IP override from constructor: {}".format(tcp_ip))
4756
self.tcp_ip = tcp_ip
4857
else:
49-
self.tcp_ip = rospy.get_param("/ROS_IP")
58+
self.tcp_ip = self.get_parameter("ROS_IP").get_parameter_value().string_value
5059

5160
if tcp_port:
5261
self.loginfo("Using ROS_TCP_PORT override from constructor: {}".format(tcp_port))
5362
self.tcp_port = tcp_port
5463
else:
55-
self.tcp_port = rospy.get_param("/ROS_TCP_PORT", 10000)
64+
self.tcp_port = self.get_parameter("ROS_TCP_PORT").get_parameter_value().integer_value
5665

5766
self.unity_tcp_sender = UnityTcpSender(self)
5867

@@ -118,17 +127,59 @@ def handle_syscommand(self, topic, data):
118127
function(**params)
119128

120129
def loginfo(self, text):
121-
rospy.loginfo(text)
130+
self.get_logger().info(text)
122131

123132
def logwarn(self, text):
124-
rospy.logwarn(text)
133+
self.get_logger().warning(text)
125134

126135
def logerr(self, text):
127-
rospy.logerr(text)
136+
self.get_logger().error(text)
137+
138+
def setup_executor(self):
139+
"""
140+
Since rclpy.spin() is a blocking call the server needed a way
141+
to spin all of the relevant nodes at the same time.
142+
143+
MultiThreadedExecutor allows us to set the number of threads
144+
needed as well as the nodes that need to be spun.
145+
"""
146+
num_threads = len(self.publishers_table.keys()) + len(self.subscribers_table.keys()) + len(self.ros_services_table.keys()) + len(self.unity_services_table.keys()) + 1
147+
executor = MultiThreadedExecutor(num_threads)
148+
149+
executor.add_node(self)
128150

129-
def unregister_node(old_node):
151+
for ros_node in self.publishers_table.values():
152+
executor.add_node(ros_node)
153+
for ros_node in self.subscribers_table.values():
154+
executor.add_node(ros_node)
155+
for ros_node in self.ros_services_table.values():
156+
executor.add_node(ros_node)
157+
for ros_node in self.unity_services_table.values():
158+
executor.add_node(ros_node)
159+
160+
self.executor = executor
161+
executor.spin()
162+
163+
def unregister_node(self, old_node):
130164
if old_node is not None:
131165
old_node.unregister()
166+
if self.executor is not None:
167+
self.executor.remove_node(old_node)
168+
169+
def destroy_nodes(self):
170+
"""
171+
Clean up all of the nodes
172+
"""
173+
for ros_node in self.publishers_table.values():
174+
ros_node.destroy_node()
175+
for ros_node in self.subscribers_table.values():
176+
ros_node.destroy_node()
177+
for ros_node in self.ros_services_table.values():
178+
ros_node.destroy_node()
179+
for ros_node in self.unity_services_table.values():
180+
ros_node.destroy_node()
181+
182+
self.destroy_node()
132183

133184

134185
class SysCommands:
@@ -157,6 +208,8 @@ def subscribe(self, topic, message_name):
157208

158209
new_subscriber = RosSubscriber(topic, message_class, self.tcp_server)
159210
self.tcp_server.subscribers_table[topic] = new_subscriber
211+
if self.tcp_server.executor is not None:
212+
self.tcp_server.executor.add_node(new_subscriber)
160213

161214
self.tcp_server.loginfo("RegisterSubscriber({}, {}) OK".format(topic, message_class))
162215

@@ -183,6 +236,8 @@ def publish(self, topic, message_name, queue_size=10, latch=False):
183236
new_publisher = RosPublisher(topic, message_class, queue_size=queue_size, latch=latch)
184237

185238
self.tcp_server.publishers_table[topic] = new_publisher
239+
if self.tcp_server.executor is not None:
240+
self.tcp_server.executor.add_node(new_publisher)
186241

187242
self.tcp_server.loginfo("RegisterPublisher({}, {}) OK".format(topic, message_class))
188243

@@ -210,6 +265,8 @@ def ros_service(self, topic, message_name):
210265
new_service = RosService(topic, message_class)
211266

212267
self.tcp_server.ros_services_table[topic] = new_service
268+
if self.tcp_server.executor is not None:
269+
self.tcp_server.executor.add_node(new_service)
213270

214271
self.tcp_server.loginfo("RegisterRosService({}, {}) OK".format(topic, message_class))
215272

@@ -238,6 +295,8 @@ def unity_service(self, topic, message_name):
238295
new_service = UnityService(str(topic), message_class, self.tcp_server)
239296

240297
self.tcp_server.unity_services_table[topic] = new_service
298+
if self.tcp_server.executor is not None:
299+
self.tcp_server.executor.add_node(new_service)
241300

242301
self.tcp_server.loginfo("RegisterUnityService({}, {}) OK".format(topic, message_class))
243302

ros_tcp_endpoint/service.py

Lines changed: 28 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -12,30 +12,32 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15-
import rospy
16-
from rospy.service import ServiceException
15+
import rclpy
1716
import re
1817

18+
from rclpy.serialization import deserialize_message
19+
1920
from .communication import RosSender
2021

2122

2223
class RosService(RosSender):
2324
"""
2425
Class to send messages to a ROS service.
2526
"""
26-
2727
def __init__(self, service, service_class):
2828
"""
2929
Args:
3030
service: The service name in ROS
3131
service_class: The service class in catkin workspace
3232
"""
33-
strippedService = re.sub("[^A-Za-z0-9_]+", "", service)
34-
node_name = f"{strippedService}_RosService"
33+
strippedService = re.sub('[^A-Za-z0-9_]+', '', service)
34+
node_name = f'{strippedService}_RosService'
3535
RosSender.__init__(self, node_name)
36+
37+
self.service_topic = service
38+
self.cli = self.create_client(service_class, service)
39+
self.req = service_class.Request()
3640

37-
self.srv_class = service_class._request_class()
38-
self.srv = rospy.ServiceProxy(service, service_class)
3941

4042
def send(self, data):
4143
"""
@@ -49,27 +51,33 @@ def send(self, data):
4951
Returns:
5052
service response
5153
"""
52-
message = self.srv_class.deserialize(data)
54+
message_type = type(self.req)
55+
message = deserialize_message(data, message_type)
5356

54-
attempt = 0
57+
if not self.cli.service_is_ready():
58+
self.get_logger().error('Ignoring service call to {} - service is not ready.'.format(self.service_topic))
59+
return None
60+
61+
self.future = self.cli.call_async(message)
5562

56-
while attempt < 3:
57-
try:
58-
service_response = self.srv(message)
59-
return service_response
60-
except ServiceException:
61-
attempt += 1
62-
print("Service Exception raised. Attempt: {}".format(attempt))
63-
except Exception as e:
64-
print("Exception Raised: {}".format(e))
63+
while rclpy.ok():
64+
if self.future.done():
65+
try:
66+
response = self.future.result()
67+
return response
68+
except Exception as e:
69+
self.get_logger().info(f'Service call failed {e}')
70+
71+
break
6572

6673
return None
6774

75+
6876
def unregister(self):
6977
"""
7078
7179
Returns:
7280
7381
"""
74-
if not self.srv is None:
75-
self.srv.close()
82+
self.destroy_client(self.cli)
83+
self.destroy_node()

0 commit comments

Comments
 (0)