Skip to content

Commit 83cb1d1

Browse files
committed
fix too many open files
1 parent 74ad187 commit 83cb1d1

File tree

4 files changed

+47
-29
lines changed

4 files changed

+47
-29
lines changed

async_requests.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,9 @@ async def request(method, url, **kwargs):
3131

3232
# headers={'User-Agent': get_random_user_agent()}
3333
if 'headers' not in kwargs:
34-
kwargs['headers'] = {'User-Agent': get_random_user_agent()}
34+
kwargs['headers'] = {
35+
'User-Agent': get_random_user_agent()
36+
}
3537
elif 'User-Agent' not in kwargs['headers']:
3638
kwargs['headers']['User-Agent'] = get_random_user_agent()
3739

processor.py

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import aiohttp
2+
13
import collectors_list
24
import proxy_utils
35

@@ -7,13 +9,16 @@
79
from models import session
810
from models import get_or_create
911

12+
1013
import sqlalchemy
1114
import sqlalchemy.exc
1215
import asyncio
1316
import time
1417
import re
1518
import logging
1619

20+
from aiosocks.connector import ProxyConnector, ProxyClientRequest
21+
1722

1823
# TODO: add ipv6 addresses
1924
PROXY_VALIDATE_REGEX = r"^((?P<protocol>(http|socks4|socks5))://)?" \
@@ -72,14 +77,18 @@ async def consumer(self):
7277
while True:
7378
await asyncio.sleep(0.1)
7479
i = 0
75-
while not self.queue.empty() and i <= settings.CONCURRENT_TASKS_COUNT:
76-
proxy_data = self.queue.get_nowait()
77-
tasks.append(self.process_proxy(*proxy_data))
78-
self.queue.task_done()
80+
async with aiohttp.ClientSession(
81+
connector=ProxyConnector(remote_resolve=False),
82+
request_class=ProxyClientRequest
83+
) as aiohttp_proxy_check_session:
84+
while not self.queue.empty() and i <= settings.CONCURRENT_TASKS_COUNT:
85+
proxy_data = self.queue.get_nowait()
86+
tasks.append(self.process_proxy(*proxy_data, aiohttp_proxy_check_session))
87+
self.queue.task_done()
7988

80-
if tasks:
81-
await asyncio.wait(tasks)
82-
tasks.clear()
89+
if tasks:
90+
await asyncio.wait(tasks)
91+
tasks.clear()
8392

8493
async def producer(self):
8594
while True:
@@ -190,7 +199,8 @@ async def process_raw_proxies(self, proxies, collector_id):
190199
collector_id,
191200
))
192201

193-
async def process_proxy(self, raw_protocol: int, auth_data: str, domain: str, port: int, collector_id: int):
202+
async def process_proxy(self, raw_protocol: int, auth_data: str, domain: str, port: int, collector_id: int,
203+
aiohttp_proxy_check_session):
194204
self.logger.debug("start processing proxy {}://{}@{}:{} with collector id {}".format(
195205
raw_protocol, auth_data, domain, port, collector_id))
196206

@@ -205,7 +215,7 @@ async def process_proxy(self, raw_protocol: int, auth_data: str, domain: str, po
205215

206216
try:
207217
start_checking_time = time.time()
208-
check_result = await proxy_utils.check_proxy(proxy_url)
218+
check_result = await proxy_utils.check_proxy(proxy_url, aiohttp_proxy_check_session)
209219
end_checking_time = time.time()
210220

211221
if check_result:

proxy_py/_settings.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,8 @@
1212

1313
# fetcher settings
1414

15-
CONCURRENT_TASKS_COUNT = 128
16-
PROXY_QUEUE_SIZE = 512
15+
CONCURRENT_TASKS_COUNT = 4096
16+
PROXY_QUEUE_SIZE = 4096
1717

1818
MIN_PROXY_CHECKING_PERIOD = 5 * 60
1919
MAX_PROXY_CHECKING_PERIOD = 45 * 60

proxy_utils.py

Lines changed: 23 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,18 @@
1111

1212

1313
# TODO: add multiple checks with several sites
14-
async def check_proxy(proxy_url: str):
14+
async def check_proxy(proxy_url: str, session):
1515
try:
16-
res = await async_requests.get(
16+
res = await _request(
17+
'get',
1718
'https://pikagraphs.d3d.info/OK/',
18-
proxy=proxy_url,
19-
timeout=settings.PROXY_CHECKING_TIMEOUT,
19+
proxy_url,
20+
settings.PROXY_CHECKING_TIMEOUT,
21+
session
2022
)
23+
res2 = await _request('GET', 'https://wtfismyip.com/text', proxy_url, settings.PROXY_CHECKING_TIMEOUT, session)
24+
res = async_requests.Response(200, "OK")
25+
print(res2)
2126
if res.status == 200 and res.text == "OK":
2227
return True
2328
except (aiohttp.client_exceptions.ServerDisconnectedError,
@@ -29,18 +34,19 @@ async def check_proxy(proxy_url: str):
2934
asyncio.TimeoutError,
3035
aiohttp.client_exceptions.ClientOSError,
3136
) as ex:
37+
message = str(ex)
38+
if "file" in message:
39+
print(message)
3240
# TODO: check to "Too many open files"
3341
return False
34-
#
35-
# async def get_working_proxies(domain, port, auth_data=""):
36-
# proxies = []
37-
#
38-
# for protocol in Proxy.PROTOCOLS:
39-
# proxy = Proxy(protocol=protocol, domain=domain, port=port, auth_data=auth_data)
40-
#
41-
# start_checking_time = time.time()
42-
# if await check_proxy(proxy):
43-
# end_checking_time = time.time()
44-
# proxies.append((proxy, start_checking_time, end_checking_time))
45-
#
46-
# return proxies
42+
43+
44+
async def _request(method, url, proxy_url, timeout, session : aiohttp.ClientSession):
45+
headers = {
46+
'User-Agent': async_requests.get_random_user_agent()
47+
}
48+
49+
async with session.request(method, url, timeout=timeout, headers=headers, proxy=proxy_url) as response:
50+
status = response.status
51+
text = await response.text()
52+
return async_requests.Response(status, text)

0 commit comments

Comments
 (0)