@@ -188,29 +188,69 @@ async def process_collector_of_state(self, collector_state):
188
188
# TODO: save new proxies count
189
189
190
190
async def process_raw_proxies (self , proxies , collector_id ):
191
+ tasks = []
192
+
191
193
for proxy in proxies :
192
- self .logger .debug ("adding raw proxy \" {}\" to queue" .format (proxy ))
193
- matches = re .match (PROXY_VALIDATE_REGEX , proxy )
194
- if matches :
195
- matches = matches .groupdict ()
196
- auth_data = matches ["auth_data" ]
197
- domain = matches ["domain" ]
198
- port = matches ["port" ]
199
-
200
- if auth_data is None :
201
- auth_data = ""
202
-
203
- if domain is None or port is None :
204
- raise Exception ("Bad raw proxy \" {}\" from collector \" {}\" " .format (proxy , collector_id ))
205
-
206
- for raw_protocol in range (len (Proxy .PROTOCOLS )):
207
- await self .queue .put ((
208
- raw_protocol ,
209
- auth_data ,
210
- domain ,
211
- port ,
212
- collector_id ,
213
- ))
194
+ tasks .append (self .process_raw_proxy (proxy , collector_id ))
195
+ if len (tasks ) > settings .CONCURRENT_TASKS_COUNT :
196
+ await asyncio .wait (tasks )
197
+ tasks .clear ()
198
+
199
+ if tasks :
200
+ await asyncio .wait (tasks )
201
+
202
+ async def process_raw_proxy (self , proxy , collector_id ):
203
+ self .logger .debug ("adding raw proxy \" {}\" to queue" .format (proxy ))
204
+
205
+ matches = re .match (PROXY_VALIDATE_REGEX , proxy )
206
+ if matches :
207
+ matches = matches .groupdict ()
208
+ auth_data = matches ["auth_data" ]
209
+ domain = matches ["domain" ]
210
+ port = matches ["port" ]
211
+
212
+ if auth_data is None :
213
+ auth_data = ""
214
+
215
+ if domain is None or port is None :
216
+ self .collectors_logger .error (
217
+ "Bad raw proxy \" {}\" from collector \" {}\" " .format (proxy , collector_id )
218
+ )
219
+ return
220
+
221
+ # don't care about protocol
222
+ try :
223
+ proxy = await db .get (
224
+ Proxy .select ().where (
225
+ Proxy .auth_data == auth_data ,
226
+ Proxy .domain == domain ,
227
+ Proxy .port == port ,
228
+ )
229
+ )
230
+
231
+ if proxy .last_check_time + settings .PROXY_NOT_CHECKING_PERIOD >= time .time ():
232
+ proxy_short_address = ""
233
+ if auth_data :
234
+ proxy_short_address += "@" + auth_data
235
+
236
+ proxy_short_address += "{}:{}" .format (domain , port )
237
+
238
+ self .logger .debug (
239
+ "skip proxy \" {}\" from collector \" {}\" " .format (
240
+ proxy_short_address , collector_id )
241
+ )
242
+ return
243
+ except Proxy .DoesNotExist :
244
+ pass
245
+
246
+ for raw_protocol in range (len (Proxy .PROTOCOLS )):
247
+ await self .queue .put ((
248
+ raw_protocol ,
249
+ auth_data ,
250
+ domain ,
251
+ port ,
252
+ collector_id ,
253
+ ))
214
254
215
255
async def process_proxy (self , raw_protocol : int , auth_data : str , domain : str , port : int , collector_id : int ):
216
256
self .logger .debug (
0 commit comments