Skip to content

Commit 4cdae55

Browse files
committed
start rewriting on new ORM
1 parent 748f812 commit 4cdae55

File tree

5 files changed

+137
-104
lines changed

5 files changed

+137
-104
lines changed

collectors_list.py

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from models import session, CollectorState
1+
from models import db, CollectorState
22
from proxy_py import settings
33

44
import os
@@ -28,16 +28,17 @@
2828

2929

3030
# init db
31-
for module_name, CollectorType in collectors.items():
32-
collectorState = session.query(CollectorState).filter(CollectorState.identifier == module_name).first()
33-
34-
if not collectorState:
35-
session.add(CollectorState(
36-
identifier=module_name,
37-
processing_period=CollectorType.processing_period,
38-
last_processing_time=0,
39-
))
40-
session.commit()
31+
# TODO: port this collector-state initialization to the new ORM
32+
# for module_name, CollectorType in collectors.items():
33+
# collectorState = session.query(CollectorState).filter(CollectorState.identifier == module_name).first()
34+
#
35+
# if not collectorState:
36+
# session.add(CollectorState(
37+
# identifier=module_name,
38+
# processing_period=CollectorType.processing_period,
39+
# last_processing_time=0,
40+
# ))
41+
# session.commit()
4142

4243

4344
# def get_collector_state(module_name : str):
@@ -50,7 +51,9 @@
5051

5152
def get_collector_of_module_name(module_name: str):
5253
if module_name not in collectors:
53-
raise CollectorNotFoundException()
54+
raise CollectorNotFoundException(
55+
'Probably some collector exists in database but not in filesystem. module_name = {}'.format(module_name)
56+
)
5457

5558
return collectors[module_name]
5659

main.py

Lines changed: 36 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,55 +1,62 @@
11
#!/usr/bin/env python3
22

33
from proxy_py import settings
4-
from processor import Processor
5-
from server.proxy_provider_server import ProxyProviderServer
6-
import collectors_list
7-
from models import Proxy, ProxyCountItem, session
4+
# # from processor import Processor
5+
# from server.proxy_provider_server import ProxyProviderServer
6+
# import collectors_list
7+
import peewee
8+
9+
from models import Proxy, ProxyCountItem, db
810

911
import asyncio
1012
import time
1113

1214

1315
async def create_proxy_count_item():
14-
session.add(
15-
ProxyCountItem(
16-
timestamp=int(time.time()),
17-
good_proxies_count=session.query(Proxy).filter(Proxy.number_of_bad_checks == 0).count(),
18-
bad_proxies_count=session.query(Proxy).filter(Proxy.number_of_bad_checks > 0)
19-
.filter(Proxy.number_of_bad_checks < settings.DEAD_PROXY_THRESHOLD).count(),
20-
dead_proxies_count=session.query(Proxy)
21-
.filter(Proxy.number_of_bad_checks >= settings.DEAD_PROXY_THRESHOLD).count()
22-
)
16+
good_proxies_count = await db.count(Proxy.select().where(Proxy.number_of_bad_checks == 0))
17+
bad_proxies_count = await db.count(Proxy.select().where(
18+
Proxy.number_of_bad_checks > 0,
19+
Proxy.number_of_bad_checks < settings.DEAD_PROXY_THRESHOLD,
20+
))
21+
dead_proxies_count = await db.count(Proxy.select().where(
22+
Proxy.number_of_bad_checks >= settings.DEAD_PROXY_THRESHOLD
23+
))
24+
25+
await db.create(
26+
ProxyCountItem,
27+
timestamp=int(time.time()),
28+
good_proxies_count=good_proxies_count,
29+
bad_proxies_count=bad_proxies_count,
30+
dead_proxies_count=dead_proxies_count,
2331
)
24-
session.commit()
2532

2633

2734
async def proxy_counter():
2835
while True:
29-
if session.query(ProxyCountItem).count() == 0:
36+
if (await db.count(ProxyCountItem.select())) == 0:
3037
await create_proxy_count_item()
3138
else:
32-
last_item = session.query(ProxyCountItem).order_by(ProxyCountItem.timestamp.desc()).first()
39+
last_item = await db.get(ProxyCountItem.select().order_by(ProxyCountItem.timestamp.desc()))
40+
3341
if int(last_item.timestamp // 60) * 60 + 60 < time.time():
3442
await create_proxy_count_item()
3543

36-
await asyncio.sleep(1)
44+
await asyncio.sleep(5)
3745

3846

3947
if __name__ == "__main__":
40-
proxy_processor = Processor()
41-
# for module_name, CollectorType in collectors_list.collector_types.items():
42-
# proxy_processor.add_collector_of_type(CollectorType)
43-
44-
proxy_provider_server = ProxyProviderServer.get_proxy_provider_server(
45-
settings.PROXY_PROVIDER_SERVER_ADDRESS['HOST'],
46-
settings.PROXY_PROVIDER_SERVER_ADDRESS['PORT'],
47-
proxy_processor,
48-
)
48+
# proxy_processor = Processor()
49+
50+
# proxy_provider_server = ProxyProviderServer.get_proxy_provider_server(
51+
# settings.PROXY_PROVIDER_SERVER_ADDRESS['HOST'],
52+
# settings.PROXY_PROVIDER_SERVER_ADDRESS['PORT'],
53+
# None, # proxy_processor,
54+
# )
4955

5056
loop = asyncio.get_event_loop()
51-
loop.run_until_complete(proxy_provider_server.start(loop))
57+
# loop.run_until_complete(proxy_provider_server.start(loop))
5258
loop.run_until_complete(asyncio.wait([
53-
proxy_processor.exec(),
59+
# TODO:
60+
# proxy_processor.exec(),
5461
proxy_counter(),
55-
]))
62+
]))

models.py

Lines changed: 68 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -1,45 +1,46 @@
11
from proxy_py import settings
2-
from sqlalchemy import create_engine
3-
from sqlalchemy.ext.declarative import declarative_base
4-
from sqlalchemy import Column, Integer, String, SmallInteger, UniqueConstraint
5-
from sqlalchemy.orm import sessionmaker
2+
import peewee
3+
import peewee_async
64

5+
db = peewee_async.PooledPostgresqlDatabase(
6+
*settings.DATABASE_CONNECTION_ARGS,
7+
**settings.DATABASE_CONNECTION_KWARGS
8+
)
79

8-
engine = create_engine(*settings.DATABASE_CONNECTION_ARGS, **settings.DATABASE_CONNECTION_KWARGS)
9-
Base = declarative_base()
10-
Session = sessionmaker(bind=engine)
1110

12-
13-
class Proxy(Base):
14-
__tablename__ = "proxies"
15-
__table_args__ = (
16-
UniqueConstraint("raw_protocol", "auth_data", "domain", "port"),
17-
)
11+
class Proxy(peewee.Model):
12+
class Meta:
13+
database = db
14+
db_table = 'proxies'
15+
indexes = (
16+
(('raw_protocol', 'auth_data', 'domain', 'port'), True),
17+
)
1818

1919
PROTOCOLS = (
2020
'http',
2121
'socks4',
2222
'socks5',
2323
)
2424

25-
id = Column(Integer, primary_key=True)
26-
raw_protocol = Column(SmallInteger, nullable=False)
27-
domain = Column(String(128), nullable=False)
28-
port = Column(Integer, nullable=False)
29-
auth_data = Column(String(64), default="", nullable=False)
30-
31-
checking_period = Column(Integer, default=settings.MIN_PROXY_CHECKING_PERIOD, nullable=False)
32-
last_check_time = Column(Integer, default=0)
33-
number_of_bad_checks = Column(Integer, default=0)
34-
uptime = Column(Integer, nullable=True, default=None)
35-
bad_uptime = Column(Integer, nullable=True, default=None)
25+
id = peewee.IntegerField(primary_key=True)
26+
raw_protocol = peewee.SmallIntegerField(null=False)
27+
domain = peewee.CharField(settings.DB_MAX_DOMAIN_LENGTH, null=False)
28+
port = peewee.IntegerField(null=False)
29+
auth_data = peewee.CharField(settings.DB_AUTH_DATA_MAX_LENGTH, default='', null=False)
30+
31+
checking_period = peewee.IntegerField(default=settings.MIN_PROXY_CHECKING_PERIOD, null=False)
32+
last_check_time = peewee.IntegerField(default=0, null=False)
33+
number_of_bad_checks = peewee.IntegerField(default=0, null=False)
34+
uptime = peewee.IntegerField(default=None, null=True)
35+
bad_uptime = peewee.IntegerField(default=None, null=True)
3636
# in microseconds
37-
response_time = Column(Integer, nullable=True, default=None)
38-
_white_ipv4 = Column(String(16), nullable=True)
39-
_white_ipv6 = Column(String(16), nullable=True)
40-
city = Column(String(), nullable=True)
41-
region = Column(String(), nullable=True)
42-
country_code = Column(String(3), nullable=True)
37+
response_time = peewee.IntegerField(default=None, null=True)
38+
# TODO: consider storing as binary
39+
_white_ipv4 = peewee.CharField(16, null=True)
40+
_white_ipv6 = peewee.CharField(45, null=True)
41+
city = peewee.TextField(null=True)
42+
region = peewee.TextField(null=True)
43+
country_code = peewee.CharField(3, null=True)
4344

4445
def get_raw_protocol(self):
4546
return self.raw_protocol
@@ -92,36 +93,49 @@ def __str__(self):
9293
__repr__ = __str__
9394

9495

95-
class ProxyCountItem(Base):
96-
__tablename__ = "proxy_count_items"
97-
timestamp = Column(Integer, primary_key=True)
98-
good_proxies_count = Column(Integer, nullable=False)
99-
bad_proxies_count = Column(Integer, nullable=False)
100-
dead_proxies_count = Column(Integer, nullable=False)
96+
class ProxyCountItem(peewee.Model):
97+
class Meta:
98+
database = db
99+
db_table = 'proxy_count_items'
101100

101+
timestamp = peewee.IntegerField(primary_key=True)
102+
good_proxies_count = peewee.IntegerField(null=False)
103+
bad_proxies_count = peewee.IntegerField(null=False)
104+
dead_proxies_count = peewee.IntegerField(null=False)
102105

103-
class CollectorState(Base):
104-
__tablename__ = "collector_states"
105-
id = Column(Integer, primary_key=True)
106-
# python module name
107-
identifier = Column(String, unique=True)
108-
processing_period = Column(Integer, nullable=False)
109-
last_processing_time = Column(Integer, nullable=False)
110-
last_processing_proxies_count = Column(Integer, nullable=False, default=0)
111-
last_processing_new_proxies_count = Column(Integer, nullable=False, default=0)
112-
data = Column(String, nullable=True, default=None)
113106

107+
class CollectorState(peewee.Model):
108+
class Meta:
109+
database = db
110+
db_table = 'collector_states'
114111

115-
Base.metadata.create_all(engine)
112+
id = peewee.IntegerField(primary_key=True)
113+
# python module name
114+
identifier = peewee.TextField(unique=True)
115+
processing_period = peewee.IntegerField(null=False)
116+
last_processing_time = peewee.IntegerField(null=False)
117+
last_processing_proxies_count = peewee.IntegerField(default=0, null=False)
118+
last_processing_new_proxies_count = peewee.IntegerField(default=0, null=False)
119+
data = peewee.TextField(default=None, null=True)
116120

117-
session = Session()
121+
122+
_silent = True
123+
Proxy.create_table(_silent)
124+
ProxyCountItem.create_table(_silent)
125+
CollectorState.create_table(_silent)
118126

119127

120128
def get_or_create(session, model, **kwargs):
121-
instance = session.query(model).filter_by(**kwargs).first()
122-
if not instance:
123-
instance = model(**kwargs)
124-
session.add(instance)
125-
session.commit()
129+
# TODO: do it
130+
raise NotImplementedError()
131+
132+
# instance = session.query(model).filter_by(**kwargs).first()
133+
# if not instance:
134+
# instance = model(**kwargs)
135+
# session.add(instance)
136+
# session.commit()
137+
#
138+
# return instance
139+
126140

127-
return instance
141+
db = peewee_async.Manager(db)

proxy_py/_settings.py

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,22 +3,31 @@
33

44
import os
55

6-
DATABASE_CONNECTION_ARGS = (
7-
'sqlite:///db.sqlite3',
8-
)
9-
10-
DATABASE_CONNECTION_KWARGS = {}
6+
DATABASE_CONNECTION_ARGS = ()
7+
DATABASE_CONNECTION_KWARGS = {
8+
'database': 'test',
9+
'user': 'test',
10+
'password': 'test',
11+
}
1112

1213
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
1314

1415
DEBUG = False
1516

17+
# These settings let you override the collectors.
18+
# For example, if you are building a proxy checker for a particular site,
19+
# you can override COLLECTORS_DIR and PROXY_CHECKERS.
1620
COLLECTORS_DIR = 'collectors'
1721

22+
# db (do not change these values after the database has been created)
23+
24+
DB_MAX_DOMAIN_LENGTH = 128
25+
DB_AUTH_DATA_MAX_LENGTH = 64
26+
1827
# fetcher settings
1928

20-
CONCURRENT_TASKS_COUNT = 128
21-
PROXY_QUEUE_SIZE = 512
29+
CONCURRENT_TASKS_COUNT = 64
30+
PROXY_QUEUE_SIZE = 128
2231

2332
MIN_PROXY_CHECKING_PERIOD = 5 * 60
2433
MAX_PROXY_CHECKING_PERIOD = 45 * 60

requirements.txt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
yarl==0.18.0
22
aiohttp==2.3.2
33
aiosocks==0.2.5
4-
alembic==0.9.6
54
lxml==3.8.0
65
PySocks==1.6.7
7-
SQLAlchemy==1.1.15
86
fake-useragent
97
aiohttp_jinja2
108
jinja2
9+
peewee-async
10+
aiopg

0 commit comments

Comments
 (0)