Skip to content

Commit 20cb995

Browse files
authored
Handle broken process pool in diffing server well (#38)
We previously tried to reset the process pool that runs the actual diff routines every time it broke, and failed to handle the case where we exhausted all our retries (we'd wind up returning `None`, which is not an acceptable result). We now don't reset the pool and instead raise an exception on the last try, preventing weird errors from happening later on because we returned a bad value (`None`) from `DiffHandler.diff()`. This also does a little more work to ensure that we aren't thrashing the process pool when a lot of diffs are happening simultaneously. We previously had each diff blindly reset the pool, which means that if multiple diffs were in flight when the pool broke, it would get reset multiple times, even though it really only needed to get reset once. This was probably wasting a lot of memory and CPU time. Note that the tests here don't really test the first issue. There's not a good way to differentiate from the outside whether an unknown error was the process pool or something else. Fixes #33.
1 parent 6a02441 commit 20cb995

File tree

3 files changed

+104
-1
lines changed

3 files changed

+104
-1
lines changed

docs/source/release-history.rst

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,12 @@
22
Release History
33
===============
44

5+
In Development
6+
--------------
7+
8+
- Fixes an issue where the diffing server could reset the process pool that manages the actual diffs multiple times unnecessarily, leading to wasted memory and CPU. If you are tracking logs and errors, this will also make error messages about the diffing server clearer — you’ll see “BrokenProcessPool” instead of “'NoneType' object does not support item assignment.” (`#38 <https://github.com/edgi-govdata-archiving/web-monitoring-diff/issues/38>`_)
9+
10+
511
Version 0.1.0
612
-------------
713

web_monitoring_diff/server/server.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -498,7 +498,16 @@ async def diff(self, func, a, b, params, tries=2):
498498
return await loop.run_in_executor(
499499
executor, functools.partial(caller, func, a, b, **params))
500500
except concurrent.futures.process.BrokenProcessPool:
501-
executor = self.get_diff_executor(reset=True)
501+
if attempt + 1 < tries:
502+
# There could be many diffs happening in parallel, so
503+
# before trying to reset the process pool, make sure other
504+
# parallel diffs haven't already done it. If it's already
505+
# been reset, then we can just go and use the new one.
506+
old_executor, executor = executor, self.get_diff_executor()
507+
if executor == old_executor:
508+
executor = self.get_diff_executor(reset=True)
509+
else:
510+
raise
502511

503512
# NOTE: this doesn't do anything async, but if we change it to do so, we
504513
# need to add a lock (either asyncio.Lock or tornado.locks.Lock).

web_monitoring_diff/tests/test_server_exc_handling.py

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
import asyncio
2+
import concurrent.futures
3+
from concurrent.futures.process import BrokenProcessPool, ProcessPoolExecutor
24
import json
35
import os
46
import unittest
@@ -387,6 +389,92 @@ async def responder(handler):
387389
assert len(result['diff'][0][1]) == 1024
388390

389391

392+
class BrokenProcessPoolExecutor(concurrent.futures.Executor):
    """
    Fake process pool that only raises BrokenProcessPool exceptions.

    Every call to ``submit`` returns an already-completed future whose
    exception is a ``BrokenProcessPool``; the submitted callable is never
    run. ``submit_count`` records how many times ``submit`` was called on
    this instance.
    """
    # Class-level default. The first ``+=`` in ``submit`` creates an
    # instance attribute, so separate instances keep separate counts.
    submit_count = 0

    def submit(self, fn, *args, **kwargs):
        """Count the call and return a future that has already failed."""
        self.submit_count += 1
        result = concurrent.futures.Future()
        result.set_exception(BrokenProcessPool(
            'This pool is broken, yo'
        ))
        return result
403+
404+
405+
class ExecutionPoolTestCase(DiffingServerTestCase):
    """
    Tests for how the diffing server behaves when the process pool that
    runs the diffs raises ``BrokenProcessPool``.
    """

    def fetch_async(self, path, **kwargs):
        "Like AsyncHTTPTestCase.fetch, but async."
        url = self.get_url(path)
        return self.http_client.fetch(url, **kwargs)

    def test_rebuilds_process_pool_when_broken(self):
        # Get a custom executor that will always fail the first time, but get
        # a real one that will succeed afterward. The real pool is created
        # once and shut down when the test finishes so worker processes are
        # not left behind.
        good_executor = ProcessPoolExecutor(1)
        self.addCleanup(good_executor.shutdown)
        did_get_executor = False

        def get_executor(self, reset=False):
            nonlocal did_get_executor
            if did_get_executor:
                return good_executor
            did_get_executor = True
            return BrokenProcessPoolExecutor()

        with patch.object(df.DiffHandler, 'get_diff_executor', get_executor):
            response = self.fetch('/html_source_dmp?format=json&'
                                  f'a=file://{fixture_path("empty.txt")}&'
                                  f'b=file://{fixture_path("empty.txt")}')
            assert response.code == 200
            assert did_get_executor

    def test_diff_returns_error_if_process_pool_repeatedly_breaks(self):
        # Set a custom executor that will always fail.
        def get_executor(self, reset=False):
            return BrokenProcessPoolExecutor()

        with patch.object(df.DiffHandler, 'get_diff_executor', get_executor):
            response = self.fetch('/html_source_dmp?format=json&'
                                  f'a=file://{fixture_path("empty.txt")}&'
                                  f'b=file://{fixture_path("empty.txt")}')
            self.json_check(response)
            assert response.code == 500

    @tornado.testing.gen_test
    async def test_rebuilds_process_pool_cooperatively(self):
        """
        Make sure that two parallel diffing failures only cause the process
        pool to be rebuilt once, not multiple times.
        """
        # Hand out the always-failing executor until a reset is requested,
        # then hand out the real one from that point on.
        executor_resets = 0
        good_executor = ProcessPoolExecutor(1)
        # Shut the real pool down after the test so its worker process does
        # not outlive the test.
        self.addCleanup(good_executor.shutdown)
        bad_executor = BrokenProcessPoolExecutor()

        def get_executor(self, reset=False):
            nonlocal executor_resets
            if reset:
                executor_resets += 1
            if executor_resets > 0:
                return good_executor
            return bad_executor

        with patch.object(df.DiffHandler, 'get_diff_executor', get_executor):
            one = self.fetch_async('/html_source_dmp?format=json&'
                                   f'a=file://{fixture_path("empty.txt")}&'
                                   f'b=file://{fixture_path("empty.txt")}')
            two = self.fetch_async('/html_source_dmp?format=json&'
                                   f'a=file://{fixture_path("empty.txt")}&'
                                   f'b=file://{fixture_path("empty.txt")}')
            response1, response2 = await asyncio.gather(one, two)
            assert response1.code == 200
            assert response2.code == 200
            assert executor_resets == 1
            # Ensure *both* diffs hit the bad executor, so we know we didn't
            # have one reset because only one request hit the bad executor.
            assert bad_executor.submit_count == 2
476+
477+
390478
def mock_diffing_method(c_body):
391479
return
392480

0 commit comments

Comments
 (0)