Реализация специального типа многопроцессорной очереди в Python - PullRequest
5 голосов
/ 23 июня 2010

Представьте себе перевернутое двоичное дерево с узлами A, B, C, D, E, F на уровне 0. Узлы G, H, I на уровне 1, узел J на ​​уровне 2 и узел K на уровне 3.

Уровень 1: G = func (A, B), H = func (C, D), I = func (E, F)

Уровень 2: J = func (G, H)

Уровень 3: K = func (J, I).

Каждая пара узлов на уровне 0 должна обрабатываться по порядку. Каждая пара узлов на уровне 1 может обрабатываться в любом порядке, но результат на следующем уровне должен обрабатываться, как показано, и так далее домы получаем конечный результат, K.

Актуальная проблема - это проблема вычислительной геометрии, в которой последовательность твердых тел сливается вместе.A смежен с B, который смежен с C, и так далее.Получающийся плавкий предохранитель A и B (G) смежен с плавким предохранителем C и D (H).Получившийся предохранитель J и I (K) является окончательным результатом.Таким образом, вы не можете соединить G и I, поскольку они не соседствуют.Если число узлов на уровне не является степенью 2, вы получите висячую сущность, которая должна быть обработана на один уровень дальше.

Поскольку процесс плавкого предохранения является вычислительно дорогим и требует большого объема памяти, но очень параллельным,Я хотел бы использовать многопроцессорный пакет Python и некоторую форму очереди.После вычисления G = func (A, B) я бы хотел поместить результат G в очередь для последующего вычисления J = func (G, H).Когда очередь пуста, последний результат является окончательным результатом.Имейте в виду, что mp.queue не обязательно будет давать результаты FIFO, так как I = func (E, F) может закончиться до того, как H = func (C, D)

Я придумал несколько (плохо) решения, но я уверен, что есть элегантное решение, которое мне не по карману.Предложения?

1 Ответ

0 голосов
/ 29 июня 2010

Я не смог придумать умный дизайн для очереди, но вы можете легко заменить очередь еще одним процессом, который в моем примере я назвал WorkerManager.Этот процесс собирает результаты всех процессов Worker и запускает новых рабочих, только если есть два смежных пакета данных, ожидающих обработки.Таким образом, вы никогда не будете пытаться объединять несмежные результаты, поэтому вы можете игнорировать «уровни» и запускать вычисления следующей пары, как только она будет готова.

from multiprocessing import Process, Queue

class Result(object):
    '''Result from start to end.'''
    def __init__(self, start, end, data):
        self.start = start
        self.end = end
        self.data = data


class Worker(Process):
    '''Joins two results into one result.'''
    def __init__(self, result_queue, pair):
        self.result_queue = result_queue
        self.pair = pair
        super(Worker, self).__init__()

    def run(self):
        left, right = self.pair
        result = Result(left.start, right.end,
                        '(%s, %s)' % (left.data, right.data))
        self.result_queue.put(result)


class WorkerManager(Process):
    '''
    Takes results from result_queue, pairs them
    and assigns workers to process them.
    Returns final result into final_queue.
    '''
    def __init__(self, result_queue, final_queue, start, end):
        self._result_queue = result_queue
        self._final_queue = final_queue
        self._start = start
        self._end = end
        self._results = []
        super(WorkerManager, self).__init__()

    def run(self):
        while True:
            result = self._result_queue.get()
            self._add_result(result)
            if self._has_final_result():
                self._final_queue.put(self._get_final_result())
                return
            pair = self._find_adjacent_pair()
            if pair:
                self._start_worker(pair)

    def _add_result(self, result):
        self._results.append(result)
        self._results.sort(key=lambda result: result.start)

    def _has_final_result(self):
        return (len(self._results) == 1
                and self._results[0].start == self._start
                and self._results[0].end == self._end)

    def _get_final_result(self):
        return self._results[0]

    def _find_adjacent_pair(self):
        for i in xrange(len(self._results) - 1):
            left, right = self._results[i], self._results[i + 1]
            if left.end == right.start:
                self._results = self._results[:i] + self._results[i + 2:]
                return left, right

    def _start_worker(self, pair):
        worker = Worker(self._result_queue, pair)
        worker.start()

if __name__ == '__main__':
    DATA = [Result(i, i + 1, str(i)) for i in xrange(6)]
    result_queue = Queue()
    final_queue = Queue()
    start = 0
    end = len(DATA)
    man = WorkerManager(result_queue, final_queue, start, end)
    man.start()
    for res in DATA:
        result_queue.put(res)
    final = final_queue.get()
    print final.start
    # 0
    print final.end
    # 6
    print final.data
    # For example:
    # (((0, 1), (2, 3)), (4, 5))

Для моего примера я использовалпростой Worker, который возвращает данные в круглых скобках, разделенных запятой, но вы можете поместить туда любое вычисление.В моем случае конечный результат был (((0, 1), (2, 3)), (4, 5)), что означает, что алгоритм вычислил (0, 1) и (2, 3) до вычисления ((0, 1), (2, 3)), а затем соединил результат с (4, 5).Я надеюсь, что это то, что вы искали.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...