Skip to content

Commit 50efd6e

Browse files
committed
add: refactoring code struct
1 parent ba0bfd1 commit 50efd6e

File tree

14 files changed

+377
-10
lines changed

14 files changed

+377
-10
lines changed

MANIFEST.in

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
include LICENSE
22
include README.md
33
include MANIFEST.in
4-
include src/os_m3_engine/VERSION
4+
include src/os_3m_engine/VERSION
55
recursive-include tests *.py
66

README.md

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,17 @@
1-
# os-m3-engine
1+
# os-3m-engine
22

3-
[![Build Status](https://www.travis-ci.org/cfhamlet/os-m3-engine.svg?branch=master)](https://www.travis-ci.org/cfhamlet/os-m3-engine)
4-
[![codecov](https://codecov.io/gh/cfhamlet/os-m3-engine/branch/master/graph/badge.svg)](https://codecov.io/gh/cfhamlet/os-m3-engine)
5-
[![PyPI - Python Version](https://img.shields.io/pypi/pyversions/os-m3-engine.svg)](https://pypi.python.org/pypi/os-m3-engine)
6-
[![PyPI](https://img.shields.io/pypi/v/os-m3-engine.svg)](https://pypi.python.org/pypi/os-m3-engine)
3+
[![Build Status](https://www.travis-ci.org/cfhamlet/os-3m-engine.svg?branch=master)](https://www.travis-ci.org/cfhamlet/os-3m-engine)
4+
[![codecov](https://codecov.io/gh/cfhamlet/os-3m-engine/branch/master/graph/badge.svg)](https://codecov.io/gh/cfhamlet/os-3m-engine)
5+
[![PyPI - Python Version](https://img.shields.io/pypi/pyversions/os-3m-engine.svg)](https://pypi.python.org/pypi/os-3m-engine)
6+
[![PyPI](https://img.shields.io/pypi/v/os-3m-engine.svg)](https://pypi.python.org/pypi/os-3m-engine)
77

88

99
Multithread engine for 3(or 2) stage job.
1010

1111

1212
# Install
1313

14-
`pip install os-m3-engine`
14+
`pip install os-3m-engine`
1515

1616
# Usage
1717

setup.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@ def read(*filenames, **kwargs):
1414

1515

1616
setup(
17-
name='os-m3-engine',
18-
version=read('src/os_m3_engine/VERSION'),
17+
name='os-3m-engine',
18+
version=read('src/os_3m_engine/VERSION'),
1919
packages=find_packages(where='src'),
2020
package_dir={'': 'src'},
2121
include_package_data=True,
@@ -24,7 +24,7 @@ def read(*filenames, **kwargs):
2424
long_description=open('README.md').read(),
2525
author='Ozzy',
2626
author_email='cfhamlet@gmail.com',
27-
url='https://github.com/cfhamlet/os-m3-engine',
27+
url='https://github.com/cfhamlet/os-3m-engine',
2828
zip_safe=False,
2929
classifiers=[
3030
'Development Status :: 2 - Pre-Alpha',
File renamed without changes.

src/os_3m_engine/__init__.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
import sys
2+
3+
__all__ = ['__version__', 'version_info']
4+
5+
import pkgutil
6+
__version__ = pkgutil.get_data(__package__, 'VERSION').decode('ascii').strip()
7+
version_info = tuple(int(v) if v.isdigit() else v
8+
for v in __version__.split('.'))
9+
del pkgutil

src/os_3m_engine/backend.py

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
from .common import Queue
2+
from .driver import Driver
3+
from .othread import Othread
4+
5+
6+
class BackendDriver(Driver):
7+
def __init__(self, config, current_thread):
8+
super(BackendDriver, self).__init__(config, current_thread)
9+
self._transport_exists = False
10+
if hasattr(self.runtime_context, 'transport_thread'):
11+
self._transport_exists = True
12+
self._queue = self.runtime_context.backend_thread_queue
13+
else:
14+
self._queue = self.runtime_context.frontend_thread_queue
15+
16+
self._processor = config.engine.backend.processor_cls(config)
17+
18+
@property
19+
def queue(self):
20+
return self._queue
21+
22+
def run(self):
23+
while True:
24+
if self.current_thread.stopping():
25+
if self.queue.qsize() <= 0:
26+
if not self._transport_exists:
27+
break
28+
elif self.runtime_context.transport_thread.stopped():
29+
break
30+
31+
try:
32+
data = self.queue.get(block=True, timeout=0.1)
33+
self._processor.process(data)
34+
except Queue.Empty:
35+
pass
36+
37+
38+
class BackendThread(Othread):
39+
40+
def run(self):
41+
super(BackendThread, self).run()
42+
self.runtime_context.frontend_thread.stop()
43+
if hasattr(self.runtime_context, 'transport_thread'):
44+
self.runtime_context.transport_thread.stop()
45+

src/os_3m_engine/common.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
2+
import sys
3+
_PY3 = sys.version_info[0] == 3
4+
5+
if _PY3:
6+
import queue as Queue
7+
else:
8+
import Queue
9+
10+
11+
class Configurable(object):
12+
def __init__(self, config):
13+
self._config = config
14+
15+
@property
16+
def config(self):
17+
return self._config
18+
19+
20+
class RuntimeContext(object):
21+
pass

src/os_3m_engine/driver.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
from .common import Configurable
2+
3+
4+
class Driver(Configurable):
5+
6+
def __init__(self, config, current_thread):
7+
super(Driver, self).__init__(config)
8+
self._current_thead = current_thread
9+
10+
@property
11+
def current_thread(self):
12+
return self._current_thead
13+
14+
@property
15+
def runtime_context(self):
16+
return self.config.runtime_context
17+
18+
def run(self):
19+
raise NotImplementedError

src/os_3m_engine/engine.py

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
import logging
2+
import signal
3+
import sys
4+
import threading
5+
import time
6+
from .common import Configurable, RuntimeContext, Queue
7+
from .frontend import FrontendThread
8+
from .transport import TransportThread
9+
from .backend import BackendThread
10+
from .othread import MultithreadManager
11+
from .utils import getatters
12+
13+
14+
class MultithreadEngine(Configurable):
15+
16+
def __init__(self, config):
17+
super(MultithreadEngine, self).__init__(config)
18+
self.__ensure()
19+
self.__start_lock = threading.Lock()
20+
self.__started = False
21+
self.__stopped = False
22+
self._runtime_context = RuntimeContext()
23+
24+
def __ensure(self):
25+
engine_config = self.config.engine
26+
assert any([hasattr(engine_config, x)
27+
for x in ('transport', 'backend')])
28+
29+
def __setup(self):
30+
runtime_context = self._runtime_context
31+
engine_config = self.config.engine
32+
33+
runtime_context.frontend_thread_queue = Queue.Queue(
34+
engine_config.frontend.queue_size)
35+
36+
runtime_context.frontend_thread = MultithreadManager(
37+
self.config, runtime_context,
38+
FrontendThread, engine_config.frontend.thread_num,
39+
engine_config.frontend.driver_cls)
40+
runtime_context.frontend_thread.setDaemon(True)
41+
42+
if hasattr(engine_config, 'transport'):
43+
44+
if hasattr(engine_config, 'backend'):
45+
runtime_context.backend_thread_queue = Queue.Queue(
46+
engine_config.backend.queue_size)
47+
48+
runtime_context.transport_thread = MultithreadManager(
49+
self.config, runtime_context,
50+
TransportThread, engine_config.transport.thread_num,
51+
engine_config.transport.driver_cls)
52+
53+
if hasattr(engine_config, 'backend'):
54+
runtime_context.backend_thread = MultithreadManager(
55+
self.config, runtime_context,
56+
BackendThread, engine_config.backend.thread_num,
57+
engine_config.backend.driver_cls)
58+
59+
def __start(self):
60+
61+
runtime_context = self._runtime_context
62+
m = getatters(runtime_context, [
63+
'backend_thread',
64+
'transport_thread',
65+
'frontend_thread'
66+
])
67+
list(map(lambda x: x.start(), m))
68+
69+
self.__started = True
70+
self.__start_lock.release()
71+
72+
while not any([x.stopped() for x in m]):
73+
time.sleep(0.1)
74+
75+
m = getatters(runtime_context, [
76+
'transport_thread',
77+
'backend_thread'
78+
])
79+
list(map(lambda x: x.join(), m))
80+
self.__stopped = True
81+
82+
def __acquire_start_lock(self, started, err_msg):
83+
self.__start_lock.acquire()
84+
if self.__started != started:
85+
try:
86+
raise RuntimeError(err_msg)
87+
finally:
88+
self.__start_lock.release()
89+
90+
def start(self):
91+
self.__acquire_start_lock(False, 'Can not start twice')
92+
self.__setup()
93+
self.__start()
94+
95+
def __stop(self):
96+
runtime_context = self._runtime_context
97+
runtime_context.frontend_thread.stop()
98+
if hasattr(runtime_context, 'transport_thread'):
99+
runtime_context.transport_thread.stop()
100+
elif hasattr(runtime_context, 'backend_thread'):
101+
runtime_context.backend_thread.stop()
102+
self.__stopped = True
103+
104+
def stop(self):
105+
self.__acquire_start_lock(True, 'Can stop before start')
106+
if not self.__stopped:
107+
self.__stop()
108+
self.__start_lock.release()

src/os_3m_engine/frontend.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
from .driver import Driver
2+
from .common import Queue
3+
from .othread import Othread
4+
5+
6+
class FrontendDriver(Driver):
7+
def __init__(self, config, current_thread):
8+
super(FrontendDriver, self).__init__(config, current_thread)
9+
self._processor = self.config.engine.frontend.processor_cls(config)
10+
11+
@property
12+
def queue(self):
13+
return self.runtime_context.frontend_thread_queue
14+
15+
def run(self):
16+
for data in self._processor.process():
17+
data = data.strip()
18+
while True:
19+
if not data:
20+
if self.current_thread.stopping():
21+
return
22+
else:
23+
break
24+
try:
25+
self.queue.put(data, block=False, timeout=1)
26+
break
27+
except Queue.Full:
28+
continue
29+
if self.current_thread.stopping():
30+
break
31+
32+
33+
class FrontendThread(Othread):
34+
35+
def run(self):
36+
super(FrontendThread, self).run()
37+
if hasattr(self.runtime_context, 'transport_thread'):
38+
self.runtime_context.transport_thread.stop()
39+
elif hasattr(self.runtime_context, 'backend_thread'):
40+
self.runtime_context.backend_thread.stop()

0 commit comments

Comments
 (0)