File size: 6,476 Bytes
ba5dcdc
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
'''
WorkerPool and WorkerBase for handling the common problems in managing
a multiprocess pool of workers that aren't done by multiprocessing.Pool,
including setup with per-process state, debugging by putting the worker
on the main thread, and correct handling of unexpected errors, and ctrl-C.

To use it,
1. Put the per-process setup and the per-task work in the
   setup() and work() methods of your own WorkerBase subclass.
2. To prepare the process pool, instantiate a WorkerPool, passing your
   subclass type as the first (worker) argument, as well as any setup keyword
   arguments.  The WorkerPool will instantiate one of your workers in each
   worker process (passing in the setup arguments in those processes).
   If debugging, the pool can have process_count=0 to force all the work
   to be done immediately on the main thread; otherwise all the work
   will be passed to other processes.
3. Whenever there is a new piece of work to distribute, call pool.add(*args).
   The arguments will be queued and passed as worker.work(*args) to the
   next available worker.
4. When all the work has been distributed, call pool.join() to wait for all
   the work to complete and to finish and terminate all the worker processes.
   When pool.join() returns, all the work will have been done.

No arrangement is made to collect the results of the work: for example,
the return value of work() is ignored.  If you need to collect the
results, use your own mechanism (filesystem, shared memory object, queue)
which can be distributed using setup arguments.
'''

from multiprocessing import Process, Queue, cpu_count
import signal
import atexit
import sys

class WorkerBase(Process):
    '''
    Subclass this class and override its work() method (and optionally,
    setup() as well) to define the units of work to be done in a process
    worker in a woker pool.
    '''
    def __init__(self, i, process_count, queue, initargs):
        if process_count > 0:
            # Make sure we ignore ctrl-C if we are not on main process.
            signal.signal(signal.SIGINT, signal.SIG_IGN)
        self.process_id = i
        self.process_count = process_count
        self.queue = queue
        super(WorkerBase, self).__init__()
        self.setup(**initargs)
    def run(self):
        # Do the work until None is dequeued
        while True:
            try:
                work_batch = self.queue.get()
            except (KeyboardInterrupt, SystemExit):
                print('Exiting...')
                break
            if work_batch is None:
                self.queue.put(None) # for another worker
                return
            self.work(*work_batch)
    def setup(self, **initargs):
        '''
        Override this method for any per-process initialization.
        Keywoard args are passed from WorkerPool constructor.
        '''
        pass
    def work(self, *args):
        '''
        Override this method for one-time initialization.
        Args are passed from WorkerPool.add() arguments.
        '''
        raise NotImplementedError('worker subclass needed')

class WorkerPool(object):
    '''
    Instantiate this object (passing a WorkerBase subclass type
    as its first argument) to create a worker pool.  Then call
    pool.add(*args) to queue args to distribute to worker.work(*args),
    and call pool.join() to wait for all the workers to complete.
    '''
    def __init__(self, worker=WorkerBase, process_count=None, **initargs):
        global active_pools
        if process_count is None:
            process_count = cpu_count()
        if process_count == 0:
            # zero process_count uses only main process, for debugging.
            self.queue = None
            self.processes = None
            self.worker = worker(None, 0, None, initargs)
            return
        # Ctrl-C strategy: worker processes should ignore ctrl-C.  Set
        # this up to be inherited by child processes before forking.
        original_sigint_handler = signal.signal(signal.SIGINT, signal.SIG_IGN)
        active_pools[id(self)] = self
        self.queue = Queue(maxsize=(process_count * 3))
        self.processes = None   # Initialize before trying to construct workers
        self.processes = [worker(i, process_count, self.queue, initargs)
                for i in range(process_count)]
        for p in self.processes:
            p.start()
        # The main process should handle ctrl-C.  Restore this now.
        signal.signal(signal.SIGINT, original_sigint_handler)
    def add(self, *work_batch):
        if self.queue is None:
            if hasattr(self, 'worker'):
                self.worker.work(*work_batch)
            else:
                print('WorkerPool shutting down.', file=sys.stderr)
        else:
            try:
                # The queue can block if the work is so slow it gets full.
                self.queue.put(work_batch)
            except (KeyboardInterrupt, SystemExit):
                # Handle ctrl-C if done while waiting for the queue.
                self.early_terminate()
    def join(self):
        # End the queue, and wait for all worker processes to complete nicely.
        if self.queue is not None:
            self.queue.put(None)
            for p in self.processes:
                p.join()
            self.queue = None
        # Remove myself from the set of pools that need cleanup on shutdown.
        try:
            del active_pools[id(self)]
        except:
            pass
    def early_terminate(self):
        # When shutting down unexpectedly, first end the queue.
        if self.queue is not None:
            try:
                self.queue.put_nowait(None)  # Nonblocking put throws if full.
                self.queue = None
            except:
                pass
        # But then don't wait: just forcibly terminate workers.
        if self.processes is not None:
            for p in self.processes:
                p.terminate()
            self.processes = None
        try:
            del active_pools[id(self)]
        except:
            pass
    def __del__(self):
        if self.queue is not None:
            print('ERROR: workerpool.join() not called!', file=sys.stderr)
            self.join()

# Error and ctrl-C handling: kill worker processes if the main process ends.
active_pools = {}
def early_terminate_pools():
    for _, pool in list(active_pools.items()):
        pool.early_terminate()

atexit.register(early_terminate_pools)