Я создал скрипт, который по умолчанию создает один многопроцессорный процесс; тогда работает нормально. При запуске нескольких процессов он начинает зависать и не всегда в одном и том же месте. В программе около 700 строк кода, поэтому я постараюсь подвести итог происходящему. Я хочу максимально использовать мои многоядерные процессоры, распараллеливая самую медленную задачу - выравнивание последовательностей ДНК. Для этого я использую модуль подпроцесса для вызова программы командной строки: «hmmsearch», которую я могу передавать в последовательностях через / dev / stdin, а затем зачитываю выровненные последовательности через / dev / stdout. Я полагаю, что зависание происходит из-за того, что эти несколько экземпляров подпроцесса читают / пишут из stdout / stdin, и я действительно не знаю, как лучше это сделать ...
Я искал os.fdopen (...) & os.tmpfile (), чтобы создать временные файловые дескрипторы или каналы, через которые я могу очистить данные. Тем не менее, я никогда раньше не использовал и не могу представить, как это сделать с помощью модуля подпроцесса. В идеале я бы хотел полностью обойтись без использования жесткого диска, потому что каналы намного лучше с высокопроизводительной обработкой данных!
Любая помощь с этим была бы супер замечательной !!
import multiprocessing, subprocess
from Bio import SeqIO
class align_seq( multiprocessing.Process ):
def __init__( self, inPipe, outPipe, semaphore, options ):
multiprocessing.Process.__init__(self)
self.in_pipe = inPipe ## Sequences in
self.out_pipe = outPipe ## Alignment out
self.options = options.copy() ## Modifiable sub-environment
self.sem = semaphore
def run(self):
inp = self.in_pipe.recv()
while inp != 'STOP':
seq_record , HMM = inp # seq_record is only ever one Bio.Seq.SeqRecord object at a time.
# HMM is a file location.
align_process = subprocess.Popen( ['hmmsearch', '-A', '/dev/stdout', '-o',os.devnull, HMM, '/dev/stdin'], shell=False, stdin=subprocess.PIPE, stdout=subprocess.PIPE )
self.sem.acquire()
align_process.stdin.write( seq_record.format('fasta') )
align_process.stdin.close()
for seq in SeqIO.parse( align_process.stdout, 'stockholm' ): # get the alignment output
self.out_pipe.send_bytes( seq.seq.tostring() ) # send it to consumer
align_process.wait() # Don't know if there's any need for this??
self.sem.release()
align_process.stdout.close()
inp = self.in_pipe.recv()
self.in_pipe.close() #Close handles so don't overshoot max. limit on number of file-handles.
self.out_pipe.close()
Потратив некоторое время на отладку, я обнаружил проблему, которая всегда была там и еще не совсем решена, но исправила некоторые другие неэффективности в процессе (отладки).
Существует две начальные функции фидера, это класс align_seq и анализатор файлов parseHMM () , который загружает в словарь матрицу оценки позиции (PSM).
Затем основной родительский процесс сравнивает выравнивание с PSM, используя словарь (словарей) в качестве указателя на соответствующую оценку для каждого остатка. Чтобы рассчитать нужные мне оценки, у меня есть два отдельных класса multiprocessing.Process, один класс logScore () , который вычисляет соотношение шансов в журнале (с math.exp ()); Я распараллелил этот; и он ставит вычисленные оценки в очередь в последний процесс, sumScore () , который просто суммирует эти оценки (с math.fsum), возвращая сумму и все оценки по конкретным позициям обратно в родительский процесс в виде словаря.
то есть
Queue.put ([сумма, {остаток позиции: конкретная позиция позиции, ...}])
Я нахожу это чрезвычайно запутанным, чтобы обдумать (слишком много очередей!), Поэтому я надеюсь, что читатели смогут следить ... После того, как все вышеупомянутые вычисления сделаны, я тогда даю возможность сохранить кумулятивные результаты в виде табуляции. вывод с разделителями. Это где он сейчас (с прошлой ночи) иногда ломается, так как я гарантирую, что он печатает счет для каждой позиции, где должен быть счет. Я думаю, что из-за задержки (время синхронизации компьютера не синхронизировано), иногда то, что помещается в очередь сначала для logScore , не достигает sumScore в первую очередь.
Для того чтобы sumScore знал, когда нужно возвращать счет и начинать заново, я помещаю 'endSEQ' в очередь для последнего процесса logScore, который выполнил вычисление. Я думал, что тогда он также должен достигнуть sumScore, но это не всегда так; только иногда это ломается. Так что теперь я больше не захожу в тупик, а вместо этого имею KeyError при печати или сохранении результатов.
Я считаю, что причина получения KeyError иногда заключается в том, что я создаю очередь для каждого процесса logScore, но вместо этого все они должны использовать одну и ту же очередь. Теперь, где у меня есть что-то вроде: -
class logScore( multiprocessing.Process ):
def __init__( self, inQ, outQ ):
self.inQ = inQ
...
def scoreSequence( processes, HMMPSM, sequenceInPipe ):
process_index = -1
sequence = sequenceInPipe.recv_bytes()
for residue in sequence:
.... ## Get the residue score.
process_index += 1
processes[process_index].inQ.put( residue_score )
## End of sequence
processes[process_index].inQ.put( 'endSEQ' )
logScore_to_sumScoreQ = multiprocessing.Queue()
logScoreProcesses = [ logScore( multiprocessing.Queue() , logScore_to_sumScoreQ ) for i in xrange( options['-ncpus'] ) ]
sumScoreProcess = sumScore( logScore_to_sumScoreQ, scoresOut )
тогда как я должен создать только одну очередь для общего доступа ко всем экземплярам logScore. т.е.
logScore_to_sumScoreQ = multiprocessing.Queue()
scoreSeq_to_logScore = multiprocessing.Queue()
logScoreProcesses = [ logScore( scoreSeq_to_logScore , logScore_to_sumScoreQ ) for i in xrange( options['-ncpus'] ) ]
sumScoreProcess = sumScore( logScore_to_sumScoreQ, scoresOut )