1
1
from unittest import mock
2
2
from ros_tcp_endpoint import TcpServer
3
3
from ros_tcp_endpoint .server import SysCommands
4
- from ros_tcp_endpoint .server import resolve_message_name
5
4
import importlib
6
5
import rospy
7
6
import sys
@@ -16,14 +15,14 @@ def test_server_constructor(mock_ros, mock_socket):
16
15
assert server .node_name == "test-tcp-server"
17
16
assert server .tcp_ip == "127.0.0.1"
18
17
assert server .buffer_size == 1024
19
- assert server .connections == 2
18
+ assert server .connections == 10
20
19
21
20
22
21
def test_start_server ():
23
22
server = TcpServer (node_name = "test-tcp-server" , tcp_ip = "127.0.0.1" , tcp_port = 10000 )
24
23
assert server .tcp_ip == "127.0.0.1"
25
24
assert server .tcp_port == 10000
26
- assert server .connections == 2
25
+ assert server .connections == 10
27
26
server .start ()
28
27
29
28
@@ -43,13 +42,11 @@ def test_unity_service_resolve_message_name_failure():
43
42
44
43
@mock .patch .object (rospy , "Service" )
45
44
@mock .patch .object (
46
- ros_tcp_endpoint .server ,
47
- "resolve_message_name" ,
48
- return_value = "unity_interfaces.msg/RosUnitySrvMessage" ,
45
+ SysCommands , "resolve_message_name" , return_value = "unity_interfaces.msg/RosUnitySrvMessage"
49
46
)
50
47
def test_unity_service_resolve_news_service (mock_resolve_message , mock_ros_service ):
51
48
server = TcpServer (node_name = "test-tcp-server" , tcp_ip = "127.0.0.1" , tcp_port = 10000 )
52
- assert server .ros_services == {}
49
+ assert server .ros_services_table == {}
53
50
system_cmds = SysCommands (server )
54
51
result = system_cmds .unity_service ("get_pos" , "unity_interfaces.msg/RosUnitySrvMessage" )
55
52
mock_ros_service .assert_called_once
@@ -58,9 +55,7 @@ def test_unity_service_resolve_news_service(mock_resolve_message, mock_ros_servi
58
55
59
56
@mock .patch .object (rospy , "Service" )
60
57
@mock .patch .object (
61
- ros_tcp_endpoint .server ,
62
- "resolve_message_name" ,
63
- return_value = "unity_interfaces.msg/RosUnitySrvMessage" ,
58
+ SysCommands , "resolve_message_name" , return_value = "unity_interfaces.msg/RosUnitySrvMessage"
64
59
)
65
60
def test_unity_service_resolve_existing_service (mock_resolve_message , mock_ros_service ):
66
61
server = TcpServer (node_name = "test-tcp-server" , tcp_ip = "127.0.0.1" , tcp_port = 10000 )
@@ -74,8 +69,9 @@ def test_unity_service_resolve_existing_service(mock_resolve_message, mock_ros_s
74
69
@mock .patch .object (sys , "modules" , return_value = "unity_interfaces.msg" )
75
70
@mock .patch .object (importlib , "import_module" )
76
71
def test_resolve_message_name (mock_import_module , mock_sys_modules ):
72
+ server = TcpServer (node_name = "test-tcp-server" , tcp_ip = "127.0.0.1" , tcp_port = 10000 )
77
73
msg_name = "unity_interfaces.msg/UnityColor.msg"
78
- result = resolve_message_name (msg_name )
74
+ result = SysCommands ( server ). resolve_message_name (msg_name )
79
75
mock_import_module .assert_called_once
80
76
mock_sys_modules .assert_called_once
81
77
assert result is not None
@@ -85,98 +81,94 @@ def test_resolve_message_name(mock_import_module, mock_sys_modules):
85
81
def test_publish_add_new_topic (mock_ros_publisher ):
86
82
server = TcpServer (node_name = "test-tcp-server" , tcp_ip = "127.0.0.1" , tcp_port = 10000 )
87
83
result = SysCommands (server ).publish ("object_pos_topic" , "std_msgs/Bool" )
88
- assert server .publishers != {}
84
+ assert server .publishers_table != {}
89
85
mock_ros_publisher .assert_called_once
90
86
91
87
92
88
@mock .patch .object (rospy , "Publisher" )
93
89
def test_publish_existing_topic (mock_ros_publisher ):
94
90
server = TcpServer (node_name = "test-tcp-server" , tcp_ip = "127.0.0.1" , tcp_port = 10000 )
95
- server .publishers = {"object_pos_topic" : mock .Mock ()}
91
+ server .publishers_table = {"object_pos_topic" : mock .Mock ()}
96
92
result = SysCommands (server ).publish ("object_pos_topic" , "std_msgs/Bool" )
97
- assert server .publishers ["object_pos_topic" ] is not None
93
+ assert server .publishers_table ["object_pos_topic" ] is not None
98
94
mock_ros_publisher .assert_called_once
99
95
100
96
101
97
def test_publish_empty_topic_should_return_none ():
102
98
server = TcpServer (node_name = "test-tcp-server" , tcp_ip = "127.0.0.1" , tcp_port = 10000 )
103
99
result = SysCommands (server ).publish ("" , "pos" )
104
100
assert result is None
105
- assert server .publishers == {}
101
+ assert server .publishers_table == {}
106
102
107
103
108
104
def test_publish_empty_message_should_return_none ():
109
105
server = TcpServer (node_name = "test-tcp-server" , tcp_ip = "127.0.0.1" , tcp_port = 10000 )
110
106
result = SysCommands (server ).publish ("test-topic" , "" )
111
107
assert result is None
112
- assert server .publishers == {}
108
+ assert server .publishers_table == {}
113
109
114
110
115
111
@mock .patch .object (rospy , "Subscriber" )
116
- @mock .patch .object (
117
- ros_tcp_endpoint .server , "resolve_message_name" , return_value = "unity_interfaces.msg/Pos"
118
- )
112
+ @mock .patch .object (SysCommands , "resolve_message_name" , return_value = "unity_interfaces.msg/Pos" )
119
113
def test_subscribe_to_new_topic (mock_resolve_msg , mock_ros_subscriber ):
120
114
server = TcpServer (node_name = "test-tcp-server" , tcp_ip = "127.0.0.1" , tcp_port = 10000 )
121
115
result = SysCommands (server ).subscribe ("object_pos_topic" , "pos" )
122
- assert server .subscribers != {}
116
+ assert server .subscribers_table != {}
123
117
mock_ros_subscriber .assert_called_once
124
118
125
119
126
120
@mock .patch .object (rospy , "Subscriber" )
127
- @mock .patch .object (
128
- ros_tcp_endpoint .server , "resolve_message_name" , return_value = "unity_interfaces.msg/Pos"
129
- )
121
+ @mock .patch .object (SysCommands , "resolve_message_name" , return_value = "unity_interfaces.msg/Pos" )
130
122
def test_subscribe_to_existing_topic (mock_resolve_msg , mock_ros_subscriber ):
131
123
server = TcpServer (node_name = "test-tcp-server" , tcp_ip = "127.0.0.1" , tcp_port = 10000 )
132
- server .subscribers = {"object_pos_topic" : mock .Mock ()}
124
+ server .subscribers_table = {"object_pos_topic" : mock .Mock ()}
133
125
result = SysCommands (server ).subscribe ("object_pos_topic" , "pos" )
134
- assert server .subscribers ["object_pos_topic" ] is not None
126
+ assert server .subscribers_table ["object_pos_topic" ] is not None
135
127
mock_ros_subscriber .assert_called_once
136
128
137
129
138
130
def test_subscribe_to_empty_topic_should_return_none ():
139
131
server = TcpServer (node_name = "test-tcp-server" , tcp_ip = "127.0.0.1" , tcp_port = 10000 )
140
132
result = SysCommands (server ).subscribe ("" , "pos" )
141
133
assert result is None
142
- assert server .subscribers == {}
134
+ assert server .subscribers_table == {}
143
135
144
136
145
137
def test_subscribe_to_empty_message_should_return_none ():
146
138
server = TcpServer (node_name = "test-tcp-server" , tcp_ip = "127.0.0.1" , tcp_port = 10000 )
147
139
result = SysCommands (server ).subscribe ("test-topic" , "" )
148
140
assert result is None
149
- assert server .subscribers == {}
141
+ assert server .subscribers_table == {}
150
142
151
143
152
144
@mock .patch .object (rospy , "ServiceProxy" )
153
- @mock .patch .object (ros_tcp_endpoint . server , "resolve_message_name" )
145
+ @mock .patch .object (SysCommands , "resolve_message_name" )
154
146
def test_ros_service_new_topic (mock_resolve_msg , mock_ros_service ):
155
147
server = TcpServer (node_name = "test-tcp-server" , tcp_ip = "127.0.0.1" , tcp_port = 10000 )
156
148
result = SysCommands (server ).ros_service ("object_pos_topic" , "pos" )
157
- assert server .ros_services != {}
149
+ assert server .ros_services_table != {}
158
150
mock_ros_service .assert_called_once
159
151
160
152
161
153
@mock .patch .object (rospy , "ServiceProxy" )
162
- @mock .patch .object (ros_tcp_endpoint . server , "resolve_message_name" )
154
+ @mock .patch .object (SysCommands , "resolve_message_name" )
163
155
def test_ros_service_existing_topic (mock_resolve_msg , mock_ros_service ):
164
156
server = TcpServer (node_name = "test-tcp-server" , tcp_ip = "127.0.0.1" , tcp_port = 10000 )
165
- server .ros_services = {"object_pos_topic" : mock .Mock ()}
157
+ server .ros_services_table = {"object_pos_topic" : mock .Mock ()}
166
158
result = SysCommands (server ).ros_service ("object_pos_topic" , "pos" )
167
- assert server .ros_services ["object_pos_topic" ] is not None
159
+ assert server .ros_services_table ["object_pos_topic" ] is not None
168
160
mock_ros_service .assert_called_once
169
161
170
162
171
163
def test_ros_service_empty_topic_should_return_none ():
172
164
server = TcpServer (node_name = "test-tcp-server" , tcp_ip = "127.0.0.1" , tcp_port = 10000 )
173
165
result = SysCommands (server ).ros_service ("" , "pos" )
174
166
assert result is None
175
- assert server .ros_services == {}
167
+ assert server .ros_services_table == {}
176
168
177
169
178
170
def test_ros_service_empty_message_should_return_none ():
179
171
server = TcpServer (node_name = "test-tcp-server" , tcp_ip = "127.0.0.1" , tcp_port = 10000 )
180
172
result = SysCommands (server ).ros_service ("test-topic" , "" )
181
173
assert result is None
182
- assert server .ros_services == {}
174
+ assert server .ros_services_table == {}
0 commit comments