Как применить многопоточность для кода перевода Bio :: SeqIO (Bioperl)? - PullRequest
0 голосов
/ 12 ноября 2018

Я перевожу фаста-нуклеотидный файл в белковые последовательности этим кодом

use Bio::SeqIO;
use Getopt::Long;

my ($format,$outfile) = 'fasta';

GetOptions(
    'f|format:s'  => \$format,
    'o|out|outfile:s' => \$outfile,
    );

my $oformat = 'fasta';
$file=$ARGV[0];
chomp $file;

# this implicity uses the <> file stream
my $seqin = Bio::SeqIO->new( -format => $format, -fh => \*ARGV);
my $seqout;
if( $outfile ) {
    $seqout = Bio::SeqIO->new( -format => $oformat, -file => ">$outfile" );
} else {
# defaults to writing to STDOUT
    $seqout = Bio::SeqIO->new( -format => $oformat );
}

    while( (my $seq = $seqin->next_seq()) ) {
            my $pseq = $seq->translate();
            $seqout->write_seq($pseq);
    }

Я реализую threads и threads :: shared модулей perl для достижения в других случаях, но я хочу применить следующий код в предыдущей задаче

use threads;
use threads::shared;
use List::Util qw( sum );
use YAML;
use constant NUM_THREADS =>100;

my @output :shared;

my $chunk_size = @data / NUM_THREADS;

my @threads;
for my $chunk ( 1 .. NUM_THREADS ) {
    my $start = ($chunk - 1) * $chunk_size;
    push @threads, threads->create(
        \&doOperation,
        \@data,
        $start,
        ($start + $chunk_size - 1),
        \@output,
    );
}
$_->join for @threads;

sub doOperation{
    my ($data, $start, $end, $output) = @_;

    my $id = threads->tid;

    print "$id ";

    for my $i ($start .. $end) {
        print "Thread [$id] processing row $i\n";

# ЭТО ДОЛЖНО БЫТЬ МНОГОПРОЧНЫМ

    while( (my $seq = $seqin->next_seq()) ) {
            my $pseq = $seq->translate();
            $seqout->write_seq($pseq);
    }

# ЭТО ДОЛЖНО БЫТЬ МНОГОПРОЧНЫМ

        sleep 1 if 0.2 > rand;
    }
    print "Thread done.\n";
    return;
}
print "\n$time\n";
my $time = localtime;
print "$time\n";

Потоки создаются, но каким-то образом он не может обработать файл fasta. Первый код прекрасно работает без многопоточности.

1 Ответ

0 голосов
/ 12 ноября 2018

Боюсь, я не собираюсь переписывать ваш код для вас, но я могу дать вам несколько советов о том, как выполнить многопоточность.

Что нужно понять о потоках perl, так это , а не легкая нить. Вы должны создать количество потоков, равное параллелизму, запустить их с Thread::Queue и идти оттуда.

Вам также нужно избегать любых не поточно-безопасных модулей - вы можете использовать их, если вы осторожны , но это обычно означает создание их экземпляров в потоке с помощью require и import вместо use в начале программы.

Я бы также предложил избегать попыток выполнять ваш выводной ввод-вывод параллельно - возвращать результаты потока и объединять их (сортировать, если необходимо) в «главном» потоке (или выделять отдельную программу записи).

Так что я бы пошел с чем-то вроде;

#!/usr/bin/env perl

use strict;
use warnings;

use threads;

use Thread::Queue;
use Storable qw ( freeze thaw );

my $NUM_THREADS = 16;    #approx number of cores.

my $translate_q         = Thread::Queue->new;
my $translate_results_q = Thread::Queue->new;


sub translate_thread {
   while ( my $item = translate_q->dequeue ) {
      my $seq  = thaw $item;
      my $pseq = $seq->translate();
      $translate_results_q->enqueue( freeze $pseq );

   }

}

threads->create( \&translate_thread ) for 1 .. $NUM_THREADS;

while ( my $seq => $seqin->next_seq ) {
   $translate_q->enqueue( freeze($seq) );
}
$translate_q->end;

$_->join for threads->list;
$translate_results_q->end;

while ( my $result = $translate_results_q->dequeue ) {
   my $pseg = thaw($result);
}

Примечание - это не будет работать как есть, потому что отсутствует слияние с остальной частью вашего ocde. Но, надеюсь, это иллюстрирует, как очереди и потоки могут работать, чтобы получить параллелизм?

Вы передаете свои объекты, используя freeze и thaw из Storable, и используете распараллеливание для их распаковки.

Не сходите с ума по количеству потоков - для преимущественно вычислительных рабочих нагрузок (например, без ввода-вывода) тогда число потоков, равное количеству ядер, является правильным. Если они будут блокировать ввод-вывод, вы можете увеличить это число, но обход двойного не очень поможет.

Вы не можете эффективно распараллелить дисковый ввод-вывод - просто так не работает. Так что сделайте это в «основной» теме.

...