Skip to content

Commit 93d6951

Browse files
committed
some refactoring
1 parent aa00c58 commit 93d6951

File tree

4 files changed

+19
-53
lines changed

4 files changed

+19
-53
lines changed

TODO

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,3 +13,4 @@ p2p
1313
log memory consumption
1414
stop using ipinfo.io because of rate limits
1515
fix leak of something, again :(
16+
remove processor proxies queue model

processor.py

Lines changed: 16 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -66,40 +66,15 @@ def __init__(self):
6666

6767
self.logger.debug("processor initialization...")
6868

69-
self.queue = asyncio.Queue(maxsize=settings.PROXY_QUEUE_SIZE)
7069
self.proxies_semaphore = asyncio.BoundedSemaphore(settings.NUMBER_OF_CONCURRENT_TASKS)
7170
self.good_proxies_are_processed = False
7271

7372
async def worker(self):
7473
await asyncio.gather(*[
75-
self.producer(),
76-
self.consumer(),
74+
self.process_proxies(),
75+
self.process_collectors(),
7776
])
7877

79-
async def consumer(self):
80-
while True:
81-
await asyncio.sleep(0.00001)
82-
83-
try:
84-
if not self.proxies_semaphore.locked():
85-
asyncio.ensure_future(self.process_proxy(
86-
*(await self.queue.get())
87-
))
88-
except KeyboardInterrupt:
89-
raise
90-
except BaseException as ex:
91-
self.logger.exception(ex)
92-
if settings.DEBUG:
93-
raise ex
94-
await asyncio.sleep(settings.SLEEP_AFTER_ERROR_PERIOD)
95-
96-
async def producer(self):
97-
while True:
98-
await asyncio.gather(*[
99-
self.process_proxies(),
100-
self.process_collectors(),
101-
])
102-
10378
async def process_proxies(self):
10479
while True:
10580
await asyncio.sleep(0.00001)
@@ -127,7 +102,7 @@ async def process_proxies(self):
127102
Proxy.number_of_bad_checks > 0,
128103
Proxy.number_of_bad_checks < settings.DEAD_PROXY_THRESHOLD,
129104
Proxy.last_check_time < time.time() - settings.BAD_PROXY_CHECKING_PERIOD,
130-
).order_by(Proxy.last_check_time).limit(settings.NUMBER_OF_CONCURRENT_TASKS)
105+
).order_by(Proxy.last_check_time).limit(settings.NUMBER_OF_CONCURRENT_TASKS)
131106
)
132107

133108
await self.add_proxies_to_queue(proxies)
@@ -141,7 +116,7 @@ async def process_proxies(self):
141116
Proxy.number_of_bad_checks >= settings.DEAD_PROXY_THRESHOLD,
142117
Proxy.number_of_bad_checks < settings.DO_NOT_CHECK_ON_N_BAD_CHECKS,
143118
Proxy.last_check_time < time.time() - settings.DEAD_PROXY_CHECKING_PERIOD,
144-
).order_by(Proxy.last_check_time).limit(settings.NUMBER_OF_CONCURRENT_TASKS)
119+
).order_by(Proxy.last_check_time).limit(settings.NUMBER_OF_CONCURRENT_TASKS)
145120
)
146121

147122
await self.add_proxies_to_queue(proxies)
@@ -179,16 +154,16 @@ async def process_collectors(self):
179154

180155
await asyncio.sleep(settings.SLEEP_AFTER_ERROR_PERIOD)
181156

182-
def is_queue_free(self):
183-
return self.queue.qsize() < settings.NUMBER_OF_CONCURRENT_TASKS
157+
async def add_proxy_to_queue(self, proxy: Proxy, collector_id=None):
158+
while self.proxies_semaphore.locked():
159+
await asyncio.sleep(0.001)
184160

185-
async def add_proxy_to_queue(self, proxy: Proxy):
186-
await self.queue.put((
161+
asyncio.ensure_future(self.process_proxy(
187162
proxy.get_raw_protocol(),
188163
proxy.auth_data,
189164
proxy.domain,
190165
proxy.port,
191-
None
166+
collector_id,
192167
))
193168

194169
async def add_proxies_to_queue(self, proxies: list):
@@ -285,13 +260,13 @@ async def process_raw_proxy(self, proxy, collector_id):
285260
while not self.good_proxies_are_processed:
286261
await asyncio.sleep(0.01)
287262

288-
await self.queue.put((
289-
raw_protocol,
290-
auth_data,
291-
domain,
292-
port,
293-
collector_id,
294-
))
263+
new_proxy = Proxy()
264+
new_proxy.raw_protocol = raw_protocol
265+
new_proxy.auth_data = auth_data
266+
new_proxy.domain = domain
267+
new_proxy.port = port
268+
269+
self.add_proxy_to_queue(new_proxy, collector_id)
295270

296271
async def process_proxy(self, raw_protocol: int, auth_data: str, domain: str, port: int, collector_id):
297272
async with self.proxies_semaphore:

proxy_py/_settings.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,14 +32,13 @@
3232
# 'local/collectors', # use to add your own collectors
3333
]
3434

35-
NUMBER_OF_CONCURRENT_TASKS = 512
35+
NUMBER_OF_CONCURRENT_TASKS = 128
3636
# prevents aiohttp from sending more
3737
# than this number of simultaneous requests
3838
# implemented via a common connector
39-
NUMBER_OF_SIMULTANEOUS_REQUESTS = 256
39+
NUMBER_OF_SIMULTANEOUS_REQUESTS = 64
4040
# the same, but per host
4141
NUMBER_OF_SIMULTANEOUS_REQUESTS_PER_HOST = NUMBER_OF_SIMULTANEOUS_REQUESTS
42-
PROXY_QUEUE_SIZE = NUMBER_OF_CONCURRENT_TASKS * 2
4342

4443
MIN_PROXY_CHECKING_PERIOD = 10 * 60
4544
MAX_PROXY_CHECKING_PERIOD = 60 * 60

statistics/statistics.py

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ async def worker():
1111
await process_graph(ProxyCountItem, 60, create_proxy_count_item)
1212
await process_graph(NumberOfProxiesToProcess, 60, number_of_proxies_to_process)
1313
await process_graph(NumberOfCollectorsToProcess, 60, number_of_collectors_to_process)
14-
await process_graph(ProcessorProxiesQueueSize, 60, processor_proxies_queue_size)
1514
await asyncio.sleep(10)
1615

1716

@@ -98,11 +97,3 @@ async def number_of_collectors_to_process(timestamp):
9897
timestamp=timestamp,
9998
value=number_of_collectors,
10099
)
101-
102-
103-
async def processor_proxies_queue_size(timestamp):
104-
await db.create(
105-
ProcessorProxiesQueueSize,
106-
timestamp=timestamp,
107-
value=Processor.get_instance().queue.qsize(),
108-
)

0 commit comments

Comments
 (0)