Skip to content

Commit 625d64b

Browse files
committed
fix collectors' stuck issue
1 parent cd4ba3b commit 625d64b

File tree

1 file changed

+46
-22
lines changed

1 file changed

+46
-22
lines changed

processor.py

Lines changed: 46 additions & 22 deletions
Original file line number | Diff line number | Diff line change
@@ -68,6 +68,7 @@ def __init__(self):
6868

6969
self.queue = asyncio.Queue(maxsize=settings.PROXY_QUEUE_SIZE)
7070
self.proxies_semaphore = asyncio.BoundedSemaphore(settings.NUMBER_OF_CONCURRENT_TASKS)
71+
self.good_proxies_are_processed = False
7172

7273
async def worker(self):
7374
await asyncio.gather(*[
@@ -93,6 +94,13 @@ async def consumer(self):
9394
await asyncio.sleep(settings.SLEEP_AFTER_ERROR_PERIOD)
9495

9596
async def producer(self):
97+
while True:
98+
await asyncio.gather(*[
99+
self.process_proxies(),
100+
self.process_collectors(),
101+
])
102+
103+
async def process_proxies(self):
96104
while True:
97105
await asyncio.sleep(0.00001)
98106
try:
@@ -103,28 +111,15 @@ async def producer(self):
103111
Proxy.last_check_time < time.time() - Proxy.checking_period,
104112
).order_by(Proxy.last_check_time).limit(settings.NUMBER_OF_CONCURRENT_TASKS)
105113
)
114+
if proxies:
115+
self.good_proxies_are_processed = False
106116

107117
await self.add_proxies_to_queue(proxies)
108118

109-
if len(proxies) > settings.NUMBER_OF_CONCURRENT_TASKS / 2:
119+
if proxies:
110120
continue
111121

112-
# check collectors
113-
collector_states = await db.execute(
114-
CollectorState.select().where(
115-
CollectorState.last_processing_time < time.time() - CollectorState.processing_period
116-
).order_by(peewee.fn.Random()).
117-
limit(settings.NUMBER_OF_CONCURRENT_COLLECTORS)
118-
)
119-
120-
tasks = [
121-
self.process_collector_of_state(collector_state)
122-
for collector_state in collector_states
123-
]
124-
125-
if tasks:
126-
await asyncio.gather(*tasks)
127-
continue
122+
self.good_proxies_are_processed = True
128123

129124
# check bad proxies
130125
proxies = await db.execute(
@@ -159,6 +154,31 @@ async def producer(self):
159154

160155
await asyncio.sleep(settings.SLEEP_AFTER_ERROR_PERIOD)
161156

157+
async def process_collectors(self):
158+
while True:
159+
try:
160+
await asyncio.sleep(0.000001)
161+
162+
# check collectors
163+
collector_states = await db.execute(
164+
CollectorState.select().where(
165+
CollectorState.last_processing_time < time.time() - CollectorState.processing_period
166+
).order_by(peewee.fn.Random()).limit(settings.NUMBER_OF_CONCURRENT_COLLECTORS)
167+
)
168+
169+
await asyncio.gather(*[
170+
self.process_collector_of_state(collector_state)
171+
for collector_state in collector_states
172+
])
173+
except KeyboardInterrupt as ex:
174+
raise ex
175+
except BaseException as ex:
176+
self.collectors_logger.exception(ex)
177+
if settings.DEBUG:
178+
raise ex
179+
180+
await asyncio.sleep(settings.SLEEP_AFTER_ERROR_PERIOD)
181+
162182
def is_queue_free(self):
163183
return self.queue.qsize() < settings.NUMBER_OF_CONCURRENT_TASKS
164184

@@ -183,15 +203,15 @@ async def process_collector_of_state(self, collector_state):
183203
)
184204
proxies = await collector._collect()
185205

186-
if not proxies:
187-
self.collectors_logger.warning(
188-
"got 0 proxies from collector of type \"{}\"".format(type(collector))
189-
)
190-
else:
206+
if proxies:
191207
self.logger.debug(
192208
"got {} proxies from collector of type \"{}\"".format(len(proxies), type(collector))
193209
)
194210
await self.process_raw_proxies(proxies, collector_state.id)
211+
else:
212+
self.collectors_logger.warning(
213+
"got 0 proxies from collector of type \"{}\"".format(type(collector))
214+
)
195215
except KeyboardInterrupt as ex:
196216
raise ex
197217
except BaseException as ex:
@@ -208,6 +228,7 @@ async def process_raw_proxies(self, proxies, collector_id):
208228
tasks = []
209229

210230
for proxy in proxies:
231+
# TODO: refactor it
211232
tasks.append(self.process_raw_proxy(proxy, collector_id))
212233
if len(tasks) > settings.NUMBER_OF_CONCURRENT_TASKS:
213234
await asyncio.gather(*tasks)
@@ -261,6 +282,9 @@ async def process_raw_proxy(self, proxy, collector_id):
261282
pass
262283

263284
for raw_protocol in range(len(Proxy.PROTOCOLS)):
285+
while not self.good_proxies_are_processed:
286+
await asyncio.sleep(0.01)
287+
264288
await self.queue.put((
265289
raw_protocol,
266290
auth_data,

0 commit comments

Comments
 (0)