Skip to content

Commit 79c601f

Browse files
Merge from ros1
2 parents 86703a9 + b66dd79 commit 79c601f

File tree

2 files changed

+55
-42
lines changed

2 files changed

+55
-42
lines changed

ros_tcp_endpoint/client.py

Lines changed: 31 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ class ClientThread(threading.Thread):
2929
Thread class to read all data from a connection and pass along the data to the
3030
desired source.
3131
"""
32+
3233
def __init__(self, conn, tcp_server, incoming_ip, incoming_port):
3334
"""
3435
Set class variables
@@ -66,7 +67,7 @@ def read_int32(conn):
6667
6768
"""
6869
raw_bytes = ClientThread.recvall(conn, 4)
69-
num = struct.unpack('<I', raw_bytes)[0]
70+
num = struct.unpack("<I", raw_bytes)[0]
7071
return num
7172

7273
def read_string(self):
@@ -81,24 +82,24 @@ def read_string(self):
8182
str_len = ClientThread.read_int32(self.conn)
8283

8384
str_bytes = ClientThread.recvall(self.conn, str_len)
84-
decoded_str = str_bytes.decode('utf-8')
85+
decoded_str = str_bytes.decode("utf-8")
8586

8687
return decoded_str
8788

88-
def read_message(self):
89+
def read_message(self, conn):
8990
"""
9091
Decode destination and full message size from socket connection.
9192
Grab bytes in chunks until full message has been read.
9293
"""
93-
data = b''
94+
data = b""
9495

9596
destination = self.read_string()
96-
full_message_size = ClientThread.read_int32(self.conn)
97+
full_message_size = ClientThread.read_int32(conn)
9798

9899
while len(data) < full_message_size:
99100
# Only grabs max of 1024 bytes TODO: change to TCPServer's buffer_size
100101
grab = 1024 if full_message_size - len(data) > 1024 else full_message_size - len(data)
101-
packet = ClientThread.recvall(self.conn, grab)
102+
packet = ClientThread.recvall(conn, grab)
102103

103104
if not packet:
104105
self.logerr("No packets...")
@@ -110,7 +111,7 @@ def read_message(self):
110111
self.logerr("No data for a message size of {}, breaking!".format(full_message_size))
111112
return
112113

113-
destination = destination.rstrip('\x00')
114+
destination = destination.rstrip("\x00")
114115
return destination, data
115116

116117
@staticmethod
@@ -125,13 +126,13 @@ def serialize_message(destination, message):
125126
Returns:
126127
serialized destination and message as a list of bytes
127128
"""
128-
dest_bytes = destination.encode('utf-8')
129+
dest_bytes = destination.encode("utf-8")
129130
length = len(dest_bytes)
130-
dest_info = struct.pack('<I%ss' % length, length, dest_bytes)
131+
dest_info = struct.pack("<I%ss" % length, length, dest_bytes)
131132

132133
serial_response = serialize_message(message)
133134

134-
msg_length = struct.pack('<I', len(serial_response))
135+
msg_length = struct.pack("<I", len(serial_response))
135136
serialized_message = dest_info + msg_length + serial_response
136137

137138
return serialized_message
@@ -147,7 +148,7 @@ def serialize_command(command, params):
147148
json_info = struct.pack("<I%ss" % json_length, json_length, json_bytes)
148149

149150
return cmd_info + json_info
150-
151+
151152
def send_ros_service_request(self, srv_id, destination, data):
152153
if destination not in self.tcp_server.source_destination_dict.keys():
153154
error_msg = "Service destination '{}' is not registered! Known topics are: {} ".format(
@@ -159,17 +160,17 @@ def send_ros_service_request(self, srv_id, destination, data):
159160
return
160161
else:
161162
ros_communicator = self.tcp_server.source_destination_dict[destination]
162-
service_thread = threading.Thread(target=self.service_call_thread, args=(srv_id, destination, data, ros_communicator))
163+
service_thread = threading.Thread(
164+
target=self.service_call_thread, args=(srv_id, destination, data, ros_communicator)
165+
)
163166
service_thread.daemon = True
164167
service_thread.start()
165168

166169
def service_call_thread(self, srv_id, destination, data, ros_communicator):
167170
response = ros_communicator.send(data)
168-
171+
169172
if not response:
170-
error_msg = "No response data from service '{}'!".format(
171-
destination
172-
)
173+
error_msg = "No response data from service '{}'!".format(destination)
173174
self.tcp_server.send_unity_error(error_msg)
174175
self.logerr(error_msg)
175176
# TODO: send a response to Unity anyway?
@@ -182,7 +183,7 @@ def loginfo(self, text):
182183

183184
def logerr(self, text):
184185
self.tcp_server.get_logger().error(text)
185-
186+
186187
def run(self):
187188
"""
188189
Read a message and determine where to send it based on the source_destination_dict
@@ -203,32 +204,37 @@ def run(self):
203204
self.tcp_server.unity_tcp_sender.start_sender(self.conn, halt_event)
204205
try:
205206
while not halt_event.is_set():
206-
destination, data = self.read_message()
207+
destination, data = self.read_message(self.conn)
207208

208209
if self.tcp_server.pending_srv_id is not None:
209210
# if we've been told that the next message will be a service request/response, process it as such
210211
if self.tcp_server.pending_srv_is_request:
211-
self.send_ros_service_request(self.tcp_server.pending_srv_id, destination, data)
212+
self.send_ros_service_request(
213+
self.tcp_server.pending_srv_id, destination, data
214+
)
212215
else:
213-
self.tcp_server.send_unity_service_response(self.tcp_server.pending_srv_id, data)
216+
self.tcp_server.send_unity_service_response(
217+
self.tcp_server.pending_srv_id, data
218+
)
214219
self.tcp_server.pending_srv_id = None
215220
elif destination == "":
216-
#ignore this keepalive message, listen for more
221+
# ignore this keepalive message, listen for more
217222
pass
218223
elif destination.startswith("__"):
219-
#handle a system command, such as registering new topics
224+
# handle a system command, such as registering new topics
220225
self.tcp_server.handle_syscommand(destination, data)
221226
elif destination in self.tcp_server.source_destination_dict:
222227
ros_communicator = self.tcp_server.source_destination_dict[destination]
223228
response = ros_communicator.send(data)
224229
else:
225-
error_msg = "Topic '{}' is not registered! Known topics are: {} "\
226-
.format(destination, self.tcp_server.source_destination_dict.keys())
230+
error_msg = "Topic '{}' is not registered! Known topics are: {} ".format(
231+
destination, self.tcp_server.source_destination_dict.keys()
232+
)
227233
self.tcp_server.send_unity_error(error_msg)
228234
self.logerr(error_msg)
229235
except IOError as e:
230236
self.logerr("Exception: {}".format(e))
231237
finally:
232238
halt_event.set()
233239
self.conn.close()
234-
self.loginfo("Disconnected from {}".format(self.incoming_ip));
240+
self.loginfo("Disconnected from {}".format(self.incoming_ip))

ros_tcp_endpoint/tcp_sender.py

Lines changed: 24 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -32,12 +32,14 @@
3232
from Queue import Queue
3333
from Queue import Empty
3434

35+
3536
class UnityTcpSender:
3637
"""
3738
Sends messages to Unity.
3839
"""
40+
3941
def __init__(self, tcp_server):
40-
#super().__init__(f'UnityTcpSender')
42+
# super().__init__(f'UnityTcpSender')
4143

4244
self.sender_id = 1
4345
self.time_between_halt_checks = 5
@@ -46,7 +48,7 @@ def __init__(self, tcp_server):
4648
# Each sender thread has its own queue: this is always the queue for the currently active thread.
4749
self.queue = None
4850
self.queue_lock = threading.Lock()
49-
51+
5052
# variables needed for matching up unity service requests with responses
5153
self.next_srv_id = 1001
5254
self.srv_lock = threading.Lock()
@@ -91,12 +93,12 @@ def send_unity_service_request(self, topic, service_class, request):
9193
return None
9294

9395
thread_pauser = ThreadPauser()
94-
96+
9597
with self.srv_lock:
9698
srv_id = self.next_srv_id
97-
self.next_srv_id+=1
99+
self.next_srv_id += 1
98100
self.services_waiting[srv_id] = thread_pauser
99-
101+
100102
command = SysCommand_Service()
101103
command.srv_id = srv_id
102104
serialized_bytes = ClientThread.serialize_command("__request", command)
@@ -106,16 +108,16 @@ def send_unity_service_request(self, topic, service_class, request):
106108
# rospy starts a new thread for each service request,
107109
# so it won't break anything if we sleep now while waiting for the response
108110
thread_pauser.sleep_until_resumed()
109-
111+
110112
response = deserialize_message(thread_pauser.result, service_class.Response())
111113
return response
112-
114+
113115
def send_unity_service_response(self, srv_id, data):
114116
thread_pauser = None
115117
with self.srv_lock:
116118
thread_pauser = self.services_waiting[srv_id]
117119
del self.services_waiting[srv_id]
118-
120+
119121
thread_pauser.resume_with_result(data)
120122

121123
def send_topic_list(self):
@@ -126,32 +128,34 @@ def send_topic_list(self):
126128
topic_list.types = [item[1] for item in topics_and_types]
127129
serialized_bytes = ClientThread.serialize_command("__topic_list", topic_list)
128130
self.queue.put(serialized_bytes)
129-
131+
130132
def start_sender(self, conn, halt_event):
131-
sender_thread = threading.Thread(target=self.sender_loop, args=(conn, self.sender_id, halt_event))
133+
sender_thread = threading.Thread(
134+
target=self.sender_loop, args=(conn, self.sender_id, halt_event)
135+
)
132136
self.sender_id += 1
133137

134138
# Exit the server thread when the main thread terminates
135139
sender_thread.daemon = True
136140
sender_thread.start()
137-
141+
138142
def sender_loop(self, conn, tid, halt_event):
139143
s = None
140144
local_queue = Queue()
141-
local_queue.put(b'\0\0\0\0\0\0\0\0') # send an empty message to confirm connection
145+
local_queue.put(b"\0\0\0\0\0\0\0\0") # send an empty message to confirm connection
142146
with self.queue_lock:
143147
self.queue = local_queue
144-
148+
145149
try:
146150
while not halt_event.is_set():
147151
try:
148152
item = local_queue.get(timeout=self.time_between_halt_checks)
149153
except Empty:
150-
# I'd like to just wait on the queue, but we also need to check occasionally for the connection being closed
154+
# I'd like to just wait on the queue, but we also need to check occasionally for the connection being closed
151155
# (otherwise the thread never terminates.)
152156
continue
153-
154-
#print("Sender {} sending an item".format(tid))
157+
158+
# print("Sender {} sending an item".format(tid))
155159

156160
try:
157161
conn.sendall(item)
@@ -164,12 +168,15 @@ def sender_loop(self, conn, tid, halt_event):
164168
if self.queue is local_queue:
165169
self.queue = None
166170

171+
167172
class SysCommand_Log:
168173
text = ""
169174

175+
170176
class SysCommand_Service:
171177
srv_id = 0
172178

179+
173180
class SysCommand_TopicsResponse:
174181
topics = []
175-
types = []
182+
types = []

0 commit comments

Comments
 (0)