Skip to content

Commit a2dd753

Browse files
committed
add: refactoring finish
1 parent c547f5c commit a2dd753

20 files changed

+498
-370
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
[![PyPI](https://img.shields.io/pypi/v/os-3m-engine.svg)](https://pypi.python.org/pypi/os-3m-engine)
77

88

9-
Multithread engine for 3(or 2) stage job.
9+
Multithread engine for 3(or 2) stages job.
1010

1111

1212
# Install

requirements.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
os-config>0.1

src/os_3m_engine/__init__.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import sys
2-
from .engine import create
32

4-
__all__ = ['__version__', 'version_info', 'create']
3+
__all__ = ['__version__', 'version_info']
54

65
import pkgutil
76
__version__ = pkgutil.get_data(__package__, 'VERSION').decode('ascii').strip()

src/os_3m_engine/backend.py

Lines changed: 0 additions & 49 deletions
This file was deleted.

src/os_3m_engine/common.py

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,15 @@
1-
21
import sys
32

3+
from .utils import load_class
4+
45
_PY3 = sys.version_info[0] == 3
56

67
if _PY3:
78
import queue as Queue
9+
binary_stdin = sys.stdin.buffer
810
else:
911
import Queue
12+
binary_stdin = sys.stdin
1013

1114

1215
class Configurable(object):
@@ -18,5 +21,43 @@ def config(self):
1821
return self._config
1922

2023

21-
class RuntimeContext(object):
24+
class Workflowable(object):
25+
def setup(self):
26+
pass
27+
28+
def cleanup(self):
29+
pass
30+
31+
32+
class Component(Configurable, Workflowable):
2233
pass
34+
35+
36+
class StandardComponent(Component):
37+
def __init__(self, component_config, engine_config, runtime_context, current_thread):
38+
super(StandardComponent, self).__init__(component_config)
39+
self._engine_config = engine_config
40+
self._runtime_context = runtime_context
41+
self._current_thread = current_thread
42+
43+
44+
class FactoryInterface(object):
45+
def create(self, *args, **kwargs):
46+
raise NotImplementedError
47+
48+
49+
class ConfigurableFactory(Configurable):
50+
def create(self, *args, **kwargs):
51+
raise NotImplementedError
52+
53+
54+
class ComponentFactory(ConfigurableFactory):
55+
56+
def __init__(self, config, component_cls_string):
57+
super(ComponentFactory, self).__init__(config)
58+
self._component_cls = load_class(
59+
component_cls_string, StandardComponent)
60+
61+
def create(self, engine_config, runtime_context, current_thread):
62+
return self._component_cls(self.config, engine_config,
63+
runtime_context, current_thread)

src/os_3m_engine/core/__init__.py

Whitespace-only changes.

src/os_3m_engine/core/backend.py

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
from ..common import Queue, StandardComponent
2+
from .driver import Driver
3+
from .othread import Othread
4+
5+
6+
class Backend(StandardComponent):
7+
def process(self, data):
8+
raise NotImplementedError
9+
10+
11+
class BackendDriver(Driver):
12+
13+
def setup(self):
14+
if hasattr(self._runtime_context, 'transport_thread'):
15+
self._queue = self._runtime_context.backend_thread_queue
16+
else:
17+
self._queue = self._runtime_context.frontend_thread_queue
18+
super(BackendDriver, self).setup()
19+
20+
@property
21+
def queue(self):
22+
return self._queue
23+
24+
def run(self):
25+
while True:
26+
if self._current_thread.stopping():
27+
if self.queue.qsize() <= 0:
28+
if not hasattr(self._runtime_context, 'transport_thread'):
29+
break
30+
elif self._runtime_context.transport_thread.stopped():
31+
break
32+
33+
try:
34+
data = self.queue.get(block=True, timeout=0.1)
35+
self._component.process(data)
36+
except Queue.Empty:
37+
pass
38+
39+
40+
class BackendThread(Othread):
41+
42+
def run(self):
43+
super(BackendThread, self).run()
44+
self._runtime_context.frontend_thread.stop()
45+
if hasattr(self._runtime_context, 'transport_thread'):
46+
self._runtime_context.transport_thread.stop()

src/os_3m_engine/core/driver.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
from ..common import Configurable, Workflowable
2+
3+
4+
class Driver(Configurable, Workflowable):
5+
6+
def __init__(self, engine_config, runtime_context, current_thread, component_factory):
7+
super(Driver, self).__init__(engine_config)
8+
self._runtime_context = runtime_context
9+
self._current_thread = current_thread
10+
self._component = component_factory.create(
11+
engine_config, runtime_context, current_thread)
12+
13+
def setup(self):
14+
self._component.setup()
15+
16+
def cleanup(self):
17+
self._component.cleanup()
18+
19+
def run(self):
20+
raise NotImplementedError

src/os_3m_engine/core/engine.py

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
import threading
2+
import time
3+
4+
from ..common import Configurable
5+
from ..utils import getatters
6+
7+
8+
class RuntimeContext(object):
9+
pass
10+
11+
12+
class Engine(Configurable):
13+
14+
def __init__(self, config, runtime_context):
15+
super(Engine, self).__init__(config)
16+
self.__start_lock = threading.Lock()
17+
self.__started = False
18+
self.__stopped = False
19+
self._runtime_context = runtime_context
20+
21+
def start(self):
22+
23+
self.__acquire_start_lock(False, 'Can not start twice')
24+
runtime_context = self._runtime_context
25+
m = getatters(runtime_context, [
26+
'backend_thread',
27+
'transport_thread',
28+
'frontend_thread'
29+
])
30+
list(map(lambda x: x.start(), m))
31+
32+
self.__started = True
33+
self.__start_lock.release()
34+
35+
while not any([x.stopped() for x in m]):
36+
time.sleep(0.1)
37+
38+
m = getatters(runtime_context, [
39+
'transport_thread',
40+
'backend_thread'
41+
])
42+
list(map(lambda x: x.join(), m))
43+
self.__stopped = True
44+
45+
def __acquire_start_lock(self, started, err_msg):
46+
self.__start_lock.acquire()
47+
if self.__started != started:
48+
try:
49+
raise RuntimeError(err_msg)
50+
finally:
51+
self.__start_lock.release()
52+
53+
def __stop(self):
54+
runtime_context = self._runtime_context
55+
runtime_context.frontend_thread.stop()
56+
if hasattr(runtime_context, 'transport_thread'):
57+
runtime_context.transport_thread.stop()
58+
elif hasattr(runtime_context, 'backend_thread'):
59+
runtime_context.backend_thread.stop()
60+
self.__stopped = True
61+
62+
def stop(self):
63+
self.__acquire_start_lock(True, 'Can stop before start')
64+
if not self.__stopped:
65+
self.__stop()
66+
self.__start_lock.release()

src/os_3m_engine/core/frontend.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
from ..common import Queue, StandardComponent
2+
from .driver import Driver
3+
from .othread import Othread
4+
5+
6+
class Frontend(StandardComponent):
7+
8+
def produce(self):
9+
raise NotImplementedError
10+
11+
12+
class FrontendDriver(Driver):
13+
14+
@property
15+
def queue(self):
16+
return self._runtime_context.frontend_thread_queue
17+
18+
def run(self):
19+
for data in self._component.produce():
20+
while True:
21+
try:
22+
self.queue.put(data, block=False, timeout=1)
23+
break
24+
except Queue.Full:
25+
continue
26+
if self._current_thread.stopping():
27+
break
28+
29+
30+
class FrontendThread(Othread):
31+
32+
def run(self):
33+
super(FrontendThread, self).run()
34+
if hasattr(self._runtime_context, 'transport_thread'):
35+
self._runtime_context.transport_thread.stop()
36+
elif hasattr(self._runtime_context, 'backend_thread'):
37+
self._runtime_context.backend_thread.stop()

0 commit comments

Comments
 (0)