9
9
class Othread (Configurable , threading .Thread ):
10
10
11
11
def __init__ (self , engine_config , runtime_context ,
12
- start_trigger , component_factory , ** kwargs ):
12
+ component_factory , start_trigger , stop_trigger , ** kwargs ):
13
13
Configurable .__init__ (self , engine_config )
14
14
threading .Thread .__init__ (self , ** kwargs )
15
15
self ._driver_cls = load_class (engine_config .driver_cls , Driver )
16
16
self ._component_factory = component_factory
17
17
self ._runtime_context = runtime_context
18
- self ._stopping = False
18
+ self ._stop_trigger = stop_trigger
19
19
self ._start_trigger = start_trigger
20
20
self ._logger = logging .getLogger (self .name )
21
21
@@ -39,23 +39,24 @@ def run(self):
39
39
driver .cleanup ()
40
40
except Exception as e :
41
41
self ._logger .error ('Unexpected exception, %s' % str (e ))
42
+ self ._stop_trigger .set ()
42
43
self ._logger .debug ('Stop' )
43
44
44
45
def stopping (self ):
45
- return self ._stopping
46
-
47
- def stop (self ):
48
- self ._stopping = True
46
+ return self ._stop_trigger .is_set ()
49
47
50
48
51
49
class OthreadManager (Configurable ):
52
50
def __init__ (self , engine_config , runtime_context , component_factory ):
53
51
super (OthreadManager , self ).__init__ (engine_config )
54
52
start_trigger = threading .Event ()
53
+ stop_trigger = threading .Event ()
55
54
self ._start_trigger = start_trigger
55
+ self ._stop_trigger = stop_trigger
56
56
thread_cls = load_class (engine_config .thread_cls , Othread )
57
57
self ._threads = [thread_cls (
58
- engine_config , runtime_context , start_trigger , component_factory ,
58
+ engine_config , runtime_context , component_factory ,
59
+ start_trigger , stop_trigger ,
59
60
name = '%s-%d' % (thread_cls .__name__ , i ))
60
61
for i in range (0 , engine_config .thread_num )]
61
62
@@ -76,7 +77,7 @@ def join(self):
76
77
list (map (lambda t : t .join (), self ._threads ))
77
78
78
79
def stop (self ):
79
- list ( map ( lambda t : t . stop (), self ._threads ) )
80
+ self ._stop_trigger . set ( )
80
81
81
82
def stopped (self ):
82
83
return not any ([t .isAlive () for t in self ._threads ])
0 commit comments