5
5
6
6
from greyjack .agents .base .GJSolution import GJSolution
7
7
from greyjack .agents .base .individuals .Individual import Individual
8
- # TODO: create comparing with global top mechanism like in Rust version (by Queue/SimpleQueue for Linux, ZMQ PUB/SUB for other platforms)
9
- #from greyjack.agents.LateAcceptance import LateAcceptance
10
8
from pathos .multiprocessing import ProcessPool
11
9
from pathos .threading import ThreadPool
12
10
import multiprocessing
13
- from multiprocessing import Pipe
11
+ from multiprocessing import Pipe , SimpleQueue
14
12
from copy import deepcopy
15
13
import logging
16
14
import zmq
@@ -28,11 +26,10 @@ def __init__(self, domain_builder, cotwin_builder, agent,
28
26
initial_solution = None ):
29
27
30
28
"""
31
- "available_ports" and "default_port" are ignoring on Linux.
32
- On Windows interprocessing communication is organized by ZMQ, because of "spawn" process creation (needs to bind n_jobs + 1 ports).
33
- On Linux GreyJack uses Pipes and "fork" mechanism. That's why, you don't need to bind additional ports
34
- inside Docker container or some Linux VM (useful (and moreover needful) in production environment).
35
- Windows version of GreyJack is useful for prototyping and keeps universality of solver (keeps possibility for using on Windows).
29
+ "available_ports" and "default_port" are ignoring on Linux because of using Pipe (channels) mechanism.
30
+ For other platforms ZMQ sockets are used to keep possibility of using Solver on other platforms.
31
+ Excluding redundant ports binding is neccessary for production environments (usually docker images are using Linux distributions),
32
+ but not important for prototyping (on Windows, for example).
36
33
37
34
parallelization_backend = "threading" for debugging
38
35
parallelization_backend = "processing" for production
@@ -49,24 +46,30 @@ def __init__(self, domain_builder, cotwin_builder, agent,
49
46
self .default_port = default_port
50
47
self .initial_solution = initial_solution
51
48
49
+ self .global_top_individual = None
50
+ self .global_top_solution = None
51
+ self .variable_names = None
52
+ self .discrete_ids = None
53
+ self .is_variables_info_received = False
54
+ self .is_agent_wins_from_comparing_with_global = agent .is_win_from_comparing_with_global
52
55
self .agent_statuses = {}
53
56
self .observers = []
54
57
55
- self .is_windows = True if "win " in current_platform else False
58
+ self .is_linux = True if "linux " in current_platform else False
56
59
57
60
self ._build_logger ()
58
- if self .is_windows :
59
- self ._init_agents_available_addresses_and_ports ()
60
- else :
61
+ if self .is_linux :
61
62
self ._init_master_solver_pipe ()
63
+ else :
64
+ self ._init_agents_available_addresses_and_ports ()
62
65
63
66
64
67
def _build_logger (self ):
65
68
66
69
if self .logging_level is None :
67
70
self .logging_level = "info"
68
- if self .logging_level not in ["info" , "trace" , "warn" ]:
69
- raise Exception ("logging_level must be in [\" info\" , \" trace\" , \" warn\" ]" )
71
+ if self .logging_level not in ["info" , "trace" , "warn" , "fresh_only" ]:
72
+ raise Exception ("logging_level must be in [\" info\" , \" trace\" , \" warn\" , \" fresh_only \" ]" )
70
73
71
74
self .logger = logging .getLogger ("logger" )
72
75
self .logger .setLevel (logging .INFO )
@@ -78,7 +81,7 @@ def _build_logger(self):
78
81
pass
79
82
80
83
def _init_agents_available_addresses_and_ports (self ):
81
- minimal_ports_count_required = self .n_jobs + 1
84
+ minimal_ports_count_required = self .n_jobs + 2
82
85
if self .available_ports is not None :
83
86
available_ports_count = len (self .available_ports )
84
87
if available_ports_count < minimal_ports_count_required :
@@ -88,26 +91,35 @@ def _init_agents_available_addresses_and_ports(self):
88
91
self .available_ports = [str (int (self .default_port ) + i ) for i in range (minimal_ports_count_required )]
89
92
90
93
self .address = "localhost"
91
- self .port = self .available_ports [0 ]
92
- self .full_address = "tcp://{}:{}" .format (self .address , self .port )
94
+ self .master_subscriber_address = "tcp://{}:{}" .format (self .address , self .available_ports [0 ])
93
95
self .context = zmq .Context ()
94
- self .socket = self .context .socket (zmq .SUB )
95
- self .socket .setsockopt_string (zmq .SUBSCRIBE , "" )
96
+ self .master_to_agents_subscriber_socket = self .context .socket (zmq .SUB )
97
+ self .master_to_agents_subscriber_socket .setsockopt_string (zmq .SUBSCRIBE , "" )
96
98
# process only the most actual messages from agents (drop old messages)
97
- self .socket .setsockopt (zmq .CONFLATE , 1 )
98
- self .socket .bind ( self .full_address )
99
+ self .master_to_agents_subscriber_socket .setsockopt (zmq .CONFLATE , 1 )
100
+ self .master_to_agents_subscriber_socket .bind ( self .master_subscriber_address )
101
+
102
+ self .master_publisher_address = "tcp://{}:{}" .format (self .address , self .available_ports [1 ])
103
+ self .master_to_agents_publisher_socket = self .context .socket (zmq .PUB )
104
+ self .master_to_agents_publisher_socket .bind ( self .master_publisher_address )
99
105
100
- current_port_id = 1
101
- available_agent_ports = [self .available_ports [current_port_id + i ] for i in range (self .n_jobs )]
102
- self .available_agent_ports = available_agent_ports
103
- self .available_agent_addresses = ["localhost" for i in range (self .n_jobs )]
106
+ current_port_id = 2
107
+ available_agent_to_agent_ports = [self .available_ports [current_port_id + i ] for i in range (self .n_jobs )]
108
+ self .available_agent_to_agent_ports = available_agent_to_agent_ports
109
+ self .available_agent_to_agent_addresses = ["localhost" for i in range (self .n_jobs )]
104
110
105
111
pass
106
112
107
113
def _init_master_solver_pipe (self ):
108
- solver_updates_sender , solver_updates_receiver = Pipe ()
109
- self .solver_updates_receiver = solver_updates_receiver
110
- self .solver_updates_sender = solver_updates_sender
114
+ agent_to_master_updates_sender , master_from_agent_updates_receiver = Pipe ()
115
+ self .agent_to_master_updates_sender = agent_to_master_updates_sender
116
+ self .master_from_agent_updates_receiver = master_from_agent_updates_receiver
117
+ master_to_agent_updates_sender , agent_from_master_updates_receiver = Pipe ()
118
+ self .master_to_agent_updates_sender = master_to_agent_updates_sender
119
+ self .agent_from_master_updates_receiver = agent_from_master_updates_receiver
120
+
121
+ #self.master_publisher_queue = SimpleQueue()
122
+ #self.master_subscriber_queue = SimpleQueue()
111
123
112
124
113
125
def solve (self ):
@@ -117,37 +129,26 @@ def solve(self):
117
129
118
130
start_time = time .perf_counter ()
119
131
steps_count = 0
120
- global_top_individual = None
121
132
while True :
122
133
123
- if self .is_windows :
124
- agent_publication = self .socket .recv ()
125
- agent_publication = orjson .loads (agent_publication )
126
- else :
127
- agent_publication = self .solver_updates_receiver .recv ()
128
- agent_id = agent_publication ["agent_id" ]
129
- agent_status = agent_publication ["status" ]
130
- local_step = agent_publication ["step" ]
131
- score_variant = agent_publication ["score_variant" ]
132
- received_individual = agent_publication ["candidate" ]
133
- received_individual = Individual .get_related_individual_type_by_value (score_variant ).from_list (received_individual )
134
-
134
+ received_individual , agent_id , agent_status , local_step = self .receive_agent_publication ()
135
135
new_best_flag = False
136
- if global_top_individual is None :
137
- global_top_individual = received_individual
138
- elif received_individual < global_top_individual :
139
- global_top_individual = received_individual
136
+ if self .global_top_individual is None :
137
+ self .global_top_individual = received_individual
138
+ elif received_individual < self .global_top_individual :
139
+ self .global_top_individual = received_individual
140
+ self .update_global_top_solution ()
140
141
new_best_flag = True
142
+ self .send_global_update ()
143
+
141
144
total_time = time .perf_counter () - start_time
142
145
steps_count += 1
143
146
new_best_string = "New best score!" if new_best_flag else ""
144
- if self .logging_level == "trace" :
145
- self .logger .info (f"Solutions received: { steps_count } Best score: { global_top_individual .score } , Solving time: { total_time :.6f} , { new_best_string } , Current (agent: { agent_id } , status: { agent_status } , local_step: { local_step } ): { received_individual .score } " )
146
- elif self .logging_level == "info" :
147
- self .logger .info (f"Solutions received: { steps_count } Best score: { global_top_individual .score } , Solving time: { total_time :.6f} { new_best_string } " )
147
+ if self .logging_level == "fresh_only" and new_best_flag :
148
+ self .logger .info (f"Solutions received: { steps_count } Best score: { self .global_top_individual .score } , Solving time: { total_time :.6f} { new_best_string } " )
148
149
149
150
if len (self .observers ) >= 1 :
150
- self ._notify_observers (global_top_individual )
151
+ self ._notify_observers (self . global_top_individual )
151
152
152
153
self .agent_statuses [agent_id ] = agent_status
153
154
someone_alive = False
@@ -160,12 +161,11 @@ def solve(self):
160
161
if someone_alive is False :
161
162
agents_process_pool .terminate ()
162
163
agents_process_pool .close ()
163
- agents_process_pool .join ()
164
164
del agents_process_pool
165
165
gc .collect ()
166
166
break
167
167
168
- return global_top_individual
168
+ return self . global_top_solution
169
169
170
170
def _run_jobs (self , agents ):
171
171
def run_agent_solving (agent ):
@@ -199,43 +199,76 @@ def _setup_agents(self):
199
199
for j in range (self .n_jobs ):
200
200
agents [i ].round_robin_status_dict [agents [j ].agent_id ] = deepcopy (agents [i ].agent_status )
201
201
202
- if self .is_windows :
203
- for i in range (self .n_jobs ):
204
- agents [i ].solver_master_address = deepcopy (self .full_address )
205
- agents [i ].current_agent_address = deepcopy ("tcp://{}:{}" .format (self .available_agent_addresses [i ], self .available_agent_ports [i ]))
206
-
207
- for i in range (self .n_jobs ):
208
- next_agent_id = (i + 1 ) % self .n_jobs
209
- agents [i ].next_agent_address = deepcopy (agents [next_agent_id ].current_agent_address )
210
- else :
202
+ if self .is_linux :
211
203
agents_updates_senders = []
212
204
agents_updates_receivers = []
213
205
for i in range (self .n_jobs ):
214
- updates_sender , updates_receiver = Pipe ()
215
- agents_updates_senders .append (updates_sender )
216
- agents_updates_receivers .append (updates_receiver )
206
+ agent_to_agent_pipe_sender , agent_to_agent_pipe_receiver = Pipe ()
207
+ agents_updates_senders .append (agent_to_agent_pipe_sender )
208
+ agents_updates_receivers .append (agent_to_agent_pipe_receiver )
217
209
agents_updates_receivers .append (agents_updates_receivers .pop (0 ))
218
-
219
210
for i in range (self .n_jobs ):
220
- agents [i ].updates_sender = agents_updates_senders [i ]
221
- agents [i ].updates_receiver = agents_updates_receivers [i ]
222
- agents [i ].solver_master_sender = deepcopy (self .solver_updates_sender )
211
+ agents [i ].agent_to_agent_pipe_sender = agents_updates_senders [i ]
212
+ agents [i ].agent_to_agent_pipe_receiver = agents_updates_receivers [i ]
213
+ agents [i ].agent_to_master_updates_sender = self .agent_to_master_updates_sender
214
+ agents [i ].agent_from_master_updates_receiver = self .agent_from_master_updates_receiver
215
+ #agents[i].master_publisher_queue = self.master_publisher_queue
216
+ #agents[i].master_subscriber_queue = self.master_subscriber_queue
217
+
218
+ else :
219
+ for i in range (self .n_jobs ):
220
+ agents [i ].agent_address_for_other_agents = deepcopy ("tcp://{}:{}" .format (self .available_agent_to_agent_addresses [i ], self .available_agent_to_agent_ports [i ]))
221
+ agents [i ].master_subscriber_address = deepcopy (self .master_subscriber_address )
222
+ agents [i ].master_publisher_address = deepcopy (self .master_publisher_address )
223
+ for i in range (self .n_jobs ):
224
+ next_agent_id = (i + 1 ) % self .n_jobs
225
+ agents [i ].next_agent_address = deepcopy (agents [next_agent_id ].agent_address_for_other_agents )
223
226
224
227
return agents
225
228
226
- """def _build_gjsolution_from_individual(self, individual):
227
- variables_dict = self.score_requesters[0].variables_manager.inverse_transform_variables(individual.transformed_values)
228
- solution_score = individual.score
229
- gjsolution = GJSolution(variables_dict, solution_score)
230
- return gjsolution
229
+ def receive_agent_publication (self ):
230
+ if self .is_linux :
231
+ agent_publication = self .master_from_agent_updates_receiver .recv ()
232
+ else :
233
+ agent_publication = self .master_to_agents_subscriber_socket .recv ()
234
+ agent_publication = orjson .loads (agent_publication )
235
+ agent_id = agent_publication ["agent_id" ]
236
+ agent_status = agent_publication ["status" ]
237
+ local_step = agent_publication ["step" ]
238
+ score_variant = agent_publication ["score_variant" ]
239
+ received_individual = agent_publication ["candidate" ]
240
+ received_individual = Individual .get_related_individual_type_by_value (score_variant ).from_list (received_individual )
241
+ if not self .is_variables_info_received :
242
+ self .variable_names = agent_publication ["variable_names" ]
243
+ self .discrete_ids = agent_publication ["discrete_ids" ]
244
+ self .is_variables_info_received = True
245
+
246
+ return received_individual , agent_id , agent_status , local_step
247
+
248
+ def send_global_update (self ):
249
+
250
+ if self .is_agent_wins_from_comparing_with_global :
251
+ master_publication = [self .global_top_individual .as_list (), self .is_variables_info_received ]
252
+ else :
253
+ master_publication = [None , self .is_variables_info_received ]
254
+
255
+ if not self .is_linux :
256
+ master_publication = orjson .dumps (master_publication )
257
+ self .master_to_agents_publisher_socket .send ( master_publication )
258
+ else :
259
+ self .master_to_agent_updates_sender .send (master_publication )
260
+
261
+ def update_global_top_solution (self ):
262
+ individual_list = self .global_top_individual .as_list ()
263
+ self .global_top_solution = GJSolution (self .variable_names , self .discrete_ids , individual_list [0 ], individual_list [1 ], self .score_precision )
264
+ pass
231
265
232
266
233
267
def register_observer (self , observer ):
234
268
self .observers .append (observer )
235
269
pass
236
270
237
- def _notify_observers(self, solution_update):
238
- gjsolution = self._build_gjsolution_from_individual(solution_update)
271
+ def _notify_observers (self ):
239
272
for observer in self .observers :
240
- observer.update_solution(gjsolution )
241
- pass"""
273
+ observer .update_solution (self . global_top_solution )
274
+ pass
0 commit comments