
Commit df0f9d9

fixed leak, some refactoring, more debug logs
1 parent 93d6951 commit df0f9d9

4 files changed, +34 -19 lines changed

main.py

Lines changed: 15 additions & 1 deletion
@@ -11,6 +11,21 @@
 
 
 if __name__ == "__main__":
+    asyncio_logger = logging.getLogger("asyncio")
+    asyncio_logger_file_handler = logging.FileHandler("logs/asyncio.log")
+    asyncio_logger_file_handler.setLevel(logging.DEBUG)
+    asyncio_logger_file_handler.setFormatter(
+        logging.Formatter(
+            "%(levelname)s ~ %(asctime)s ~ %(funcName)30s() - %(message)s"
+        )
+    )
+    asyncio_logger.addHandler(asyncio_logger_file_handler)
+
+    if settings.DEBUG:
+        asyncio.get_event_loop().set_debug(True)
+
+        asyncio_logger.setLevel(logging.DEBUG)
+
     main_logger = logging.getLogger("proxy_py/main")
 
     if settings.DEBUG:
@@ -29,7 +44,6 @@
     main_logger.addHandler(logger_file_handler)
 
     loop = asyncio.get_event_loop()
-    # TODO: consider loop.set_debug
 
     proxy_processor = Processor.get_instance()
 
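
Note: the added block wires Python's built-in "asyncio" logger to a file and, when settings.DEBUG is set, turns on the event loop's debug mode, which is what reports slow callbacks and never-retrieved exceptions. A minimal standalone sketch of the same idea follows; it assumes a logs/ directory exists and is not the project's actual entry point.

    import asyncio
    import logging
    import time

    asyncio_logger = logging.getLogger("asyncio")
    handler = logging.FileHandler("logs/asyncio.log")  # assumes logs/ exists
    handler.setLevel(logging.DEBUG)
    handler.setFormatter(logging.Formatter(
        "%(levelname)s ~ %(asctime)s ~ %(funcName)30s() - %(message)s"
    ))
    asyncio_logger.addHandler(handler)
    asyncio_logger.setLevel(logging.DEBUG)

    loop = asyncio.get_event_loop()
    loop.set_debug(True)  # report callbacks that block the loop


    async def blocking_demo():
        # In debug mode, blocking the loop like this is logged by asyncio
        # ("Executing <Task ...> took 0.2... seconds").
        time.sleep(0.2)


    loop.run_until_complete(blocking_demo())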

processor.py

Lines changed: 15 additions & 16 deletions
@@ -10,6 +10,7 @@
 import logging
 import peewee
 
+
 # TODO: add ipv6 addresses, make domain checking better
 _0_TO_255_REGEX = r"([0-9]|[1-8][0-9]|9[0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])"
 DOMAIN_LETTER_REGEX = r"[a-zA-Z0-9_\-]"
@@ -77,14 +78,14 @@ async def worker(self):
 
     async def process_proxies(self):
         while True:
-            await asyncio.sleep(0.00001)
+            await asyncio.sleep(0.01)
             try:
                 # check good proxies
                 proxies = await db.execute(
                     Proxy.select().where(
                         Proxy.number_of_bad_checks == 0,
                         Proxy.last_check_time < time.time() - Proxy.checking_period,
-                    ).order_by(Proxy.last_check_time).limit(settings.NUMBER_OF_CONCURRENT_TASKS)
+                    ).order_by(Proxy.last_check_time).limit(settings.NUMBER_OF_CONCURRENT_TASKS)
                 )
                 if proxies:
                     self.good_proxies_are_processed = False
@@ -131,9 +132,8 @@ async def process_proxies(self):
 
     async def process_collectors(self):
         while True:
+            await asyncio.sleep(0.1)
             try:
-                await asyncio.sleep(0.000001)
-
                 # check collectors
                 collector_states = await db.execute(
                     CollectorState.select().where(
@@ -155,16 +155,14 @@ async def process_collectors(self):
                 await asyncio.sleep(settings.SLEEP_AFTER_ERROR_PERIOD)
 
     async def add_proxy_to_queue(self, proxy: Proxy, collector_id=None):
-        while self.proxies_semaphore.locked():
-            await asyncio.sleep(0.001)
-
-        asyncio.ensure_future(self.process_proxy(
-            proxy.get_raw_protocol(),
-            proxy.auth_data,
-            proxy.domain,
-            proxy.port,
-            collector_id,
-        ))
+        async with self.proxies_semaphore:
+            asyncio.ensure_future(self.process_proxy(
+                proxy.get_raw_protocol(),
+                proxy.auth_data,
+                proxy.domain,
+                proxy.port,
+                collector_id,
+            ))
 
     async def add_proxies_to_queue(self, proxies: list):
         for proxy in proxies:
@@ -258,15 +256,16 @@ async def process_raw_proxy(self, proxy, collector_id):
 
         for raw_protocol in range(len(Proxy.PROTOCOLS)):
             while not self.good_proxies_are_processed:
-                await asyncio.sleep(0.01)
+                # TODO: find a better way
+                await asyncio.sleep(0.1)
 
             new_proxy = Proxy()
             new_proxy.raw_protocol = raw_protocol
             new_proxy.auth_data = auth_data
             new_proxy.domain = domain
             new_proxy.port = port
 
-            self.add_proxy_to_queue(new_proxy, collector_id)
+            await self.add_proxy_to_queue(new_proxy, collector_id)
 
     async def process_proxy(self, raw_protocol: int, auth_data: str, domain: str, port: int, collector_id):
         async with self.proxies_semaphore:
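
Note on the leak fix: the old add_proxy_to_queue only polled proxies_semaphore.locked() and then called asyncio.ensure_future() unconditionally, so it never consumed a semaphore slot and pending tasks could accumulate faster than process_proxy released them. The new version acquires the semaphore with async with before scheduling. A self-contained sketch of that bounded-scheduling pattern; handle() and producer() are illustrative names, not proxy_py code.

    import asyncio

    CONCURRENT_TASKS = 64
    semaphore = asyncio.Semaphore(CONCURRENT_TASKS)


    async def handle(item):
        async with semaphore:  # each running task holds one slot
            await asyncio.sleep(0.1)  # stand-in for checking a proxy


    async def producer(items):
        tasks = []
        for item in items:
            # Acquiring the semaphore here throttles scheduling to the pace at
            # which running tasks free their slots; polling .locked() does not.
            async with semaphore:
                tasks.append(asyncio.ensure_future(handle(item)))
        await asyncio.gather(*tasks)  # only so the sketch exits cleanly


    asyncio.run(producer(range(200)))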

proxy_py/_settings.py

Lines changed: 1 addition & 1 deletion
@@ -32,7 +32,7 @@
     # 'local/collectors', # use to add your own collectors
 ]
 
-NUMBER_OF_CONCURRENT_TASKS = 128
+NUMBER_OF_CONCURRENT_TASKS = 64
 # makes aiohttp to not send more
 # than this number of simultaneous requests
 # works by common connector
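
Lowering NUMBER_OF_CONCURRENT_TASKS from 128 to 64 halves the number of simultaneous proxy checks. The setting's comment says it caps aiohttp through a common connector; the wiring itself is outside this diff, so the following is only a plausible sketch, assuming the value sizes both an asyncio.Semaphore and the limit of a shared aiohttp connector (the URL is purely illustrative).

    import asyncio
    import aiohttp

    NUMBER_OF_CONCURRENT_TASKS = 64  # hypothetical wiring, not shown in this commit


    async def main():
        semaphore = asyncio.Semaphore(NUMBER_OF_CONCURRENT_TASKS)
        connector = aiohttp.TCPConnector(limit=NUMBER_OF_CONCURRENT_TASKS)
        async with aiohttp.ClientSession(connector=connector) as session:
            async with semaphore:
                async with session.get("http://example.com") as response:
                    print(response.status)


    asyncio.run(main())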

proxy_utils.py

Lines changed: 3 additions & 1 deletion
@@ -1,8 +1,9 @@
 import copy
+import random
+import asyncio
 
 from checkers.base_checker import CheckerResult
 from proxy_py import settings
-import random
 
 
 async def check_proxy(proxy_url: str, timeout=None) -> tuple:
@@ -14,6 +15,7 @@ async def check_proxy(proxy_url: str, timeout=None) -> tuple:
     results = []
 
     for checker, _ in zip(checkers, range(settings.MINIMUM_NUMBER_OF_CHECKERS_PER_PROXY)):
+        checker()
         result = await checker().check(proxy_url, timeout=timeout)
         if not result[0]:
             return False, None
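
In the loop above, zip() against range(settings.MINIMUM_NUMBER_OF_CHECKERS_PER_PROXY) stops at the shorter iterable, so at most that many checkers are tried even when more are registered. A tiny illustration with made-up checker names:

    # zip() ends when the shorter iterable is exhausted, so only the first
    # two entries are used here. The names are invented for the example.
    MINIMUM_NUMBER_OF_CHECKERS_PER_PROXY = 2
    checkers = ["ipinfo_checker", "httpbin_checker", "google_checker"]

    for checker, _ in zip(checkers, range(MINIMUM_NUMBER_OF_CHECKERS_PER_PROXY)):
        print(checker)  # prints ipinfo_checker, then httpbin_checker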
