Skip to content

Commit 2f30c30

Browse files
committed
dump & restore script
1 parent 4fb6cd7 commit 2f30c30

File tree

5 files changed

+154
-22
lines changed

5 files changed

+154
-22
lines changed

collectors_list.py

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,20 @@
1-
import asyncio
2-
3-
import peewee
4-
51
from models import db, CollectorState
62
from proxy_py import settings
73

84
import os
5+
import asyncio
96
import importlib.util
107

118

129
collectors = {}
1310

1411

15-
print("collectors dir: {}".format(settings.COLLECTORS_DIR))
1612
for root, dirs, files in os.walk(settings.COLLECTORS_DIR):
1713
for file in files:
1814
if file.endswith(".py"):
19-
print("processing file: {}".format(file))
20-
2115
file_path = os.path.join(root, file)
2216
module_name = os.path.splitext(file_path)[0].replace('/', '.')
23-
print("module name: {}".format(module_name))
2417
spec = importlib.util.spec_from_file_location(module_name, file_path)
25-
2618
collector_module = importlib.util.module_from_spec(spec)
2719
spec.loader.exec_module(collector_module)
2820

@@ -61,7 +53,8 @@ def get_collector_state(module_name: str):
def get_collector_of_module_name(module_name: str):
    """Return the collector instance registered under *module_name*.

    Raises CollectorNotFoundException when no such collector is loaded --
    typically a collector recorded in the database whose source file is
    no longer present on the filesystem.
    """
    if module_name in collectors:
        return collectors[module_name]

    raise CollectorNotFoundException(
        "Probably some collector exists in database but not in filesystem. "
        "module_name = {}".format(module_name)
    )

dump_db.py

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
from proxy_py import settings

import asyncio
import json
import os
import sys

import aiopg


# Connection string for the PostgreSQL server used by proxy_py.
# NOTE(review): host is hard-coded to 127.0.0.1 instead of being read from
# DATABASE_CONNECTION_KWARGS -- presumably intentional for a local-only
# maintenance script; confirm.
dsn = 'dbname={} user={} password={} host=127.0.0.1'.format(
    settings.DATABASE_CONNECTION_KWARGS['database'],
    settings.DATABASE_CONNECTION_KWARGS['user'],
    settings.DATABASE_CONNECTION_KWARGS['password'],
)
async def dump_it(table_name, save_dir):
    """Dump every row of *table_name* into ``<save_dir>/<table_name>.json``.

    Output format (kept byte-compatible with what fill_db.py expects):
    a ``[`` line, then one JSON object per line, EACH line ending with a
    trailing comma, then a ``]`` line.  The comma after the last row makes
    the file invalid strict JSON on purpose -- fill_db.py strips it while
    loading, line by line.

    :param table_name: name of the table to dump (trusted, hard-coded by
        the caller -- that is why formatting it into the SQL is acceptable).
    :param save_dir: existing directory to write the dump file into.
    """
    print("start dumping table {}".format(table_name))

    # Fix: use the pool as an async context manager so it is actually
    # closed; the original created a pool per call and leaked it.
    async with aiopg.create_pool(dsn) as pool:
        async with pool.acquire() as conn:
            async with conn.cursor() as cur:
                await cur.execute("SELECT * from {}".format(table_name))
                column_names = [desc[0] for desc in cur.description]
                print("found columns: {}".format(column_names))

                file_path = os.path.join(save_dir, table_name + ".json")
                print("writing to file {}".format(file_path))
                with open(file_path, 'w') as file:
                    file.write("[\n")
                    # Stream rows straight to disk instead of collecting
                    # them in memory.
                    async for row in cur:
                        json_obj = {
                            column_name: row[i]
                            for i, column_name in enumerate(column_names)
                        }
                        file.write("{},\n".format(json.dumps(json_obj)))
                    file.write("]\n")

    print()
async def main():
    """Entry point: dump the three proxy_py tables to the directory given
    as the first command-line argument.

    Raises:
        ValueError: no target directory argument was supplied.
        NotADirectoryError: the argument is not an existing directory.
    """
    # Fix: raise specific, catchable exceptions -- the original raised
    # BaseException, which `except Exception` handlers cannot catch.
    if len(sys.argv) < 2:
        raise ValueError("Please, specify directory where to save your dump")

    save_to_dir = sys.argv[1]
    if not os.path.isdir(save_to_dir):
        raise NotADirectoryError("{} is not a directory".format(save_to_dir))

    await dump_it('proxies', save_to_dir)
    await dump_it('proxy_count_items', save_to_dir)
    await dump_it('collector_states', save_to_dir)


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

fill_db.py

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
from proxy_py import settings

import asyncio
import json
import os
import sys

import aiopg


# Connection string for the PostgreSQL server used by proxy_py.
# NOTE(review): host is hard-coded to 127.0.0.1 instead of being read from
# DATABASE_CONNECTION_KWARGS -- presumably intentional for a local-only
# maintenance script; confirm.
dsn = 'dbname={} user={} password={} host=127.0.0.1'.format(
    settings.DATABASE_CONNECTION_KWARGS['database'],
    settings.DATABASE_CONNECTION_KWARGS['user'],
    settings.DATABASE_CONNECTION_KWARGS['password'],
)
async def fill_it(dir_path, table_name):
    """Load rows from ``<dir_path>/<table_name>.json`` (a file written by
    dump_db.py) into *table_name*, skipping rows that already exist.

    Dump format: a ``[`` line, one JSON object per line with a trailing
    comma, and a closing ``]`` line.

    :param dir_path: directory containing the dump files.
    :param table_name: target table; also the dump file's base name.
        Column names come from our own dump files, which is why
        interpolating them into the SQL is acceptable; row values always
        go through ``%s`` placeholders.
    """
    print("start filling table {}".format(table_name))

    # Fix: use the pool as an async context manager so it is actually
    # closed; the original created a pool per call and leaked it.
    async with aiopg.create_pool(dsn) as pool:
        async with pool.acquire() as conn:
            async with conn.cursor() as cur:
                file_path = os.path.join(dir_path, table_name + ".json")

                print("reading file {}".format(file_path))
                with open(file_path) as file:
                    print("filling database", end="")
                    for line in file:
                        print(".", end="")
                        sys.stdout.flush()

                        line = line.strip()
                        # Skip blank lines and the surrounding brackets.
                        if not line or line in ('[', ']', '[]'):
                            continue

                        # Fix: strip an *optional* trailing comma instead
                        # of blindly dropping the last character, so both
                        # the legacy dump format and strictly valid JSON
                        # lines load correctly.
                        json_obj = json.loads(line.rstrip(','))

                        keys = list(json_obj.keys())
                        values = list(json_obj.values())

                        sql_request = """
                        INSERT INTO {} ({})
                        VALUES ({})
                        ON CONFLICT DO NOTHING;
                        """.format(
                            table_name,
                            ', '.join(keys),
                            ', '.join(['%s'] * len(values)),
                        )

                        await cur.execute(sql_request, values)

                    print()

    print()
async def main():
    """Entry point: load every dump file found in the directory given as
    the first command-line argument into its table (file base name ==
    table name).

    Raises:
        ValueError: no source directory argument was supplied.
        NotADirectoryError: the argument is not an existing directory.
    """
    # Fix: raise specific, catchable exceptions -- the original raised
    # BaseException, which `except Exception` handlers cannot catch.
    if len(sys.argv) < 2:
        raise ValueError("Please, specify directory from which to load your dumps")

    load_from_dir = sys.argv[1]
    if not os.path.isdir(load_from_dir):
        raise NotADirectoryError("{} is not a directory".format(load_from_dir))

    for filename in os.listdir(load_from_dir):
        if not os.path.isfile(os.path.join(load_from_dir, filename)):
            continue
        # Fix: os.path.splitext instead of `filename.split(".")` -- the
        # old 2-tuple unpacking crashed on names with more than one dot.
        table_name = os.path.splitext(filename)[0]
        await fill_it(load_from_dir, table_name)


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

main.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
from proxy_py import settings
44
from processor import Processor
55
from server.proxy_provider_server import ProxyProviderServer
6-
76
from models import Proxy, ProxyCountItem, db
87

98
import asyncio

processor.py

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44

55
import collectors_list
66
import proxy_utils
7-
87
import asyncio
98
import time
109
import re
@@ -26,7 +25,6 @@ class Processor:
2625
"""
2726

2827
def __init__(self):
29-
# TODO: find better logger
3028
self.logger = logging.getLogger("proxy_py/processor")
3129

3230
if settings.DEBUG:
@@ -66,17 +64,23 @@ async def consumer(self):
6664
tasks = []
6765
while True:
6866
await asyncio.sleep(0.1)
69-
i = 0
67+
try:
68+
i = 0
7069

71-
while not self.queue.empty() and i <= settings.CONCURRENT_TASKS_COUNT:
72-
proxy_data = self.queue.get_nowait()
73-
tasks.append(self.process_proxy(*proxy_data))
74-
self.queue.task_done()
75-
i += 1
70+
while not self.queue.empty() and i <= settings.CONCURRENT_TASKS_COUNT:
71+
proxy_data = self.queue.get_nowait()
72+
tasks.append(self.process_proxy(*proxy_data))
73+
self.queue.task_done()
74+
i += 1
7675

77-
if tasks:
78-
await asyncio.wait(tasks)
79-
tasks.clear()
76+
if tasks:
77+
await asyncio.wait(tasks)
78+
tasks.clear()
79+
except KeyboardInterrupt:
80+
raise
81+
except BaseException as ex:
82+
self.logger.exception(ex)
83+
await asyncio.sleep(10)
8084

8185
async def producer(self):
8286
while True:

0 commit comments

Comments
 (0)