2
2
import logging
3
3
import struct
4
4
import time
5
- from typing import Callable , Optional
5
+ from collections .abc import Callable
6
+ from typing import Dict , Final , List , Optional , TYPE_CHECKING
7
+
8
+
9
+ if TYPE_CHECKING :
10
+ from canopen import Network
11
+
6
12
7
13
logger = logging .getLogger (__name__ )
8
14
9
- NMT_STATES = {
15
+ NMT_STATES : Final [ Dict [ int , str ]] = {
10
16
0 : 'INITIALISING' ,
11
17
4 : 'STOPPED' ,
12
18
5 : 'OPERATIONAL' ,
15
21
127 : 'PRE-OPERATIONAL'
16
22
}
17
23
18
- NMT_COMMANDS = {
24
+ NMT_COMMANDS : Final [ Dict [ str , int ]] = {
19
25
'OPERATIONAL' : 1 ,
20
26
'STOPPED' : 2 ,
21
27
'SLEEP' : 80 ,
26
32
'RESET COMMUNICATION' : 130
27
33
}
28
34
29
- COMMAND_TO_STATE = {
35
+ COMMAND_TO_STATE : Final [ Dict [ int , int ]] = {
30
36
1 : 5 ,
31
37
2 : 4 ,
32
38
80 : 80 ,
@@ -45,7 +51,7 @@ class NmtBase:
45
51
46
52
def __init__ (self , node_id : int ):
47
53
self .id = node_id
48
- self .network = None
54
+ self .network : Optional [ Network ] = None
49
55
self ._state = 0
50
56
51
57
def on_command (self , can_id , data , timestamp ):
@@ -111,7 +117,7 @@ def __init__(self, node_id: int):
111
117
#: Timestamp of last heartbeat message
112
118
self .timestamp : Optional [float ] = None
113
119
self .state_update = threading .Condition ()
114
- self ._callbacks = []
120
+ self ._callbacks : List [ Callable [[ int ], None ]] = []
115
121
116
122
def on_heartbeat (self , can_id , data , timestamp ):
117
123
with self .state_update :
@@ -139,6 +145,7 @@ def send_command(self, code: int):
139
145
super (NmtMaster , self ).send_command (code )
140
146
logger .info (
141
147
"Sending NMT command 0x%X to node %d" , code , self .id )
148
+ assert self .network is not None
142
149
self .network .send_message (0 , [code , self .id ])
143
150
144
151
def wait_for_heartbeat (self , timeout : float = 10 ):
@@ -180,7 +187,9 @@ def start_node_guarding(self, period: float):
180
187
:param period:
181
188
Period (in seconds) at which the node guarding should be advertised to the slave node.
182
189
"""
183
- if self ._node_guarding_producer : self .stop_node_guarding ()
190
+ if self ._node_guarding_producer :
191
+ self .stop_node_guarding ()
192
+ assert self .network is not None
184
193
self ._node_guarding_producer = self .network .send_periodic (0x700 + self .id , None , period , True )
185
194
186
195
def stop_node_guarding (self ):
@@ -216,6 +225,7 @@ def send_command(self, code: int) -> None:
216
225
217
226
if self ._state == 0 :
218
227
logger .info ("Sending boot-up message" )
228
+ assert self .network is not None
219
229
self .network .send_message (0x700 + self .id , [0 ])
220
230
221
231
# The heartbeat service should start on the transition
@@ -246,6 +256,7 @@ def start_heartbeat(self, heartbeat_time_ms: int):
246
256
self .stop_heartbeat ()
247
257
if heartbeat_time_ms > 0 :
248
258
logger .info ("Start the heartbeat timer, interval is %d ms" , self ._heartbeat_time_ms )
259
+ assert self .network is not None
249
260
self ._send_task = self .network .send_periodic (
250
261
0x700 + self .id , [self ._state ], heartbeat_time_ms / 1000.0 )
251
262
0 commit comments