Skip to content

Commit f359bdf

Browse files
committed
Introduce ProcessPool
1 parent b0a6f14 commit f359bdf

File tree

3 files changed

+200
-76
lines changed

3 files changed

+200
-76
lines changed

fastcore/_modidx.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -370,11 +370,19 @@
370370
'fastcore.net.urlsend': ('net.html#urlsend', 'fastcore/net.py'),
371371
'fastcore.net.urlvalid': ('net.html#urlvalid', 'fastcore/net.py'),
372372
'fastcore.net.urlwrap': ('net.html#urlwrap', 'fastcore/net.py')},
373-
'fastcore.parallel': { 'fastcore.parallel.ProcessPoolExecutor': ('parallel.html#processpoolexecutor', 'fastcore/parallel.py'),
373+
'fastcore.parallel': { 'fastcore.parallel.NoDaemonProcess': ('parallel.html#nodaemonprocess', 'fastcore/parallel.py'),
374+
'fastcore.parallel.NoDaemonProcess.daemon': ( 'parallel.html#nodaemonprocess.daemon',
375+
'fastcore/parallel.py'),
376+
'fastcore.parallel.ProcessPool': ('parallel.html#processpool', 'fastcore/parallel.py'),
377+
'fastcore.parallel.ProcessPool.__init__': ('parallel.html#processpool.__init__', 'fastcore/parallel.py'),
378+
'fastcore.parallel.ProcessPool.map': ('parallel.html#processpool.map', 'fastcore/parallel.py'),
379+
'fastcore.parallel.ProcessPoolExecutor': ('parallel.html#processpoolexecutor', 'fastcore/parallel.py'),
374380
'fastcore.parallel.ProcessPoolExecutor.__init__': ( 'parallel.html#processpoolexecutor.__init__',
375381
'fastcore/parallel.py'),
376382
'fastcore.parallel.ProcessPoolExecutor.map': ( 'parallel.html#processpoolexecutor.map',
377383
'fastcore/parallel.py'),
384+
'fastcore.parallel.ThreadPool': ('parallel.html#threadpool', 'fastcore/parallel.py'),
385+
'fastcore.parallel.ThreadPool.__init__': ('parallel.html#threadpool.__init__', 'fastcore/parallel.py'),
378386
'fastcore.parallel.ThreadPoolExecutor': ('parallel.html#threadpoolexecutor', 'fastcore/parallel.py'),
379387
'fastcore.parallel.ThreadPoolExecutor.__init__': ( 'parallel.html#threadpoolexecutor.__init__',
380388
'fastcore/parallel.py'),

fastcore/parallel.py

Lines changed: 71 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
# AUTOGENERATED! DO NOT EDIT! File to edit: ../nbs/03a_parallel.ipynb.
22

33
# %% auto 0
4-
__all__ = ['threaded', 'startthread', 'parallelable', 'ThreadPoolExecutor', 'ProcessPoolExecutor', 'parallel', 'add_one',
5-
'run_procs', 'parallel_gen']
4+
__all__ = ['threaded', 'startthread', 'parallelable', 'ThreadPoolExecutor', 'ProcessPoolExecutor', 'NoDaemonProcess',
5+
'ProcessPool', 'ThreadPool', 'parallel', 'add_one', 'run_procs', 'parallel_gen']
66

77
# %% ../nbs/03a_parallel.ipynb 1
88
from .imports import *
@@ -11,15 +11,14 @@
1111
from .meta import *
1212
from .xtras import *
1313
from functools import wraps
14-
1514
import concurrent.futures,time
16-
from multiprocessing import Process,Queue,Manager,set_start_method,get_all_start_methods,get_context
15+
from multiprocessing import pool,Process,Queue,Manager,set_start_method,get_all_start_methods,get_context
1716
from threading import Thread
1817
try:
1918
if sys.platform == 'darwin' and IN_NOTEBOOK: set_start_method("fork")
2019
except: pass
2120

22-
# %% ../nbs/03a_parallel.ipynb 4
21+
# %% ../nbs/03a_parallel.ipynb 5
2322
def threaded(f):
2423
"Run `f` in a thread, and returns the thread"
2524
@wraps(f)
@@ -29,12 +28,12 @@ def _f(*args, **kwargs):
2928
return res
3029
return _f
3130

32-
# %% ../nbs/03a_parallel.ipynb 6
31+
# %% ../nbs/03a_parallel.ipynb 7
3332
def startthread(f):
    "Build a thread for `f` via `threaded` and start it right away"
    thread_runner = threaded(f)
    thread_runner()
3635

37-
# %% ../nbs/03a_parallel.ipynb 8
36+
# %% ../nbs/03a_parallel.ipynb 9
3837
def _call(lock, pause, n, g, item):
3938
l = False
4039
if pause:
@@ -45,7 +44,7 @@ def _call(lock, pause, n, g, item):
4544
if l: lock.release()
4645
return g(item)
4746

48-
# %% ../nbs/03a_parallel.ipynb 9
47+
# %% ../nbs/03a_parallel.ipynb 10
4948
def parallelable(param_name, num_workers, f=None):
5049
f_in_main = f == None or sys.modules[f.__module__].__name__ == "__main__"
5150
if sys.platform == "win32" and IN_NOTEBOOK and num_workers > 0 and f_in_main:
@@ -54,7 +53,7 @@ def parallelable(param_name, num_workers, f=None):
5453
return False
5554
return True
5655

57-
# %% ../nbs/03a_parallel.ipynb 10
56+
# %% ../nbs/03a_parallel.ipynb 11
5857
class ThreadPoolExecutor(concurrent.futures.ThreadPoolExecutor):
5958
"Same as Python's ThreadPoolExecutor, except can pass `max_workers==0` for serial execution"
6059
def __init__(self, max_workers=defaults.cpus, on_exc=print, pause=0, **kwargs):
@@ -72,7 +71,7 @@ def map(self, f, items, *args, timeout=None, chunksize=1, **kwargs):
7271
try: return super().map(_g, items, timeout=timeout, chunksize=chunksize)
7372
except Exception as e: self.on_exc(e)
7473

75-
# %% ../nbs/03a_parallel.ipynb 12
74+
# %% ../nbs/03a_parallel.ipynb 13
7675
@delegates()
7776
class ProcessPoolExecutor(concurrent.futures.ProcessPoolExecutor):
7877
"Same as Python's ProcessPoolExecutor, except can pass `max_workers==0` for serial execution"
@@ -95,49 +94,101 @@ def map(self, f, items, *args, timeout=None, chunksize=1, **kwargs):
9594
try: return super().map(_g, items, timeout=timeout, chunksize=chunksize)
9695
except Exception as e: self.on_exc(e)
9796

98-
# %% ../nbs/03a_parallel.ipynb 14
97+
# %% ../nbs/03a_parallel.ipynb 15
98+
class NoDaemonProcess(Process):
    "A `Process` that always reports `daemon=False`, so its workers may spawn children"
    # Workaround from https://stackoverflow.com/questions/6974695/python-process-pool-non-daemonic
    @property
    def daemon(self): return False

    @daemon.setter
    def daemon(self, value): pass  # silently ignore any attempt to mark the process daemonic
106+
107+
# %% ../nbs/03a_parallel.ipynb 16
108+
@delegates()
class ProcessPool(pool.Pool):
    "Same as Python's Pool, except can pass `max_workers==0` for serial execution"
    def __init__(self, max_workers=defaults.cpus, on_exc=print, pause=0, daemonic=False, **kwargs):
        # `None` means "use the default worker count", mirroring ProcessPoolExecutor
        if max_workers is None: max_workers=defaults.cpus
        store_attr()
        self.not_parallel = max_workers==0
        # A real Pool needs at least one worker; serial mode is short-circuited in `map`
        if self.not_parallel: max_workers=1
        if not daemonic:
            # Subclass the chosen start-method context so its Process class is
            # non-daemonic, letting pool workers spawn child processes themselves
            class NoDaemonContext(type(kwargs.get('context', get_context()))):
                Process = NoDaemonProcess
            kwargs['context'] = NoDaemonContext()
        super().__init__(max_workers, **kwargs)

    def map(self, f, items, *args, timeout=None, chunksize=1, **kwargs):
        # `timeout` exists only for signature compatibility with the executor classes
        assert timeout is None, "timeout is not supported by ProcessPool, use ProcessPoolExecutor instead"
        # Drop to serial execution where parallelism is unsupported (see `parallelable`)
        if not parallelable('max_workers', self.max_workers, f): self.max_workers = 0
        self.not_parallel = self.max_workers==0
        if self.not_parallel: self.max_workers=1

        # The shared lock is only needed (and only creatable) when truly parallel
        if self.not_parallel == False: self.lock = Manager().Lock()
        g = partial(f, *args, **kwargs)
        if self.not_parallel: return map(g, items)
        # `_call` wraps `g`, using the shared lock and `pause` to stagger worker start-up
        _g = partial(_call, self.lock, self.pause, self.max_workers, g)
        try: return super().map(_g, items, chunksize=chunksize)
        except Exception as e: self.on_exc(e)
134+
135+
# %% ../nbs/03a_parallel.ipynb 17
136+
@delegates()
class ThreadPool():
    # Deliberately unimplemented placeholder.
    # If you have a need for a ThreadPool, please open an issue.
    def __init__(self, *args, **kwargs):
        raise NotImplementedError("`ThreadPool` is not implemented")
141+
142+
# %% ../nbs/03a_parallel.ipynb 18
99143
try: from fastprogress import progress_bar
100144
except: progress_bar = None
101145

102-
# %% ../nbs/03a_parallel.ipynb 15
146+
# %% ../nbs/03a_parallel.ipynb 19
103147
def parallel(f, items, *args, n_workers=defaults.cpus, total=None, progress=None, pause=0,
             method=None, threadpool=False, timeout=None, chunksize=1,
             executor=True, maxtasksperchild=None, **kwargs):
    "Applies `func` in parallel to `items`, using `n_workers`"
    kwpool = {}
    if threadpool:
        pool = ThreadPoolExecutor if executor else ThreadPool
    else:
        pool = ProcessPoolExecutor if executor else ProcessPool
        # On macOS force 'fork' unless the caller picked a start method explicitly
        if sys.platform == 'darwin' and not method: method = 'fork'
        if method:
            # Executors name the kwarg `mp_context`; `multiprocessing.pool` names it `context`
            kwpool['mp_context' if executor else 'context'] = get_context(method)

    if maxtasksperchild:
        assert pool==ProcessPool, "`maxtasksperchild` is only supported by ProcessPool"
        kwpool['maxtasksperchild'] = maxtasksperchild
    with pool(n_workers, pause=pause, **kwpool) as ex:
        res = ex.map(f, items, *args, timeout=timeout, chunksize=chunksize, **kwargs)
        if progress and progress_bar:
            if total is None: total = len(items)
            res = progress_bar(res, total=total, leave=False)
        return L(res)

119-
# %% ../nbs/03a_parallel.ipynb 16
170+
# %% ../nbs/03a_parallel.ipynb 20
120171
def add_one(x, a=1):
121172
# this import is necessary for multiprocessing in notebook on windows
122173
import random
123174
time.sleep(random.random()/80)
124175
return x+a
125176

126-
# %% ../nbs/03a_parallel.ipynb 22
177+
# %% ../nbs/03a_parallel.ipynb 26
127178
def run_procs(f, f_done, args):
128179
"Call `f` for each item in `args` in parallel, yielding `f_done`"
129180
processes = L(args).map(Process, args=arg0, target=f)
130181
for o in processes: o.start()
131182
yield from f_done()
132183
processes.map(Self.join())
133184

134-
# %% ../nbs/03a_parallel.ipynb 23
185+
# %% ../nbs/03a_parallel.ipynb 27
135186
def _f_pg(obj, queue, batch, start_idx):
136187
for i,b in enumerate(obj(batch)): queue.put((start_idx+i,b))
137188

138189
def _done_pg(queue, items): return (queue.get() for _ in items)
139190

140-
# %% ../nbs/03a_parallel.ipynb 24
191+
# %% ../nbs/03a_parallel.ipynb 28
141192
def parallel_gen(cls, items, n_workers=defaults.cpus, **kwargs):
142193
"Instantiate `cls` in `n_workers` procs & call each on a subset of `items` in parallel."
143194
if not parallelable('n_workers', n_workers): n_workers = 0

0 commit comments

Comments
 (0)