Как Pythonic отображает с помощью multiprocessing.Pool при добавлении в итератор? - PullRequest
3 голосов
/ 03 ноября 2019

Кажется распространенной схемой использования очереди с пулом процессов, например

Pool(2).map(f, xs)

, но когда тело f может добавляться к отображаемым элементам, например,

from multiprocessing import Pool

xs = [0]

def f(n):
    global xs
    if n < 10:
        xs.append(n + 1)
    return n

Pool(2).map(f, xs)

Ожидается возвращение [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

Я понимаю, что можно создать этот « вручную » с помощью примитивов, предоставленных mt, но это кажется достаточно распространенным шаблоном, который должен бытьобщее решение. Вы знаете одного?

Ответы [ 2 ]

0 голосов
/ 04 ноября 2019

Вы можете создать класс, который делает это из примитивов, таких как:

from multiprocessing import JoinableQueue, Process


class PoolQueue(object):
    def __init__(self, n):
        self.num_procs = n

    def map(self, f, args):
        payloads = JoinableQueue()
        procs = []

        def add_task(arg):
            payloads.put(arg)

        def process_task():
            while True:
                pl = payloads.get()
                f(pl, add_task)
                payloads.task_done()

        for arg in args:
            add_task(arg)

        procs = [Process(target=process_task) for _ in range(self.num_procs)]
        for p in procs:
            p.start()

        payloads.join()
        for p in procs:
            p.kill()

Чтобы проверить это, запустите -

from time import sleep
from random import random


def pause():
    sleep(random() / 100)


def process(payload, add_task):
    print(payload)
    pause()
    if payload:
        add_task(payload[:-1])
    return payload


if __name__ == '__main__':
    for x in range(1):
        PoolQueue(2).map(
            process,
            [
                'abcdefghij',
                '0123456789',
                '!@#$%^&*()',
            ],
        )

Одна из проблем заключается в том, что он заходит в тупик, очередь растет>32767 заданий. gevent.queue.JoinableQueue справляется с этим лучше, но это выходит за рамки этого вопроса.

0 голосов
/ 03 ноября 2019

По предложению @ martineau ваш код может быть обновлен до:

import multiprocessing as mp


def f(n, xs, xn):
    if n < 10:
        xn.append(n)
        xs.append(n + 1)
        xn.append(n)
        xs.append(n + 2)


if __name__ == '__main__':
    with mp.Manager() as manager:
        xs = manager.list()
        xn = manager.list()
        with mp.Pool(processes=2) as pool:
            pool.starmap(f, [(n, xs, xn) for n in range(20)])
        print(xn)
        print(xs)

Это печатает

[3, 0, 3, 0, 4, 1, 4, 1, 5, 2, 5, 2, 6, 6, 7, 9, 7, 9, 8, 8]
[4, 1, 5, 2, 5, 2, 6, 3, 6, 3, 7, 4, 7, 8, 8, 10, 9, 11, 9, 10]

, для которого вы видите, у вас нет гарантии порядка, в котором n значенийпроизводятся, сохраняются.

РЕДАКТИРОВАТЬ:

import multiprocessing as mp


def f(n):
    thresh = 10
    if max(xs) <= thresh and n < thresh:
        xs.append(n + 1)


if __name__ == '__main__':
    with mp.Manager() as manager:
        xs = manager.list([0])
        with mp.Pool(processes=2) as pool:
            pool.map(f, range(20))
        print(sorted(xs))

Этот печатает

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...