Skip to content

Commit ac81121

Browse files
committed
add: auto tune queue size
1 parent 269d8cd commit ac81121

File tree

1 file changed

+64
-35
lines changed

1 file changed

+64
-35
lines changed

src/os_3m_engine/launcher.py

Lines changed: 64 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -5,25 +5,26 @@
55
from .core.othread import OthreadManager
66
from .utils import load_class
77

8-
FRONTEND_ENGINE_CONFIG = Config.create(
8+
ENGINE_FRONTEND_CONFIG = Config.create(
99
thread_cls='os_3m_engine.core.frontend.FrontendThread',
1010
thread_num=1,
1111
driver_cls='os_3m_engine.core.frontend.FrontendDriver',
1212
component_factory_cls='os_3m_engine.common.ComponentFactory',
1313
queue_size=100,
1414
)
15-
TRANSPORT_ENGINE_CONFIG = Config.create(
15+
16+
ENGINE_TRANSPORT_CONFIG = Config.create(
1617
thread_cls='os_3m_engine.core.transport.TransportThread',
1718
thread_num=3,
1819
driver_cls='os_3m_engine.core.transport.TransportDriver',
1920
component_factory_cls='os_3m_engine.common.ComponentFactory',
2021
)
2122

22-
TRANSPORT_BRIDGE_ENGINE_CONFIG = Config.create()
23-
TRANSPORT_BRIDGE_ENGINE_CONFIG.update(TRANSPORT_ENGINE_CONFIG)
24-
TRANSPORT_BRIDGE_ENGINE_CONFIG.driver_cls = 'os_3m_engine.core.transport.BridgeDriver'
23+
ENGINE_TRANSPORT_BRIDGE_CONFIG = Config.create()
24+
ENGINE_TRANSPORT_BRIDGE_CONFIG.update(ENGINE_TRANSPORT_CONFIG)
25+
ENGINE_TRANSPORT_BRIDGE_CONFIG.driver_cls = 'os_3m_engine.core.transport.BridgeDriver'
2526

26-
BACKEND_ENGINE_CONFIG = Config.create(
27+
ENGINE_BACKEND_CONFIG = Config.create(
2728
thread_cls='os_3m_engine.core.backend.BackendThread',
2829
thread_num=1,
2930
driver_cls='os_3m_engine.core.backend.BackendDriver',
@@ -36,7 +37,16 @@
3637
)
3738

3839

39-
def combine_from_default_config(default_config, custom_config):
40+
def _queue_size(custorm_config, default_config, thread_num):
41+
queue_size = custorm_config.queue_size if hasattr(
42+
custorm_config, 'queue_size') else default_config.queue_size
43+
if custorm_config == default_config \
44+
or not hasattr(custorm_config, 'queue_size'):
45+
queue_size = max(queue_size, thread_num*2)
46+
return queue_size
47+
48+
49+
def combine_with_default_config(default_config, custom_config):
4050
c = Config.create()
4151
c.update(default_config)
4252
if custom_config is not None:
@@ -51,9 +61,9 @@ def create(frontend_cls='os_3m_engine.ootb.StdinFrontend',
5161
backend_cls='os_3m_engine.ootb.LogBackend',
5262
app_config=None,
5363
engine_config=ENGINE_CONFIG,
54-
frontend_engine_config=FRONTEND_ENGINE_CONFIG,
55-
transport_engine_config=TRANSPORT_BRIDGE_ENGINE_CONFIG,
56-
backend_engine_config=BACKEND_ENGINE_CONFIG,
64+
engine_frontend_config=ENGINE_FRONTEND_CONFIG,
65+
engine_transport_config=ENGINE_TRANSPORT_BRIDGE_CONFIG,
66+
engine_backend_config=ENGINE_BACKEND_CONFIG,
5767
runtime_context=None):
5868

5969
if frontend_cls is None:
@@ -65,50 +75,69 @@ def create(frontend_cls='os_3m_engine.ootb.StdinFrontend',
6575
runtime_context = runtime_context if runtime_context is not None else RuntimeContext()
6676

6777
# init frontend
68-
frontend_engine_config = combine_from_default_config(
69-
FRONTEND_ENGINE_CONFIG, frontend_engine_config)
70-
71-
runtime_context.frontend_thread_queue = Queue.Queue(
72-
frontend_engine_config.queue_size)
78+
e_frontend_config = combine_with_default_config(
79+
ENGINE_FRONTEND_CONFIG, engine_frontend_config)
7380

7481
frontend_factory_cls = load_class(
75-
frontend_engine_config.component_factory_cls, ConfigurableFactory)
82+
e_frontend_config.component_factory_cls, ConfigurableFactory)
7683
frontend_factory = frontend_factory_cls(app_config, frontend_cls)
7784
runtime_context.frontend_thread = OthreadManager(
78-
frontend_engine_config, runtime_context, frontend_factory)
85+
e_frontend_config, runtime_context, frontend_factory)
7986
runtime_context.frontend_thread.setDaemon(True)
8087

8188
# init transport
82-
default_transport_engine_config = TRANSPORT_BRIDGE_ENGINE_CONFIG \
83-
if backend_cls is not None else TRANSPORT_ENGINE_CONFIG
89+
default_engine_transport_config = ENGINE_TRANSPORT_BRIDGE_CONFIG \
90+
if backend_cls is not None else ENGINE_TRANSPORT_CONFIG
8491

85-
if transport_engine_config in (TRANSPORT_ENGINE_CONFIG, TRANSPORT_BRIDGE_ENGINE_CONFIG):
86-
transport_engine_config = None
92+
e_transport_config = engine_backend_config
93+
if engine_transport_config in (ENGINE_TRANSPORT_CONFIG, ENGINE_TRANSPORT_BRIDGE_CONFIG):
94+
e_transport_config = None
8795

88-
transport_engine_config = combine_from_default_config(
89-
default_transport_engine_config, transport_engine_config)
96+
e_transport_config = combine_with_default_config(
97+
default_engine_transport_config, e_transport_config)
9098

91-
backend_engine_config = combine_from_default_config(
92-
BACKEND_ENGINE_CONFIG, backend_engine_config)
99+
e_backend_config = combine_with_default_config(
100+
ENGINE_BACKEND_CONFIG, engine_backend_config)
93101

94102
if transport_cls:
103+
queue_size = _queue_size(
104+
engine_frontend_config,
105+
ENGINE_FRONTEND_CONFIG,
106+
e_transport_config.thread_num)
107+
e_frontend_config.queue_size = queue_size
108+
runtime_context.frontend_thread_queue = Queue.Queue(queue_size)
109+
95110
if backend_cls:
96-
runtime_context.backend_thread_queue = Queue.Queue(
97-
backend_engine_config.queue_size)
111+
queue_size = _queue_size(
112+
engine_backend_config,
113+
ENGINE_BACKEND_CONFIG,
114+
e_backend_config.thread_num)
115+
queue_size = max(queue_size, e_frontend_config.queue_size)
116+
e_backend_config.queue_size = queue_size
117+
runtime_context.backend_thread_queue = Queue.Queue(queue_size)
98118

99119
transport_factory_cls = load_class(
100-
transport_engine_config.component_factory_cls, ConfigurableFactory)
120+
e_transport_config.component_factory_cls, ConfigurableFactory)
101121
transport_factory = transport_factory_cls(app_config, transport_cls)
102122
runtime_context.transport_thread = OthreadManager(
103-
transport_engine_config, runtime_context, transport_factory)
104-
105-
# init backend
123+
e_transport_config, runtime_context, transport_factory)
124+
else:
125+
queue_size = _queue_size(
126+
engine_frontend_config,
127+
ENGINE_FRONTEND_CONFIG,
128+
e_backend_config.thread_num)
129+
130+
e_frontend_config.queue_size = queue_size
131+
e_backend_config.queue_size = queue_size
132+
runtime_context.frontend_thread_queue = Queue.Queue(queue_size)
133+
134+
# init backend
106135
if backend_cls:
107136
backend_factory_cls = load_class(
108-
backend_engine_config.component_factory_cls, ConfigurableFactory)
137+
e_backend_config.component_factory_cls, ConfigurableFactory)
109138
backend_factory = backend_factory_cls(app_config, backend_cls)
110139
runtime_context.backend_thread = OthreadManager(
111-
backend_engine_config, runtime_context, backend_factory)
140+
e_backend_config, runtime_context, backend_factory)
112141

113-
engine_config = combine_from_default_config(ENGINE_CONFIG, engine_config)
114-
return Engine(engine_config, runtime_context)
142+
e_config = combine_with_default_config(ENGINE_CONFIG, engine_config)
143+
return Engine(e_config, runtime_context)

0 commit comments

Comments
 (0)