Я не смог придумать умный дизайн для очереди, но вы можете легко заменить очередь еще одним процессом, который в моем примере я назвал WorkerManager
.Этот процесс собирает результаты всех процессов Worker
и запускает новых рабочих, только если есть два смежных пакета данных, ожидающих обработки.Таким образом, вы никогда не будете пытаться объединять несмежные результаты, поэтому вы можете игнорировать «уровни» и запускать вычисления следующей пары, как только она будет готова.
from multiprocessing import Process, Queue
class Result(object):
'''Result from start to end.'''
def __init__(self, start, end, data):
self.start = start
self.end = end
self.data = data
class Worker(Process):
'''Joins two results into one result.'''
def __init__(self, result_queue, pair):
self.result_queue = result_queue
self.pair = pair
super(Worker, self).__init__()
def run(self):
left, right = self.pair
result = Result(left.start, right.end,
'(%s, %s)' % (left.data, right.data))
self.result_queue.put(result)
class WorkerManager(Process):
'''
Takes results from result_queue, pairs them
and assigns workers to process them.
Returns final result into final_queue.
'''
def __init__(self, result_queue, final_queue, start, end):
self._result_queue = result_queue
self._final_queue = final_queue
self._start = start
self._end = end
self._results = []
super(WorkerManager, self).__init__()
def run(self):
while True:
result = self._result_queue.get()
self._add_result(result)
if self._has_final_result():
self._final_queue.put(self._get_final_result())
return
pair = self._find_adjacent_pair()
if pair:
self._start_worker(pair)
def _add_result(self, result):
self._results.append(result)
self._results.sort(key=lambda result: result.start)
def _has_final_result(self):
return (len(self._results) == 1
and self._results[0].start == self._start
and self._results[0].end == self._end)
def _get_final_result(self):
return self._results[0]
def _find_adjacent_pair(self):
for i in xrange(len(self._results) - 1):
left, right = self._results[i], self._results[i + 1]
if left.end == right.start:
self._results = self._results[:i] + self._results[i + 2:]
return left, right
def _start_worker(self, pair):
worker = Worker(self._result_queue, pair)
worker.start()
if __name__ == '__main__':
DATA = [Result(i, i + 1, str(i)) for i in xrange(6)]
result_queue = Queue()
final_queue = Queue()
start = 0
end = len(DATA)
man = WorkerManager(result_queue, final_queue, start, end)
man.start()
for res in DATA:
result_queue.put(res)
final = final_queue.get()
print final.start
# 0
print final.end
# 6
print final.data
# For example:
# (((0, 1), (2, 3)), (4, 5))
Для моего примера я использовалпростой Worker
, который возвращает данные в круглых скобках, разделенных запятой, но вы можете поместить туда любое вычисление.В моем случае конечный результат был (((0, 1), (2, 3)), (4, 5))
, что означает, что алгоритм вычислил (0, 1)
и (2, 3)
до вычисления ((0, 1), (2, 3))
, а затем соединил результат с (4, 5)
.Я надеюсь, что это то, что вы искали.