2Much2Code:)
commit
ba5dcdc
'''
WorkerPool and WorkerBase for handling the common problems in managing
a multiprocess pool of workers that aren't done by multiprocessing.Pool,
including setup with per-process state, debugging by putting the worker
on the main thread, and correct handling of unexpected errors, and ctrl-C.
To use it,
1. Put the per-process setup and the per-task work in the
setup() and work() methods of your own WorkerBase subclass.
2. To prepare the process pool, instantiate a WorkerPool, passing your
subclass type as the first (worker) argument, as well as any setup keyword
arguments. The WorkerPool will instantiate one of your workers in each
worker process (passing in the setup arguments in those processes).
If debugging, the pool can have process_count=0 to force all the work
to be done immediately on the main thread; otherwise all the work
will be passed to other processes.
3. Whenever there is a new piece of work to distribute, call pool.add(*args).
The arguments will be queued and passed as worker.work(*args) to the
next available worker.
4. When all the work has been distributed, call pool.join() to wait for all
the work to complete and to finish and terminate all the worker processes.
When pool.join() returns, all the work will have been done.
No arrangement is made to collect the results of the work: for example,
the return value of work() is ignored. If you need to collect the
results, use your own mechanism (filesystem, shared memory object, queue)
which can be distributed using setup arguments.
'''
from multiprocessing import Process, Queue, cpu_count
import signal
import atexit
import sys
class WorkerBase(Process):
'''
Subclass this class and override its work() method (and optionally,
setup() as well) to define the units of work to be done in a process
worker in a woker pool.
'''
def __init__(self, i, process_count, queue, initargs):
if process_count > 0:
# Make sure we ignore ctrl-C if we are not on main process.
signal.signal(signal.SIGINT, signal.SIG_IGN)
self.process_id = i
self.process_count = process_count
self.queue = queue
super(WorkerBase, self).__init__()
self.setup(**initargs)
def run(self):
# Do the work until None is dequeued
while True:
try:
work_batch = self.queue.get()
except (KeyboardInterrupt, SystemExit):
print('Exiting...')
break
if work_batch is None:
self.queue.put(None) # for another worker
return
self.work(*work_batch)
def setup(self, **initargs):
'''
Override this method for any per-process initialization.
Keywoard args are passed from WorkerPool constructor.
'''
pass
def work(self, *args):
'''
Override this method for one-time initialization.
Args are passed from WorkerPool.add() arguments.
'''
raise NotImplementedError('worker subclass needed')
class WorkerPool(object):
'''
Instantiate this object (passing a WorkerBase subclass type
as its first argument) to create a worker pool. Then call
pool.add(*args) to queue args to distribute to worker.work(*args),
and call pool.join() to wait for all the workers to complete.
'''
def __init__(self, worker=WorkerBase, process_count=None, **initargs):
global active_pools
if process_count is None:
process_count = cpu_count()
if process_count == 0:
# zero process_count uses only main process, for debugging.
self.queue = None
self.processes = None
self.worker = worker(None, 0, None, initargs)
return
# Ctrl-C strategy: worker processes should ignore ctrl-C. Set
# this up to be inherited by child processes before forking.
original_sigint_handler = signal.signal(signal.SIGINT, signal.SIG_IGN)
active_pools[id(self)] = self
self.queue = Queue(maxsize=(process_count * 3))
self.processes = None # Initialize before trying to construct workers
self.processes = [worker(i, process_count, self.queue, initargs)
for i in range(process_count)]
for p in self.processes:
p.start()
# The main process should handle ctrl-C. Restore this now.
signal.signal(signal.SIGINT, original_sigint_handler)
def add(self, *work_batch):
if self.queue is None:
if hasattr(self, 'worker'):
self.worker.work(*work_batch)
else:
print('WorkerPool shutting down.', file=sys.stderr)
else:
try:
# The queue can block if the work is so slow it gets full.
self.queue.put(work_batch)
except (KeyboardInterrupt, SystemExit):
# Handle ctrl-C if done while waiting for the queue.
self.early_terminate()
def join(self):
# End the queue, and wait for all worker processes to complete nicely.
if self.queue is not None:
self.queue.put(None)
for p in self.processes:
p.join()
self.queue = None
# Remove myself from the set of pools that need cleanup on shutdown.
try:
del active_pools[id(self)]
except:
pass
def early_terminate(self):
# When shutting down unexpectedly, first end the queue.
if self.queue is not None:
try:
self.queue.put_nowait(None) # Nonblocking put throws if full.
self.queue = None
except:
pass
# But then don't wait: just forcibly terminate workers.
if self.processes is not None:
for p in self.processes:
p.terminate()
self.processes = None
try:
del active_pools[id(self)]
except:
pass
def __del__(self):
if self.queue is not None:
print('ERROR: workerpool.join() not called!', file=sys.stderr)
self.join()
# Error and ctrl-C handling: kill worker processes if the main process ends.
active_pools = {}
def early_terminate_pools():
for _, pool in list(active_pools.items()):
pool.early_terminate()
atexit.register(early_terminate_pools)