Сжатие вывода с помощью Python и многопроцессорности - PullRequest
2 голосов
/ 25 октября 2011

так что я уже некоторое время пишу числовые вещи с использованием numpy и многопроцессорной обработки.Это работает нормально, но у меня возникают проблемы со сбором результатов.Я сделал это следующим образом, я беру очередь для ввода и одну для вывода.Программа считывает параметры из входной очереди, обрабатывает их и затем помещает результат в выходную очередь.Позже в основном процессе я зачитал его из очереди и засолил.Примерно так:

def fun(inp,outp):
    while True:
        try:
            params = inp.get(block=False)
            results = runprocess(params)
            out.put(results,block=False)
        except Empty:
            break

позже в основном цикле я делаю следующее:

for p in processes:
    p.start()
for p in processes:
    p.join()

while True:
     try:
          out = outp.get(block=False)
          a[i] = [out]
     except Empty:
          break

 fi = open(filename,"w")
 cPickle.dump(fi,a)
 fi.close()

Но почему-то всегда происходит одна из двух вещей: либо рассол выходит пустым,или процессы зависают и продолжают работать, используя 0% ЦП (в начале они увеличиваются до 100%, это в основном сокращение числа).Есть мысли о том, что я делаю не так?

Хорошо, поэтому я переделал это с помощью Pool.map ().Просто чтобы все знали, как я заставил его работать, вот фрагмент:

    ncpus = mp.cpu_count()
    out = dict()

    params = [(a,p) for p in np.arange(0.0,2.0,0.1) for a in np.arange(0.001,2.0,0.1)]

    pool = mp.Pool(processes=ncpus)
    results = pool.map(runm,params)

    for i in results:
            sigs = np.zeros((order,order))
            sigsmf = np.zeros((order,order))
            sigseq = np.zeros((order,order))
            xs = np.array([])
            freqs = np.array([])
            [(a,p),sigs[:,:],sigsmf[:,:],sigseq[:,:],xs,freqs] = i
            out[(a,p)] = [sigs[:,:],sigsmf[:,:],sigseq[:,:],xs,freqs]
            print a, p, sigs[0,0]

Работает как шарм, намного проще в реализации!

Спасибо, Фердинанд!Я не уверен, как, но я думаю, что мы можем закрыть вопрос сейчас!

Ответы [ 2 ]

2 голосов
/ 25 октября 2011

Вам необходимо ввести timeout как минимум в get вызовы и удалить block.В вашей текущей конфигурации, если при вызове get ни один элемент недоступен, вы получите исключение Empty, выходящее из цикла.Если вы зависите от другого потока для заполнения этой очереди, и он не заполняется вовремя, он преждевременно выйдет из цикла и даст пустой результат.Аналогично, put может зависнуть, потому что очередь переполнена, что приводит к зависанию вашей программы.

Итак, используйте что-то вроде этого:

params = inp.get(timeout=1)
out.put(timeout=1)
2 голосов
/ 25 октября 2011

Это потому, что у вас есть блок = False.Когда сборщик пытается получить данные из очереди, он не находит их там сразу.Поэтому возникает пустое исключение, и оно выходит из цикла

. При получении данных из списка ввода вы можете указать block = False в качестве предварительно заполненного списка, я полагаю.Однако очередь вывода строится во время выполнения.Поэтому, когда вы пытаетесь получить данные из него, возможно, что он пустой, поскольку процесс ввода занимает больше времени.

Если вы знаете длину входной очереди, то вы можете попытаться заблокировать выводОчередь кет до бесконечности.Если нет, то я бы посоветовал вам заблокировать время ожидания.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...