Skip to content

Commit ed104a3

Browse files
committed
some refactoring
1 parent 829086c commit ed104a3

File tree

7 files changed

+105
-158
lines changed

7 files changed

+105
-158
lines changed

checkers/base_checker.py

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -41,8 +41,8 @@ def __init__(self, url=None, request_type="GET", timeout=None):
4141
if BaseChecker.aiohttp_connector is None:
4242
BaseChecker.aiohttp_connector = ProxyConnector(
4343
remote_resolve=True,
44-
limit=settings.SIMULTANEOUS_REQUESTS_COUNT,
45-
limit_per_host=settings.SIMULTANEOUS_REQUESTS_PER_HOST_COUNT,
44+
limit=settings.NUMBER_OF_SIMULTANEOUS_REQUESTS,
45+
limit_per_host=settings.NUMBER_OF_SIMULTANEOUS_REQUESTS_PER_HOST,
4646
)
4747
self.request_type = request_type
4848
self.timeout = timeout if timeout is not None else settings.PROXY_CHECKING_TIMEOUT

collectors_list.py

Lines changed: 25 additions & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -23,35 +23,40 @@
2323
spec.loader.exec_module(collector_module)
2424

2525
if hasattr(collector_module, "Collector"):
26-
if hasattr(collector_module.Collector, "__collector__") and collector_module.Collector.__collector__:
26+
if hasattr(collector_module.Collector, "__collector__") \
27+
and collector_module.Collector.__collector__:
2728
collectors[module_name] = collector_module.Collector()
2829

2930

3031
# init db
3132

32-
for module_name, CollectorType in collectors.items():
33+
for module_name, Collector in collectors.items():
3334
try:
34-
collector_state = asyncio.get_event_loop().run_until_complete(db.get(
35-
CollectorState.select().where(CollectorState.identifier == module_name)
35+
asyncio.get_event_loop().run_until_complete(db.get(
36+
CollectorState.select().where(
37+
CollectorState.identifier == module_name
38+
)
3639
))
3740
except CollectorState.DoesNotExist:
3841
asyncio.get_event_loop().run_until_complete(db.create(
3942
CollectorState,
4043
identifier=module_name,
41-
processing_period=CollectorType.processing_period,
44+
processing_period=Collector.processing_period,
4245
last_processing_time=0,
4346
))
4447

4548

46-
def get_collector_state(module_name: str):
47-
try:
48-
collector_state = asyncio.get_event_loop().run_until_complete(db.get(
49-
CollectorState.select().where(CollectorState.identifier == module_name)
50-
))
51-
except CollectorState.DoesNotExist:
52-
raise CollectorNotFoundException()
53-
54-
return collector_state
49+
# def get_collector_state(module_name: str):
50+
# try:
51+
# collector_state = asyncio.get_event_loop().run_until_complete(db.get(
52+
# CollectorState.select().where(
53+
# CollectorState.identifier == module_name
54+
# )
55+
# ))
56+
# except CollectorState.DoesNotExist:
57+
# raise CollectorNotFoundException()
58+
#
59+
# return collector_state
5560

5661

5762
def get_collector_of_module_name(module_name: str):
@@ -70,5 +75,11 @@ async def load_collector(state: CollectorState):
7075
return collector
7176

7277

78+
async def save_collector(state: CollectorState):
79+
collector = get_collector_of_module_name(state.identifier)
80+
await collector.save_state(state)
81+
await db.update(state)
82+
83+
7384
class CollectorNotFoundException(BaseException):
7485
pass
File renamed without changes.
File renamed without changes.

processor.py

Lines changed: 17 additions & 99 deletions
Original file line number | Diff line number | Diff line change
@@ -26,6 +26,13 @@ class Processor:
2626
"""
2727
instance = None
2828

29+
@staticmethod
30+
def get_instance():
31+
if Processor.instance is None:
32+
Processor.instance = Processor()
33+
34+
return Processor.instance
35+
2936
def __init__(self):
3037
self.logger = logging.getLogger("proxy_py/processor")
3138

@@ -55,14 +62,7 @@ def __init__(self):
5562
self.logger.debug("processor initialization...")
5663

5764
self.queue = asyncio.Queue(maxsize=settings.PROXY_QUEUE_SIZE)
58-
self.proxies_semaphore = asyncio.BoundedSemaphore(settings.CONCURRENT_TASKS_COUNT)
59-
60-
@staticmethod
61-
def get_instance():
62-
if Processor.instance is None:
63-
Processor.instance = Processor()
64-
65-
return Processor.instance
65+
self.proxies_semaphore = asyncio.BoundedSemaphore(settings.NUMBER_OF_CONCURRENT_TASKS)
6666

6767
async def worker(self):
6868
await asyncio.gather(*[
@@ -89,19 +89,19 @@ async def consumer(self):
8989

9090
async def producer(self):
9191
while True:
92-
await asyncio.sleep(0.000001)
92+
await asyncio.sleep(0.00001)
9393
try:
9494
# check good proxies
9595
proxies = await db.execute(
9696
Proxy.select().where(
9797
Proxy.number_of_bad_checks == 0,
9898
Proxy.last_check_time < time.time() - Proxy.checking_period,
99-
).order_by(Proxy.last_check_time).limit(settings.CONCURRENT_TASKS_COUNT)
99+
).order_by(Proxy.last_check_time).limit(settings.NUMBER_OF_CONCURRENT_TASKS)
100100
)
101101

102102
await self.add_proxies_to_queue(proxies)
103103

104-
if len(proxies) > settings.CONCURRENT_TASKS_COUNT / 2:
104+
if len(proxies) > settings.NUMBER_OF_CONCURRENT_TASKS / 2:
105105
continue
106106

107107
# check collectors
@@ -127,7 +127,7 @@ async def producer(self):
127127
Proxy.number_of_bad_checks > 0,
128128
Proxy.number_of_bad_checks < settings.DEAD_PROXY_THRESHOLD,
129129
Proxy.last_check_time < time.time() - settings.BAD_PROXY_CHECKING_PERIOD,
130-
).order_by(Proxy.last_check_time).limit(settings.CONCURRENT_TASKS_COUNT)
130+
).order_by(Proxy.last_check_time).limit(settings.NUMBER_OF_CONCURRENT_TASKS)
131131
)
132132

133133
await self.add_proxies_to_queue(proxies)
@@ -141,7 +141,7 @@ async def producer(self):
141141
Proxy.number_of_bad_checks >= settings.DEAD_PROXY_THRESHOLD,
142142
Proxy.number_of_bad_checks < settings.DO_NOT_CHECK_ON_N_BAD_CHECKS,
143143
Proxy.last_check_time < time.time() - settings.DEAD_PROXY_CHECKING_PERIOD,
144-
).order_by(Proxy.last_check_time).limit(settings.CONCURRENT_TASKS_COUNT)
144+
).order_by(Proxy.last_check_time).limit(settings.NUMBER_OF_CONCURRENT_TASKS)
145145
)
146146

147147
await self.add_proxies_to_queue(proxies)
@@ -154,87 +154,8 @@ async def producer(self):
154154

155155
await asyncio.sleep(settings.SLEEP_AFTER_ERROR_PERIOD)
156156

157-
# async def process_collectors(self):
158-
# while True:
159-
# await asyncio.sleep(0.001)
160-
# try:
161-
# if not self.is_queue_free():
162-
# continue
163-
#
164-
# # check collectors
165-
# collector_states = await db.execute(
166-
# CollectorState.select().where(
167-
# CollectorState.last_processing_time < time.time() - CollectorState.processing_period
168-
# ).limit(settings.CONCURRENT_TASKS_COUNT)
169-
# )
170-
#
171-
# tasks = [
172-
# self.process_collector_of_state(collector_state)
173-
# for collector_state in collector_states
174-
# ]
175-
#
176-
# if tasks:
177-
# await asyncio.gather(*tasks)
178-
# tasks.clear()
179-
#
180-
# # await self.queue.join()
181-
# except KeyboardInterrupt as ex:
182-
# raise ex
183-
# except BaseException as ex:
184-
# self.logger.exception(ex)
185-
# await asyncio.sleep(settings.SLEEP_AFTER_ERROR_PERIOD)
186-
187157
def is_queue_free(self):
188-
return self.queue.qsize() < settings.CONCURRENT_TASKS_COUNT
189-
190-
# async def process_proxies(self):
191-
# while True:
192-
# await asyncio.sleep(0.001)
193-
# try:
194-
# # check good proxies
195-
# proxies = await db.execute(
196-
# Proxy.select().where(
197-
# Proxy.number_of_bad_checks == 0,
198-
# Proxy.last_check_time < time.time() - Proxy.checking_period,
199-
# ).order_by(Proxy.last_check_time).limit(settings.CONCURRENT_TASKS_COUNT)
200-
# )
201-
#
202-
# await self.add_proxies_to_queue(proxies)
203-
#
204-
# if len(proxies) > 0 or not self.is_queue_free():
205-
# continue
206-
#
207-
# # check bad proxies
208-
# proxies = await db.execute(
209-
# Proxy.select().where(
210-
# Proxy.number_of_bad_checks > 0,
211-
# Proxy.number_of_bad_checks < settings.DEAD_PROXY_THRESHOLD,
212-
# Proxy.last_check_time < time.time() - settings.BAD_PROXY_CHECKING_PERIOD,
213-
# ).order_by(Proxy.last_check_time).limit(settings.CONCURRENT_TASKS_COUNT)
214-
# )
215-
#
216-
# await self.add_proxies_to_queue(proxies)
217-
#
218-
# if len(proxies) > 0 or not self.is_queue_free():
219-
# continue
220-
#
221-
# # check dead proxies
222-
# proxies = await db.execute(
223-
# Proxy.select().where(
224-
# Proxy.number_of_bad_checks >= settings.DEAD_PROXY_THRESHOLD,
225-
# Proxy.number_of_bad_checks < settings.REMOVE_ON_N_BAD_CHECKS,
226-
# Proxy.last_check_time < time.time() - settings.DEAD_PROXY_CHECKING_PERIOD,
227-
# ).order_by(Proxy.last_check_time).limit(settings.CONCURRENT_TASKS_COUNT)
228-
# )
229-
#
230-
# await self.add_proxies_to_queue(proxies)
231-
# except KeyboardInterrupt as ex:
232-
# raise ex
233-
# except BaseException as ex:
234-
# self.logger.exception(ex)
235-
# if settings.DEBUG:
236-
# raise ex
237-
# await asyncio.sleep(10)
158+
return self.queue.qsize() < settings.NUMBER_OF_CONCURRENT_TASKS
238159

239160
async def add_proxy_to_queue(self, proxy: Proxy):
240161
await self.queue.put((
@@ -250,8 +171,6 @@ async def add_proxies_to_queue(self, proxies: list):
250171
await self.add_proxy_to_queue(proxy)
251172

252173
async def process_collector_of_state(self, collector_state):
253-
proxies = set()
254-
255174
collector = await collectors_list.load_collector(collector_state)
256175
try:
257176
self.logger.debug(
@@ -277,16 +196,15 @@ async def process_collector_of_state(self, collector_state):
277196
self.collectors_logger.exception(ex)
278197
finally:
279198
collector.last_processing_time = int(time.time())
280-
await collector.save_state(collector_state)
281-
# TODO: save new proxies count
282-
await db.update(collector_state)
199+
# TODO: new proxies count
200+
await collectors_list.save_collector(collector_state)
283201

284202
async def process_raw_proxies(self, proxies, collector_id):
285203
tasks = []
286204

287205
for proxy in proxies:
288206
tasks.append(self.process_raw_proxy(proxy, collector_id))
289-
if len(tasks) > settings.CONCURRENT_TASKS_COUNT:
207+
if len(tasks) > settings.NUMBER_OF_CONCURRENT_TASKS:
290208
await asyncio.gather(*tasks)
291209
tasks.clear()
292210

0 commit comments

Comments
 (0)