3
3
import time
4
4
import random
5
5
6
+ from greyjack .agents .base .LoggingLevel import LoggingLevel
7
+ from greyjack .agents .base .ParallelizationBackend import ParallelizationBackend
6
8
from greyjack .agents .base .GJSolution import GJSolution
7
9
from greyjack .agents .base .individuals .Individual import Individual
8
10
from pathos .multiprocessing import ProcessPool
20
22
class Solver ():
21
23
22
24
def __init__ (self , domain_builder , cotwin_builder , agent ,
23
- n_jobs = None , parallelization_backend = "processing" ,
24
- score_precision = None , logging_level = None ,
25
+ parallelization_backend = ParallelizationBackend .Multiprocessing ,
26
+ logging_level = LoggingLevel .Info ,
27
+ n_jobs = None , score_precision = None ,
25
28
available_ports = None , default_port = "25000" ,
26
29
initial_solution = None ):
27
30
28
31
"""
29
- "available_ports" and "default_port" are ignoring on Linux because of using Pipe (channels) mechanism.
30
- For other platforms ZMQ sockets are used to keep possibility of using Solver on other platforms.
31
- Excluding redundant ports binding is neccessary for production environments (usually docker images are using Linux distributions),
32
- but not important for prototyping (on Windows, for example).
33
-
34
- parallelization_backend = "threading" for debugging
35
- parallelization_backend = "processing" for production
32
+ On Linux platform solver needs 2 ports to bind.
33
+ On other platforms n_agents + 2.
34
+ All ports are binding to localhost.
35
+
36
+ TODO:
37
+ Create and run 2 Docker containers that will bind the same internal (localhost) ports.
38
+ Will they work simultaneously, or will one (or both) fall down?
39
+ In theory, containers have their own network spaces.
40
+ If Docker can bind the same localhost ports in different containers
41
+ for internal interprocess communication, then I don't need to care
42
+ about the number of internally bound ports (only about those exposed at the level of the API facade).
36
43
"""
37
44
38
45
self .domain_builder = domain_builder
@@ -58,6 +65,7 @@ def __init__(self, domain_builder, cotwin_builder, agent,
58
65
self .is_linux = True if "linux" in current_platform else False
59
66
60
67
self ._build_logger ()
68
+ self ._init_master_pub_sub ()
61
69
if self .is_linux :
62
70
self ._init_master_solver_pipe ()
63
71
else :
@@ -67,9 +75,9 @@ def __init__(self, domain_builder, cotwin_builder, agent,
67
75
def _build_logger (self ):
68
76
69
77
if self .logging_level is None :
70
- self .logging_level = "info"
71
- if self .logging_level not in ["info" , "trace" , "warn" , "fresh_only" ]:
72
- raise Exception ("logging_level must be in [ \" info \" , \" trace \" , \" warn \" , \" fresh_only \" ] " )
78
+ self .logging_level = LoggingLevel . Info
79
+ if self .logging_level not in [LoggingLevel . FreshOnly , LoggingLevel . Info , LoggingLevel . Warn ]:
80
+ raise Exception ("logging_level must be value of LoggingLevel enum from greyjack.agents.base module " )
73
81
74
82
self .logger = logging .getLogger ("logger" )
75
83
self .logger .setLevel (logging .INFO )
@@ -80,14 +88,17 @@ def _build_logger(self):
80
88
81
89
pass
82
90
83
- def _init_agents_available_addresses_and_ports (self ):
84
- minimal_ports_count_required = self .n_jobs + 2
91
+ def _init_master_pub_sub (self ):
92
+
93
+ minimal_ports_count_required = 2
85
94
if self .available_ports is not None :
86
95
available_ports_count = len (self .available_ports )
87
96
if available_ports_count < minimal_ports_count_required :
88
- exception_string = "For {} agents required at least {} available ports. Set available_ports list manually or set it None for auto allocation" .format (self .n_jobs , minimal_ports_count_required )
97
+ exception_string = "Required at least {} available ports for master node to share global state between agents . Set available_ports list manually or set it None for auto allocation" .format (self .n_jobs , minimal_ports_count_required )
89
98
raise Exception (exception_string )
90
99
else :
100
+ if not self .is_linux :
101
+ minimal_ports_count_required += self .n_jobs
91
102
self .available_ports = [str (int (self .default_port ) + i ) for i in range (minimal_ports_count_required )]
92
103
93
104
self .address = "localhost"
@@ -103,6 +114,17 @@ def _init_agents_available_addresses_and_ports(self):
103
114
self .master_to_agents_publisher_socket = self .context .socket (zmq .PUB )
104
115
self .master_to_agents_publisher_socket .bind ( self .master_publisher_address )
105
116
117
+ def _init_agents_available_addresses_and_ports (self ):
118
+
119
+ minimal_ports_count_required = self .n_jobs + 2 # including master ports
120
+ if self .available_ports is not None :
121
+ available_ports_count = len (self .available_ports )
122
+ if available_ports_count < minimal_ports_count_required :
123
+ exception_string = "For {} agents required at least {} available ports. Set available_ports list manually or set it None for auto allocation" .format (self .n_jobs , minimal_ports_count_required )
124
+ raise Exception (exception_string )
125
+ else :
126
+ self .available_ports = [str (int (self .default_port ) + i ) for i in range (minimal_ports_count_required )]
127
+
106
128
current_port_id = 2
107
129
available_agent_to_agent_ports = [self .available_ports [current_port_id + i ] for i in range (self .n_jobs )]
108
130
self .available_agent_to_agent_ports = available_agent_to_agent_ports
@@ -118,9 +140,6 @@ def _init_master_solver_pipe(self):
118
140
self .master_to_agent_updates_sender = master_to_agent_updates_sender
119
141
self .agent_from_master_updates_receiver = agent_from_master_updates_receiver
120
142
121
- #self.master_publisher_queue = SimpleQueue()
122
- #self.master_subscriber_queue = SimpleQueue()
123
-
124
143
125
144
def solve (self ):
126
145
@@ -144,7 +163,7 @@ def solve(self):
144
163
total_time = time .perf_counter () - start_time
145
164
steps_count += 1
146
165
new_best_string = "New best score!" if new_best_flag else ""
147
- if self .logging_level == "fresh_only" and new_best_flag :
166
+ if self .logging_level == LoggingLevel . FreshOnly and new_best_flag :
148
167
self .logger .info (f"Solutions received: { steps_count } Best score: { self .global_top_individual .score } , Solving time: { total_time :.6f} { new_best_string } " )
149
168
150
169
if len (self .observers ) >= 1 :
@@ -171,12 +190,12 @@ def _run_jobs(self, agents):
171
190
def run_agent_solving (agent ):
172
191
agent .solve ()
173
192
174
- if self .parallelization_backend == "threading" :
193
+ if self .parallelization_backend == ParallelizationBackend . Threading :
175
194
agents_process_pool = ThreadPool (id = "agents_pool" )
176
- elif self .parallelization_backend == "processing" :
195
+ elif self .parallelization_backend == ParallelizationBackend . Multiprocessing :
177
196
agents_process_pool = ProcessPool (id = "agents_pool" )
178
197
else :
179
- raise Exception ("parallelization_backend can be only \" threading \" (for debugging) or \" processing \" (for production) " )
198
+ raise Exception ("parallelization_backend must be value of enum ParallelizationBackend from greyjack.agents.base module " )
180
199
agents_process_pool .ncpus = self .n_jobs
181
200
agents_process_pool .imap (run_agent_solving , agents )
182
201
@@ -199,6 +218,10 @@ def _setup_agents(self):
199
218
for j in range (self .n_jobs ):
200
219
agents [i ].round_robin_status_dict [agents [j ].agent_id ] = deepcopy (agents [i ].agent_status )
201
220
221
+ for i in range (self .n_jobs ):
222
+ agents [i ].master_subscriber_address = deepcopy (self .master_subscriber_address )
223
+ agents [i ].master_publisher_address = deepcopy (self .master_publisher_address )
224
+
202
225
if self .is_linux :
203
226
agents_updates_senders = []
204
227
agents_updates_receivers = []
@@ -210,28 +233,19 @@ def _setup_agents(self):
210
233
for i in range (self .n_jobs ):
211
234
agents [i ].agent_to_agent_pipe_sender = agents_updates_senders [i ]
212
235
agents [i ].agent_to_agent_pipe_receiver = agents_updates_receivers [i ]
213
- agents [i ].agent_to_master_updates_sender = self .agent_to_master_updates_sender
214
- agents [i ].agent_from_master_updates_receiver = self .agent_from_master_updates_receiver
215
- #agents[i].master_publisher_queue = self.master_publisher_queue
216
- #agents[i].master_subscriber_queue = self.master_subscriber_queue
217
-
218
236
else :
219
237
for i in range (self .n_jobs ):
220
238
agents [i ].agent_address_for_other_agents = deepcopy ("tcp://{}:{}" .format (self .available_agent_to_agent_addresses [i ], self .available_agent_to_agent_ports [i ]))
221
- agents [i ].master_subscriber_address = deepcopy (self .master_subscriber_address )
222
- agents [i ].master_publisher_address = deepcopy (self .master_publisher_address )
223
239
for i in range (self .n_jobs ):
224
240
next_agent_id = (i + 1 ) % self .n_jobs
225
241
agents [i ].next_agent_address = deepcopy (agents [next_agent_id ].agent_address_for_other_agents )
226
242
227
243
return agents
228
244
229
245
def receive_agent_publication (self ):
230
- if self .is_linux :
231
- agent_publication = self .master_from_agent_updates_receiver .recv ()
232
- else :
233
- agent_publication = self .master_to_agents_subscriber_socket .recv ()
234
- agent_publication = orjson .loads (agent_publication )
246
+
247
+ agent_publication = self .master_to_agents_subscriber_socket .recv ()
248
+ agent_publication = orjson .loads (agent_publication )
235
249
agent_id = agent_publication ["agent_id" ]
236
250
agent_status = agent_publication ["status" ]
237
251
local_step = agent_publication ["step" ]
@@ -252,11 +266,8 @@ def send_global_update(self):
252
266
else :
253
267
master_publication = [None , self .is_variables_info_received ]
254
268
255
- if not self .is_linux :
256
- master_publication = orjson .dumps (master_publication )
257
- self .master_to_agents_publisher_socket .send ( master_publication )
258
- else :
259
- self .master_to_agent_updates_sender .send (master_publication )
269
+ master_publication = orjson .dumps (master_publication )
270
+ self .master_to_agents_publisher_socket .send ( master_publication )
260
271
261
272
def update_global_top_solution (self ):
262
273
individual_list = self .global_top_individual .as_list ()
0 commit comments