многопроцессорная обработка Python, каждая из которых имеет собственный подпроцесс (Kubuntu, Mac) - PullRequest
0 голосов
/ 07 января 2011

Я создал скрипт, который по умолчанию создает один многопроцессорный процесс; тогда работает нормально. При запуске нескольких процессов он начинает зависать и не всегда в одном и том же месте. В программе около 700 строк кода, поэтому я постараюсь подвести итог происходящему. Я хочу максимально использовать мои многоядерные процессоры, распараллеливая самую медленную задачу - выравнивание последовательностей ДНК. Для этого я использую модуль подпроцесса для вызова программы командной строки: «hmmsearch», которую я могу передавать в последовательностях через / dev / stdin, а затем зачитываю выровненные последовательности через / dev / stdout. Я полагаю, что зависание происходит из-за того, что эти несколько экземпляров подпроцесса читают / пишут из stdout / stdin, и я действительно не знаю, как лучше это сделать ... Я искал os.fdopen (...) & os.tmpfile (), чтобы создать временные файловые дескрипторы или каналы, через которые я могу очистить данные. Тем не менее, я никогда раньше не использовал и не могу представить, как это сделать с помощью модуля подпроцесса. В идеале я бы хотел полностью обойтись без использования жесткого диска, потому что каналы намного лучше с высокопроизводительной обработкой данных! Любая помощь с этим была бы супер замечательной !!

import multiprocessing, subprocess
from Bio import SeqIO

class align_seq( multiprocessing.Process ):
    def __init__( self, inPipe, outPipe, semaphore, options ):
        multiprocessing.Process.__init__(self)
        self.in_pipe = inPipe          ## Sequences in
        self.out_pipe = outPipe        ## Alignment out
        self.options = options.copy()  ## Modifiable sub-environment
        self.sem = semaphore

    def run(self):
        inp = self.in_pipe.recv()
        while inp != 'STOP':
            seq_record , HMM = inp  # seq_record is only ever one Bio.Seq.SeqRecord object at a time.
                                    # HMM is a file location.
            align_process = subprocess.Popen( ['hmmsearch', '-A', '/dev/stdout', '-o',os.devnull, HMM, '/dev/stdin'], shell=False, stdin=subprocess.PIPE, stdout=subprocess.PIPE )
            self.sem.acquire()
            align_process.stdin.write( seq_record.format('fasta') )
            align_process.stdin.close()
            for seq in SeqIO.parse( align_process.stdout, 'stockholm' ):  # get the alignment output
                self.out_pipe.send_bytes( seq.seq.tostring() ) # send it to consumer
            align_process.wait()   # Don't know if there's any need for this??
            self.sem.release()
            align_process.stdout.close()
            inp = self.in_pipe.recv()  
        self.in_pipe.close()    #Close handles so don't overshoot max. limit on number of file-handles.
        self.out_pipe.close()   

Потратив некоторое время на отладку, я обнаружил проблему, которая всегда была там и еще не совсем решена, но исправила некоторые другие неэффективности в процессе (отладки). Существует две начальные функции фидера, это класс align_seq и анализатор файлов parseHMM () , который загружает в словарь матрицу оценки позиции (PSM). Затем основной родительский процесс сравнивает выравнивание с PSM, используя словарь (словарей) в качестве указателя на соответствующую оценку для каждого остатка. Чтобы рассчитать нужные мне оценки, у меня есть два отдельных класса multiprocessing.Process, один класс logScore () , который вычисляет соотношение шансов в журнале (с math.exp ()); Я распараллелил этот; и он ставит вычисленные оценки в очередь в последний процесс, sumScore () , который просто суммирует эти оценки (с math.fsum), возвращая сумму и все оценки по конкретным позициям обратно в родительский процесс в виде словаря. то есть Queue.put ([сумма, {остаток позиции: конкретная позиция позиции, ...}]) Я нахожу это чрезвычайно запутанным, чтобы обдумать (слишком много очередей!), Поэтому я надеюсь, что читатели смогут следить ... После того, как все вышеупомянутые вычисления сделаны, я тогда даю возможность сохранить кумулятивные результаты в виде табуляции. вывод с разделителями. Это где он сейчас (с прошлой ночи) иногда ломается, так как я гарантирую, что он печатает счет для каждой позиции, где должен быть счет. Я думаю, что из-за задержки (время синхронизации компьютера не синхронизировано), иногда то, что помещается в очередь сначала для logScore , не достигает sumScore в первую очередь. Для того чтобы sumScore знал, когда нужно возвращать счет и начинать заново, я помещаю 'endSEQ' в очередь для последнего процесса logScore, который выполнил вычисление. Я думал, что тогда он также должен достигнуть sumScore, но это не всегда так; только иногда это ломается. Так что теперь я больше не захожу в тупик, а вместо этого имею KeyError при печати или сохранении результатов. Я считаю, что причина получения KeyError иногда заключается в том, что я создаю очередь для каждого процесса logScore, но вместо этого все они должны использовать одну и ту же очередь. Теперь, где у меня есть что-то вроде: -

class logScore( multiprocessing.Process ):
    def __init__( self, inQ, outQ ):
        self.inQ = inQ
        ...

def scoreSequence( processes, HMMPSM, sequenceInPipe ):
    process_index = -1
    sequence = sequenceInPipe.recv_bytes()
    for residue in sequence:
        .... ## Get the residue score.
        process_index += 1
        processes[process_index].inQ.put( residue_score )
    ## End of sequence
    processes[process_index].inQ.put( 'endSEQ' )


logScore_to_sumScoreQ = multiprocessing.Queue()
logScoreProcesses = [ logScore( multiprocessing.Queue() , logScore_to_sumScoreQ ) for i in xrange( options['-ncpus'] ) ]
sumScoreProcess = sumScore( logScore_to_sumScoreQ, scoresOut )

тогда как я должен создать только одну очередь для общего доступа ко всем экземплярам logScore. т.е.

logScore_to_sumScoreQ = multiprocessing.Queue()
scoreSeq_to_logScore = multiprocessing.Queue()
logScoreProcesses = [ logScore( scoreSeq_to_logScore , logScore_to_sumScoreQ ) for i in xrange( options['-ncpus'] ) ]
sumScoreProcess = sumScore( logScore_to_sumScoreQ, scoresOut )

Ответы [ 3 ]

2 голосов
/ 07 января 2011

Это не совсем то, как работает конвейерная обработка ... но, для простоты, вот выдержка из документации для подпроцесса :

stdin, stdout и stderr определяют стандартный ввод выполненных программ, стандартный вывод и стандартная ошибка файловые дескрипторы соответственно. действительный значения - ТРУБА, существующий файл дескриптор (положительное целое число), существующий файловый объект, и None. ТРУБЫ указывает, что новая труба для ребенка должен быть создан. Нет, нет произойдет перенаправление; ребенок файловые дескрипторы будут унаследованы от родитель.

Наиболее вероятные ошибочные области могут быть связаны с основным процессом или с вашим управлением семафором. Может быть, переходы / синхронизация состояний не выполняются должным образом из-за ошибки? Я предлагаю отладку, добавляя операторы logging / print до и после каждого вызова блокировки - где вы общаетесь с основным процессом и где вы получаете / освобождаете семафор, чтобы определить, где что-то пошло не так.

Также мне любопытно - нужен ли семафор?

1 голос
/ 07 января 2011

Я также хотел распараллелить простые задачи и для этого я создал небольшой скрипт на python.Вы можете взглянуть на: http://bioinf.comav.upv.es/psubprocess/index.html

Это немного более общее, чем вы хотите, но для простых задач довольно легко использоватьЭто может быть по крайней мере какой-то изоляции для вас.

Хосе Бланка

0 голосов
/ 07 января 2011

Это может быть тупик в подпроцессе, вы пытались использовать метод связи, а не ждать?http://docs.python.org/library/subprocess.html

...