1
1
import threading
2
- from pubsub import pub
2
+ from event_channel . threaded_event_channel import ThreadedEventChannel
3
3
4
4
class EventManager (object ):
5
5
_instance = None
@@ -10,25 +10,33 @@ def get_instance(cls, node_name=None):
10
10
return cls ._instance
11
11
12
12
def __init__ (self , node_name ):
13
+ self ._channel = ThreadedEventChannel (blocking = False )
14
+ self ._publisher_threads = None
15
+ self ._subscribers = []
13
16
self ._node_name = node_name
14
17
self ._event_generators = []
15
18
16
19
def publish (self , topic , msg ):
17
- pub . sendMessage (topic , message = msg )
20
+ self . _publisher_threads = self . _channel . publish (topic , msg )
18
21
19
22
def register_event_listener (self , topic , callback ):
20
- pub .subscribe (callback , topic )
23
+ self ._channel .subscribe (topic , callback )
24
+ self ._subscribers .append ((topic , callback ))
21
25
22
26
def register_event_generator (self , generator_func ):
23
27
generator = threading .Thread (target = generator_func )
24
28
self ._event_generators .append (generator )
25
29
generator .start ()
26
30
27
31
def unregister_listeners (self ):
28
- pub .unsubAll ()
29
-
32
+ for l in self ._subscribers :
33
+ self ._channel .unsubscribe (l [0 ], l [1 ])
34
+ self ._subscribers = []
35
+
30
36
def unregister_publishers (self ):
31
- pass
37
+ if self ._publisher_threads :
38
+ for t in self ._publisher_threads :
39
+ t .join ()
32
40
33
41
def start_event_generators (self ):
34
42
for g in self ._event_generators :
@@ -37,3 +45,4 @@ def start_event_generators(self):
37
45
def wait_event_generators (self ):
38
46
for g in self ._event_generators :
39
47
g .join ()
48
+ self ._event_generators = []
0 commit comments