Commit 2dd56b5

Merge branch 'master' of github.com:DevAlone/proxy_py
2 parents d8c5f78 + aace39a commit 2dd56b5

7 files changed, 230 additions and 26 deletions

checkers/base_checker.py

Lines changed: 1 addition & 0 deletions
@@ -35,6 +35,7 @@ def set_attr_if_is_not_none(attribute_name, first_obj, second_obj):
 
 
 class BaseChecker:
+    # TODO: rewrite using HttpClient
     aiohttp_connector = None
 
     def __init__(self, url=None, request_type="GET", timeout=None):

collectors/web/cn/89ip/collector.py

Lines changed: 23 additions & 0 deletions
@@ -0,0 +1,23 @@
+from collectors.abstract_collector import AbstractCollector
+import http_client
+
+
+class Collector(AbstractCollector):
+    __collector__ = True
+
+    def __init__(self):
+        super(Collector, self).__init__()
+        self.processing_period = 30 * 60 # 30 minutes
+        '''
+        floating period means proxy_py will be changing
+        period to not make extra requests and handle
+        new proxies in time, you don't need to change
+        it in most cases
+        '''
+        # self.floating_processing_period = False
+
+    async def collect(self):
+        url = 'http://www.89ip.cn/tqdl.html?num=9999&address=&kill_address=&port=&kill_port=&isp='
+        html = await http_client.get_text(url)
+
+        return []
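The collect() above already downloads the page but still returns an empty list. Below is a minimal sketch of the missing parsing step, showing only the collect() part and assuming the 89ip response simply lists ip:port pairs in plain text; the regex helper is hypothetical and not part of this commit:

    import re

    from collectors.abstract_collector import AbstractCollector
    import http_client

    # hypothetical pattern (not in the commit): matches "ip:port" pairs in the page body
    PROXY_RE = re.compile(r'(?:\d{1,3}\.){3}\d{1,3}:\d{1,5}')


    class Collector(AbstractCollector):
        __collector__ = True

        async def collect(self):
            url = 'http://www.89ip.cn/tqdl.html?num=9999&address=&kill_address=&port=&kill_port=&isp='
            html = await http_client.get_text(url)
            # hand plain "ip:port" strings back; proxy_py checks them afterwards
            return PROXY_RE.findall(html)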

docs/source/guides/how_to_create_collector.rst

Lines changed: 46 additions & 4 deletions
@@ -1,14 +1,21 @@
 proxy_py How to create collector
 ================================
 
-Collector is a class which is used to parse proxies from web page or another source. All collectors are inherited from `collectors.abstract_collector.AbstractCollector`, also there is `collectors.pages_collector.PagesCollector` which is used for paginated sources. It's better to understand through examples.
+Collector is a class which is used to parse proxies from web page or another source.
+All collectors are inherited from `collectors.abstract_collector.AbstractCollector`,
+also there is `collectors.pages_collector.PagesCollector` which is used for paginated sources.
+It's better to understand through examples.
 
 Simple collector
 ****************
 
-Let's start with the simplest collector we can imagine, it will be from the page http://www.89ip.cn/ti.html as you can see, it sends form as GET request to this url http://www.89ip.cn/tqdl.html?num=9999&address=&kill_address=&port=&kill_port=&isp=
+Let's start with the simplest collector we can imagine,
+it will be from the page http://www.89ip.cn/ti.html as you can see,
+it sends form as GET request to this url
+http://www.89ip.cn/tqdl.html?num=9999&address=&kill_address=&port=&kill_port=&isp=
 
-Firstly we can try to check that these proxies are really good. Just copy and paste list of proxies to file say /tmp/proxies and run this command inside virtual environment
+Firstly we can try to check that these proxies are really good.
+Just copy and paste list of proxies to file say /tmp/proxies and run this command inside virtual environment
 
 .. code-block:: bash
 
@@ -18,10 +25,45 @@ You're gonna get something like this:
 
 `++++++++++++++++++++++-+++++-+++++++++++++++++++++++++++-++++++-++++-+++++++++++++++++++++++++++++++--+++++++-+++++++-++-+-+++-+++++++++-+++++++++++++++++++++--++--+-++++++++++++++++-+++--+++-+-+++++++++++++++++--++++++++++++-+++++-+++-++++++++-+++++-+-+++++++-++-+--++++-+++-++++++++++-++++--+++++++-+++++++-++--+++++-+-+++++++++++++++++++++-++-+++-+++--++++--+++-+++++++-+++++++-+++++++++++++++---+++++-+++++++++-+++++-+-++++++++++++-+--+++--+-+-+-++-+++++-+++--++++++-+++++++++++--+-+++-+-++++--+++++--+++++++++-+-+-++++-+-++++++++++++++-++-++++++--+--++++-+-++--++--+++++-++-+++-++++--++--+---------+--+--++--------+++-++-+--++++++++++++++++-+++++++++-+++++++--+--+--+-+-+++---++------------------+--+----------+-+-+--++-+----------+-------+--+------+----+-+--+--++----+--+-++++++-++-+++`
 
-+ means working proxy with at leat one protocol, - means not working, the result above is perfect, so many good proxies.
++ means working proxy with at least one protocol,
+- means not working, the result above is perfect, so many good proxies.
 
 Note: working means proxy respond with timeout set in settings, if you increase it you'll get more proxies.
 
+Alright, let's code it!
+
+We need to place our collector inside `collectors/web/` directory using reversed domain path,
+it will be `collectors/web/cn/89ip/collector.py`
+
+To make class be collector we need to declare variable `__collector__`
+
+Note: name of file and name of class don't make sense,
+you can declare as many files and classes in each file per domain as you want
+
+.. code-block:: python
+
+    from collectors.abstract_collector import AbstractCollector
+
+
+    class Collector(AbstractCollector):
+        __collector__ = True
+
+We can override default processing period in constructor like this:
+
+.. code-block:: python
+
+    def __init__(self):
+        super(Collector, self).__init__()
+        self.processing_period = 30 * 60 # 30 minutes
+        '''
+        floating period means proxy_py will be changing
+        period to not make extra requests and handle
+        new proxies in time, you don't need to change
+        it in most cases
+        '''
+        # self.floating_processing_period = False
+
+
 Paginated collector
 *******************
 

http_client.py

Lines changed: 100 additions & 0 deletions
@@ -0,0 +1,100 @@
+import json
+
+from proxy_py import settings
+from fake_useragent import UserAgent
+from aiosocks.connector import ProxyConnector, ProxyClientRequest
+
+import aiohttp
+
+
+class HttpClientResult:
+    text = None
+    aiohttp_response = None
+
+    @staticmethod
+    async def make(aiohttp_response):
+        obj = HttpClientResult()
+        obj.aiohttp_response = aiohttp_response
+        obj.text = await obj.aiohttp_response.text()
+
+        return obj
+
+    def as_text(self):
+        return self.text
+
+    def as_json(self):
+        return json.loads(self.text)
+
+
+# TODO: complete cookies saving
+class HttpClient:
+    """
+    Simple class for making http requests,
+    user-agent is set to random one in constructor
+    """
+    _aiohttp_connector = None
+
+    def __init__(self):
+        self.user_agent = UserAgent().random
+        self.timeout = 60
+        if HttpClient._aiohttp_connector is None:
+            HttpClient._aiohttp_connector = ProxyConnector(
+                remote_resolve=True,
+                limit=settings.NUMBER_OF_SIMULTANEOUS_REQUESTS,
+                limit_per_host=settings.NUMBER_OF_SIMULTANEOUS_REQUESTS_PER_HOST,
+            )
+        self.proxy_address = None
+
+    async def get(self, url):
+        """
+        send HTTP GET request
+
+        :param url:
+        :return:
+        """
+        return await self.request('GET', url, None)
+
+    async def post(self, url, data):
+        """
+        send HTTP POST request
+
+        :param url:
+        :param data:
+        :return:
+        """
+        return await self.request('POST', url, data)
+
+    async def request(self, method, url, data) -> HttpClientResult:
+        headers = {
+            'User-Agent': self.user_agent,
+        }
+
+        async with aiohttp.ClientSession(connector=HttpClient._aiohttp_connector,
+                                         connector_owner=False,
+                                         request_class=ProxyClientRequest
+                                         ) as session:
+            async with session.request(method,
+                                       url=url,
+                                       data=data,
+                                       proxy=self.proxy_address,
+                                       timeout=self.timeout,
+                                       headers=headers) as response:
+                return await HttpClientResult.make(response)
+
+    @staticmethod
+    async def clean():
+        HttpClient._aiohttp_connector.close()
+
+
+async def get_text(url):
+    """
+    fast method for sending get response without creating extra objects
+
+    :param url:
+    :return:
+    """
+    return (await HttpClient().get(url)).as_text()
+
+
+async def get_json(url):
+    return (await HttpClient().get(url)).as_json()
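For reference, a small usage sketch of the new module; the URLs are only examples, and the comment about proxy_address is an assumption based on the value being passed straight to session.request(proxy=...):

    import asyncio

    import http_client
    from http_client import HttpClient


    async def main():
        # module-level shortcuts added in this commit
        page = await http_client.get_text('http://example.com/')
        info = await http_client.get_json('https://ipinfo.io/json')

        # the full client is reusable and can presumably be routed through a proxy
        client = HttpClient()
        client.proxy_address = None  # assumption: e.g. 'http://1.2.3.4:8080' would proxy the request
        result = await client.get('https://ipinfo.io/json')
        print(len(page), info.get('ip'), result.as_json())


    asyncio.get_event_loop().run_until_complete(main())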

processor.py

Lines changed: 46 additions & 22 deletions
@@ -68,6 +68,7 @@ def __init__(self):
 
         self.queue = asyncio.Queue(maxsize=settings.PROXY_QUEUE_SIZE)
         self.proxies_semaphore = asyncio.BoundedSemaphore(settings.NUMBER_OF_CONCURRENT_TASKS)
+        self.good_proxies_are_processed = False
 
     async def worker(self):
         await asyncio.gather(*[
@@ -93,6 +94,13 @@ async def consumer(self):
             await asyncio.sleep(settings.SLEEP_AFTER_ERROR_PERIOD)
 
     async def producer(self):
+        while True:
+            await asyncio.gather(*[
+                self.process_proxies(),
+                self.process_collectors(),
+            ])
+
+    async def process_proxies(self):
         while True:
             await asyncio.sleep(0.00001)
             try:
@@ -103,28 +111,15 @@
                         Proxy.last_check_time < time.time() - Proxy.checking_period,
                     ).order_by(Proxy.last_check_time).limit(settings.NUMBER_OF_CONCURRENT_TASKS)
                 )
+                if proxies:
+                    self.good_proxies_are_processed = False
 
                 await self.add_proxies_to_queue(proxies)
 
-                if len(proxies) > settings.NUMBER_OF_CONCURRENT_TASKS / 2:
+                if proxies:
                     continue
 
-                # check collectors
-                collector_states = await db.execute(
-                    CollectorState.select().where(
-                        CollectorState.last_processing_time < time.time() - CollectorState.processing_period
-                    ).order_by(peewee.fn.Random()).
-                    limit(settings.NUMBER_OF_CONCURRENT_COLLECTORS)
-                )
-
-                tasks = [
-                    self.process_collector_of_state(collector_state)
-                    for collector_state in collector_states
-                ]
-
-                if tasks:
-                    await asyncio.gather(*tasks)
-                continue
+                self.good_proxies_are_processed = True
 
                 # check bad proxies
                 proxies = await db.execute(
@@ -159,6 +154,31 @@
 
             await asyncio.sleep(settings.SLEEP_AFTER_ERROR_PERIOD)
 
+    async def process_collectors(self):
+        while True:
+            try:
+                await asyncio.sleep(0.000001)
+
+                # check collectors
+                collector_states = await db.execute(
+                    CollectorState.select().where(
+                        CollectorState.last_processing_time < time.time() - CollectorState.processing_period
+                    ).order_by(peewee.fn.Random()).limit(settings.NUMBER_OF_CONCURRENT_COLLECTORS)
+                )
+
+                await asyncio.gather(*[
+                    self.process_collector_of_state(collector_state)
+                    for collector_state in collector_states
+                ])
+            except KeyboardInterrupt as ex:
+                raise ex
+            except BaseException as ex:
+                self.collectors_logger.exception(ex)
+                if settings.DEBUG:
+                    raise ex
+
+                await asyncio.sleep(settings.SLEEP_AFTER_ERROR_PERIOD)
+
     def is_queue_free(self):
         return self.queue.qsize() < settings.NUMBER_OF_CONCURRENT_TASKS
 
@@ -183,15 +203,15 @@ async def process_collector_of_state(self, collector_state):
            )
            proxies = await collector._collect()
 
-            if not proxies:
-                self.collectors_logger.warning(
-                    "got 0 proxies from collector of type \"{}\"".format(type(collector))
-                )
-            else:
+            if proxies:
                self.logger.debug(
                    "got {} proxies from collector of type \"{}\"".format(len(proxies), type(collector))
                )
                await self.process_raw_proxies(proxies, collector_state.id)
+            else:
+                self.collectors_logger.warning(
+                    "got 0 proxies from collector of type \"{}\"".format(type(collector))
+                )
        except KeyboardInterrupt as ex:
            raise ex
        except BaseException as ex:
@@ -208,6 +228,7 @@ async def process_raw_proxies(self, proxies, collector_id):
        tasks = []
 
        for proxy in proxies:
+            # TODO: refactor it
            tasks.append(self.process_raw_proxy(proxy, collector_id))
            if len(tasks) > settings.NUMBER_OF_CONCURRENT_TASKS:
                await asyncio.gather(*tasks)
@@ -261,6 +282,9 @@ async def process_raw_proxy(self, proxy, collector_id):
            pass
 
        for raw_protocol in range(len(Proxy.PROTOCOLS)):
+            while not self.good_proxies_are_processed:
+                await asyncio.sleep(0.01)
+
            await self.queue.put((
                raw_protocol,
                auth_data,
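Taken together, the change splits the old single producer loop into two coroutines run concurrently with asyncio.gather(), and the new good_proxies_are_processed flag holds freshly collected raw proxies out of the queue while good proxies are still waiting to be re-checked. A standalone, simplified sketch of that priority/back-pressure idea (illustrative names only, not the project's code):

    import asyncio


    class Scheduler:
        """Illustrative sketch of the producer split in this commit."""

        def __init__(self):
            self.queue = asyncio.Queue(maxsize=100)
            self.good_items_are_processed = False

        async def fetch_due_good_items(self):
            # stand-in for the DB query over good proxies that are due for a re-check
            return []

        async def collect_new_items(self):
            # stand-in for running collectors and gathering raw proxies
            return ['example-item']

        async def producer(self):
            # like the new producer(): run both loops concurrently
            await asyncio.gather(self.process_good(), self.process_new())

        async def process_good(self):
            while True:
                good = await self.fetch_due_good_items()
                if good:
                    # high-priority work arrived: hold back the other loop
                    self.good_items_are_processed = False
                    for item in good:
                        await self.queue.put(item)
                    continue
                # nothing urgent pending, low-priority items may flow again
                self.good_items_are_processed = True
                await asyncio.sleep(0.01)

        async def process_new(self):
            while True:
                for item in await self.collect_new_items():
                    # same idea as the wait loop added to process_raw_proxy()
                    while not self.good_items_are_processed:
                        await asyncio.sleep(0.01)
                    await self.queue.put(item)
                await asyncio.sleep(0.01)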

requirements.txt

Lines changed: 1 addition & 0 deletions
@@ -10,3 +10,4 @@ recommonmark
 sphinx_rtd_theme
 py_mini_racer
 pytest
+pytest-asyncio

tests/test_http_client.py

Lines changed: 13 additions & 0 deletions
@@ -0,0 +1,13 @@
+import http_client
+import pytest
+
+
+@pytest.mark.asyncio
+async def test_fast_methods():
+    resp = await http_client.get_json('https://ipinfo.io/json')
+    assert 'ip' in resp
+
+
+def test_saving_state():
+    # TODO: implement
+    pass
