12
12
# See the License for the specific language governing permissions and
13
13
# limitations under the License.
14
14
15
+ import rclpy
15
16
import struct
16
- import socket
17
- import rospy
18
- from io import BytesIO
19
17
20
18
import threading
21
19
import json
22
20
21
+ from rclpy .serialization import deserialize_message
22
+ from rclpy .serialization import serialize_message
23
+
23
24
from .exceptions import TopicOrServiceNameDoesNotExistError
24
25
25
26
@@ -28,7 +29,6 @@ class ClientThread(threading.Thread):
28
29
Thread class to read all data from a connection and pass along the data to the
29
30
desired source.
30
31
"""
31
-
32
32
def __init__ (self , conn , tcp_server , incoming_ip , incoming_port ):
33
33
"""
34
34
Set class variables
@@ -66,11 +66,10 @@ def read_int32(conn):
66
66
67
67
"""
68
68
raw_bytes = ClientThread .recvall (conn , 4 )
69
- num = struct .unpack ("<I" , raw_bytes )[0 ]
69
+ num = struct .unpack ('<I' , raw_bytes )[0 ]
70
70
return num
71
71
72
- @staticmethod
73
- def read_string (conn ):
72
+ def read_string (self ):
74
73
"""
75
74
Reads int32 from socket connection to determine how many bytes to
76
75
read to get the string that follows. Read that number of bytes and
@@ -79,39 +78,39 @@ def read_string(conn):
79
78
Returns: string
80
79
81
80
"""
82
- str_len = ClientThread .read_int32 (conn )
81
+ str_len = ClientThread .read_int32 (self . conn )
83
82
84
- str_bytes = ClientThread .recvall (conn , str_len )
85
- decoded_str = str_bytes .decode (" utf-8" )
83
+ str_bytes = ClientThread .recvall (self . conn , str_len )
84
+ decoded_str = str_bytes .decode (' utf-8' )
86
85
87
86
return decoded_str
88
87
89
- @staticmethod
90
- def read_message (conn ):
88
+ def read_message (self ):
91
89
"""
92
90
Decode destination and full message size from socket connection.
93
91
Grab bytes in chunks until full message has been read.
94
92
"""
95
- data = b""
93
+ data = b''
96
94
97
- destination = ClientThread .read_string (conn )
98
- full_message_size = ClientThread .read_int32 (conn )
95
+ destination = self .read_string ()
96
+ full_message_size = ClientThread .read_int32 (self . conn )
99
97
100
98
while len (data ) < full_message_size :
101
99
# Only grabs max of 1024 bytes TODO: change to TCPServer's buffer_size
102
100
grab = 1024 if full_message_size - len (data ) > 1024 else full_message_size - len (data )
103
- packet = ClientThread .recvall (conn , grab )
101
+ packet = ClientThread .recvall (self . conn , grab )
104
102
105
103
if not packet :
106
- rospy .logerr ("No packets..." )
104
+ self .logerr ("No packets..." )
107
105
break
108
106
109
107
data += packet
110
108
111
109
if full_message_size > 0 and not data :
112
- rospy .logerr ("No data for a message size of {}, breaking!" .format (full_message_size ))
110
+ self .logerr ("No data for a message size of {}, breaking!" .format (full_message_size ))
113
111
return
114
112
113
+ destination = destination .rstrip ('\x00 ' )
115
114
return destination , data
116
115
117
116
@staticmethod
@@ -126,22 +125,14 @@ def serialize_message(destination, message):
126
125
Returns:
127
126
serialized destination and message as a list of bytes
128
127
"""
129
- dest_bytes = destination .encode (" utf-8" )
128
+ dest_bytes = destination .encode (' utf-8' )
130
129
length = len (dest_bytes )
131
- dest_info = struct .pack (" <I%ss" % length , length , dest_bytes )
130
+ dest_info = struct .pack (' <I%ss' % length , length , dest_bytes )
132
131
133
- serial_response = BytesIO ()
134
- message .serialize (serial_response )
132
+ serial_response = serialize_message (message )
135
133
136
- # Per documention, https://docs.python.org/3.8/library/io.html#io.IOBase.seek,
137
- # seek to end of stream for length
138
- # SEEK_SET or 0 - start of the stream (the default); offset should be zero or positive
139
- # SEEK_CUR or 1 - current stream position; offset may be negative
140
- # SEEK_END or 2 - end of the stream; offset is usually negative
141
- response_len = serial_response .seek (0 , 2 )
142
-
143
- msg_length = struct .pack ("<I" , response_len )
144
- serialized_message = dest_info + msg_length + serial_response .getvalue ()
134
+ msg_length = struct .pack ('<I' , len (serial_response ))
135
+ serialized_message = dest_info + msg_length + serial_response
145
136
146
137
return serialized_message
147
138
@@ -156,14 +147,14 @@ def serialize_command(command, params):
156
147
json_info = struct .pack ("<I%ss" % json_length , json_length , json_bytes )
157
148
158
149
return cmd_info + json_info
159
-
150
+
160
151
def send_ros_service_request (self , srv_id , destination , data ):
161
152
if destination not in self .tcp_server .source_destination_dict .keys ():
162
153
error_msg = "Service destination '{}' is not registered! Known topics are: {} " .format (
163
154
destination , self .tcp_server .source_destination_dict .keys ()
164
155
)
165
156
self .tcp_server .send_unity_error (error_msg )
166
- rospy .logerr (error_msg )
157
+ self .logerr (error_msg )
167
158
# TODO: send a response to Unity anyway?
168
159
return
169
160
else :
@@ -176,12 +167,18 @@ def send_ros_service_request(self, srv_id, destination, data):
176
167
destination
177
168
)
178
169
self .tcp_server .send_unity_error (error_msg )
179
- rospy .logerr (error_msg )
170
+ self .logerr (error_msg )
180
171
# TODO: send a response to Unity anyway?
181
172
return
182
173
183
174
self .tcp_server .unity_tcp_sender .send_ros_service_response (srv_id , destination , response )
184
175
176
+ def loginfo (self , text ):
177
+ self .tcp_server .get_logger ().info (text )
178
+
179
+ def logerr (self , text ):
180
+ self .tcp_server .get_logger ().error (text )
181
+
185
182
def run (self ):
186
183
"""
187
184
Read a message and determine where to send it based on the source_destination_dict
@@ -197,13 +194,13 @@ def run(self):
197
194
msg: the ROS msg type as bytes
198
195
199
196
"""
200
- rospy .loginfo ("Connection from {}" .format (self .incoming_ip ))
197
+ self .loginfo ("Connection from {}" .format (self .incoming_ip ))
201
198
halt_event = threading .Event ()
202
199
self .tcp_server .unity_tcp_sender .start_sender (self .conn , halt_event )
203
200
try :
204
201
while not halt_event .is_set ():
205
- destination , data = self .read_message (self . conn )
206
-
202
+ destination , data = self .read_message ()
203
+
207
204
if self .tcp_server .pending_srv_id is not None :
208
205
# if we've been told that the next message will be a service request/response, process it as such
209
206
if self .tcp_server .pending_srv_is_request :
@@ -212,23 +209,22 @@ def run(self):
212
209
self .tcp_server .send_unity_service_response (self .tcp_server .pending_srv_id , data )
213
210
self .tcp_server .pending_srv_id = None
214
211
elif destination == "" :
215
- # ignore this keepalive message, listen for more
212
+ #ignore this keepalive message, listen for more
216
213
pass
217
214
elif destination .startswith ("__" ):
218
- # handle a system command, such as registering new topics
215
+ #handle a system command, such as registering new topics
219
216
self .tcp_server .handle_syscommand (destination , data )
220
217
elif destination in self .tcp_server .source_destination_dict :
221
218
ros_communicator = self .tcp_server .source_destination_dict [destination ]
222
219
response = ros_communicator .send (data )
223
220
else :
224
- error_msg = "Topic '{}' is not registered! Known topics are: {} " .format (
225
- destination , self .tcp_server .source_destination_dict .keys ()
226
- )
221
+ error_msg = "Topic '{}' is not registered! Known topics are: {} " \
222
+ .format (destination , self .tcp_server .source_destination_dict .keys ())
227
223
self .tcp_server .send_unity_error (error_msg )
228
- rospy .logerr (error_msg )
224
+ self .logerr (error_msg )
229
225
except IOError as e :
230
- rospy .logerr ("Exception: {}" .format (e ))
226
+ self .logerr ("Exception: {}" .format (e ))
231
227
finally :
232
228
halt_event .set ()
233
229
self .conn .close ()
234
- rospy .loginfo ("Disconnected from {}" .format (self .incoming_ip ))
230
+ self .loginfo ("Disconnected from {}" .format (self .incoming_ip ));
0 commit comments