Могу ли я использовать многопроцессорную очередь в функции, вызываемой Pool.imap? - PullRequest
24 голосов
/ 30 сентября 2010

Я использую Python 2.7 и пытаюсь запустить некоторые тяжелые задачи процессора в своих собственных процессах.Я хотел бы иметь возможность отправлять сообщения обратно в родительский процесс, чтобы держать его в курсе текущего состояния процесса.Многопроцессорная очередь кажется идеальной для этого, но я не могу понять, как заставить ее работать.

Итак, это мой основной рабочий пример, за исключением использования очереди.

import multiprocessing as mp
import time

def f(x):
    return x*x

def main():
    pool = mp.Pool()
    results = pool.imap_unordered(f, range(1, 6))
    time.sleep(1)

    print str(results.next())

    pool.close()
    pool.join()

if __name__ == '__main__':
    main()

Я попытался передать очередь несколькими способами, и они получают сообщение об ошибке «RuntimeError: объекты очереди должны быть общими только для процессов через наследование».Вот один из способов, которые я попробовал, основываясь на ранее найденном ответе.(У меня возникает та же проблема, когда я пытаюсь использовать Pool.map_async и Pool.imap)

import multiprocessing as mp
import time

def f(args):
    x = args[0]
    q = args[1]
    q.put(str(x))
    time.sleep(0.1)
    return x*x

def main():
    q = mp.Queue()
    pool = mp.Pool()
    results = pool.imap_unordered(f, ([i, q] for i in range(1, 6)))

    print str(q.get())

    pool.close()
    pool.join()

if __name__ == '__main__':
    main()

Наконец, фитнес-подход 0 (делает его глобальным) не создает никаких сообщений, он просто блокируется.

import multiprocessing as mp
import time

q = mp.Queue()

def f(x):
    q.put(str(x))
    return x*x

def main():
    pool = mp.Pool()
    results = pool.imap_unordered(f, range(1, 6))
    time.sleep(1)

    print q.get()

    pool.close()
    pool.join()

if __name__ == '__main__':
    main()

Я знаю, что он, вероятно, будет работать с многопроцессорными процессами напрямую, и что для этого есть другие библиотеки, но я не хочу отступать от стандартных библиотечных функций, которые прекрасно подходят доЯ уверен, что не только отсутствие знаний не позволяет мне использовать их.

Спасибо.

1 Ответ

45 голосов
/ 02 октября 2010

Хитрость заключается в передаче очереди в качестве аргумента инициализатору. Появляется для работы со всеми методами отправки пула.

import multiprocessing as mp

def f(x):
    f.q.put('Doing: ' + str(x))
    return x*x

def f_init(q):
    f.q = q

def main():
    jobs = range(1,6)

    q = mp.Queue()
    p = mp.Pool(None, f_init, [q])
    results = p.imap(f, jobs)
    p.close()

    for i in range(len(jobs)):
        print q.get()
        print results.next()

if __name__ == '__main__':
    main()
...