Многопроцессорная обработка Python: синхронизация файлового объекта - PullRequest
7 голосов
/ 28 апреля 2011

Я пытаюсь создать файл, подобный объекту, который должен быть назначен sys.stdout / sys.stderr во время тестирования, чтобы обеспечить детерминированный вывод. Это не значит быть быстрым, просто надежным. То, что у меня есть до сих пор почти , работает, но мне нужна помощь, чтобы избавиться от последних нескольких крайних ошибок.

Вот моя текущая реализация.

try:
    from cStringIO import StringIO
except ImportError:
    from StringIO import StringIO

from os import getpid
class MultiProcessFile(object):
    """
    helper for testing multiprocessing

    multiprocessing poses a problem for doctests, since the strategy
    of replacing sys.stdout/stderr with file-like objects then
    inspecting the results won't work: the child processes will
    write to the objects, but the data will not be reflected
    in the parent doctest-ing process.

    The solution is to create file-like objects which will interact with
    multiprocessing in a more desirable way.

    All processes can write to this object, but only the creator can read.
    This allows the testing system to see a unified picture of I/O.
    """
    def __init__(self):
        # per advice at:
        #    http://docs.python.org/library/multiprocessing.html#all-platforms
        from multiprocessing import Queue
        self.__master = getpid()
        self.__queue = Queue()
        self.__buffer = StringIO()
        self.softspace = 0

    def buffer(self):
        if getpid() != self.__master:
            return

        from Queue import Empty
        from collections import defaultdict
        cache = defaultdict(str)
        while True:
            try:
                pid, data = self.__queue.get_nowait()
            except Empty:
                break
            cache[pid] += data
        for pid in sorted(cache):
            self.__buffer.write( '%s wrote: %r\n' % (pid, cache[pid]) )
    def write(self, data):
        self.__queue.put((getpid(), data))
    def __iter__(self):
        "getattr doesn't work for iter()"
        self.buffer()
        return self.__buffer
    def getvalue(self):
        self.buffer()
        return self.__buffer.getvalue()
    def flush(self):
        "meaningless"
        pass

... и скрипт быстрого теста:

#!/usr/bin/python2.6

from multiprocessing import Process
from mpfile import MultiProcessFile

def printer(msg):
    print msg

processes = []
for i in range(20):
    processes.append( Process(target=printer, args=(i,), name='printer') )

print 'START'
import sys
buffer = MultiProcessFile()
sys.stdout = buffer

for p in processes:
    p.start()
for p in processes:
    p.join()

for i in range(20):
    print i,
print

sys.stdout = sys.__stdout__
sys.stderr = sys.__stderr__
print 
print 'DONE'
print
buffer.buffer()
print buffer.getvalue()

Это отлично работает в 95% случаев, но у него есть три проблемы с крайними случаями. Я должен запустить тестовый скрипт в быстром цикле while, чтобы воспроизвести их.

  1. 3% времени, вывод родительского процесса не полностью отражается. Я предполагаю, что это потому, что данные используются до того, как поток очистки очереди сможет их догнать. У меня нет способа ждать потока без блокировки.
  2. .5% времени, есть обратная связь от многопроцессной реализации. Queue
  3. .01% времени, PID оборачиваются, поэтому сортировка по PID дает неправильный порядок.

В самом худшем случае (шансы: один на 70 миллионов) результат будет выглядеть следующим образом:

START

DONE

302 wrote: '19\n'
32731 wrote: '0 1 2 3 4 5 6 7 8 '
32732 wrote: '0\n'
32734 wrote: '1\n'
32735 wrote: '2\n'
32736 wrote: '3\n'
32737 wrote: '4\n'
32738 wrote: '5\n'
32743 wrote: '6\n'
32744 wrote: '7\n'
32745 wrote: '8\n'
32749 wrote: '9\n'
32751 wrote: '10\n'
32752 wrote: '11\n'
32753 wrote: '12\n'
32754 wrote: '13\n'
32756 wrote: '14\n'
32757 wrote: '15\n'
32759 wrote: '16\n'
32760 wrote: '17\n'
32761 wrote: '18\n'

Exception in thread QueueFeederThread (most likely raised during interpreter shutdown):
Traceback (most recent call last):
  File "/usr/lib/python2.6/threading.py", line 532, in __bootstrap_inner
  File "/usr/lib/python2.6/threading.py", line 484, in run
      File "/usr/lib/python2.6/multiprocessing/queues.py", line 233, in _feed
<type 'exceptions.TypeError'>: 'NoneType' object is not callable

В python2.7 исключение немного отличается:

Exception in thread QueueFeederThread (most likely raised during interpreter shutdown):
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 552, in __bootstrap_inner
  File "/usr/lib/python2.7/threading.py", line 505, in run
  File "/usr/lib/python2.7/multiprocessing/queues.py", line 268, in _feed
<type 'exceptions.IOError'>: [Errno 32] Broken pipe

Как мне избавиться от этих крайних случаев?

Ответы [ 2 ]

9 голосов
/ 29 апреля 2011

Решение состояло из двух частей. Я успешно выполнил тестовую программу 200 тысяч раз без каких-либо изменений в выводе.

Легко было использовать multiprocessing.current_process () ._ identity для сортировки сообщений. Это не часть опубликованного API, но это уникальный, детерминированный идентификатор каждого процесса. Это исправило проблему с обтеканием PID и неправильным порядком вывода.

Другая часть решения заключалась в использовании multiprocessing.Manager (). Queue (), а не multiprocessing.Queue. Это решает проблему № 2 выше, потому что менеджер живет в отдельном Процессе, и, таким образом, избегает некоторых плохих особых случаев при использовании Очереди из процесса владения. № 3 исправлен, поскольку очередь полностью исчерпана, и поток фидера естественным образом умирает, прежде чем python начинает закрываться и закрывает стандартный ввод.

0 голосов
/ 17 июля 2012

Я обнаружил гораздо меньше ошибок multiprocessing в Python 2.7, чем в Python 2.6. Сказав это, решение, которое я использовал, чтобы избежать проблемы "Exception in thread QueueFeederThread", состоит в том, чтобы на мгновение sleep, возможно, на 0,01 с, в каждом процессе, в котором используется Queue. Это правда, что использование sleep нежелательно или даже ненадежно, но было установлено, что указанная продолжительность работала достаточно хорошо на практике для меня. Вы также можете попробовать 0,1 с.

...