Perl Queues и Threading - PullRequest
       7

Perl Queues и Threading

7 голосов
/ 13 февраля 2012

Я пытаюсь выполнить следующее:

  1. Есть поток, который читает данные из очень большого файла, скажем, около 10 ГБ, и помещает их в очередь.(Я не хочу, чтобы очередь тоже становилась очень большой)

  2. Пока поток buildQueue отправляет данные в очередь, одновременно из 5 рабочих потоков удаляется очередьи обработать данные.

Я предпринял попытку, но другие мои потоки недоступны из-за непрерывного цикла в моем потоке buildQueue.

Мой подход может бытьсовершенно неправильно.Спасибо за любую помощь, это очень ценится.

Вот код для buildQueue:

sub buildQueue {
    print "Enter a file name: ";
    my $dict_path = <STDIN>;
    chomp($dict_path);
    open DICT_FILE, $dict_path or die("Sorry, could not open file!");
    while (1) {
        if (<DICT_FILE>) {
            if ($queue->pending() < 100) {
                 my $query = <DICT_FILE>;
                 chomp($query);
                 $queue->enqueue($query);
                 my $count = $queue->pending();
                 print "Queue Size: $count Query: $query\n";
            }
        }
    }
}

И, как я и ожидал, когда этот поток исполняется, ничего больше после не будет выполнено, потому чтоэтот поток не закончится.

my $builder = new Thread(&buildQueue);

Поскольку поток компоновщика будет работать долгое время, я никогда не смогу создать рабочие потоки.

Вот весь код:

#!/usr/bin/perl -w
use strict;
use Thread;
use Thread::Queue;


my $queue = new Thread::Queue();
my @threads;

sub buildQueue {
    print "Enter a file name: ";
    my $dict_path = <STDIN>;
    chomp($dict_path);
    open dict_file, $dict_path or die("Sorry, could not open file!");
    while (1) {
        if (<dict_file>) {
            if ($queue->pending() < 100) {
                 my $query = <dict_file>;
                 chomp($query);
                 $queue->enqueue($query);
                 my $count = $queue->pending();
                 print "Queue Size: $count Query: $query\n";
            }
        }
    }
}

sub processor {
    my $query;
    while (1) {
        if ($query = $queue->dequeue) {
            print "$query\n";
        }
    }
}

my $builder = new Thread(&buildQueue);
push @threads, new Thread(&processor) for 1..5;

Ответы [ 4 ]

10 голосов
/ 13 февраля 2012

Вам нужно будет пометить, когда вы хотите, чтобы ваши темы выходили (через join или detach).Тот факт, что у вас есть бесконечные циклы без операторов last, из которых можно выйти, также является проблемой.

Редактировать: Я также забыл очень важную часть! Каждый рабочий поток будет блокироваться, ожидая обработки другого элемента из очереди, пока не получит undef в очереди .Поэтому мы специально ставим undef в очередь для каждого потока после завершения построения очереди.

Попробуйте:

#!/usr/bin/perl -w
use strict;
use threads;
use Thread::Queue;


my $queue = new Thread::Queue();
our @threads; #Do you really need our instead of my?

sub buildQueue
{
    print "Enter a file name: ";
    my $dict_path = <STDIN>;
    chomp($dict_path);

    #Three-argument open, please!
    open my $dict_file, "<",$dict_path or die("Sorry, could not open file!");
    while(my $query=<$dict_file>)
    {
        chomp($query);
        while(1)
        {   #Wait to see if our queue has < 100 items...
            if ($queue->pending() < 100) 
            {
                $queue->enqueue($query);
                print "Queue Size: " . $queue->pending . "\n";
                last; #This breaks out of the infinite loop
            }
        }
    }
    close($dict_file);
    foreach(1..5)
    {
        $queue->enqueue(undef);
    }
}

sub processor 
{
    my $query;
    while ($query = $queue->dequeue) 
    {
        print "Thread " . threads->tid . " got $query\n";
    }
}

my $builder=threads->create(\&buildQueue);
push @threads,threads->create(\&process) for 1..5;

#Waiting for our threads to finish.
$builder->join;
foreach(@threads)
{
    $_->join;
}
3 голосов
/ 30 октября 2013

Модуль MCE для Perl любит большие файлы.С помощью MCE можно разбивать на несколько строк одновременно, отбрасывать большой кусок в виде скалярной строки или читать по 1 строке за раз.Объединение нескольких строк в линию уменьшает накладные расходы для IPC.

MCE 1.504 сейчас отсутствует.Он предоставляет MCE :: Queue поддержку дочерних процессов, включая потоки.Кроме того, версия 1.5 поставляется с 5 моделями (MCE :: Flow, MCE :: Grep, MCE :: Loop, MCE :: Map и MCE :: Stream), которые занимаются созданием экземпляра MCE, а также автоматическимнастройка max_workers и chunk_size.Можно переопределить эти параметры между прочим.

Ниже, MCE :: Loop используется для демонстрации.

use MCE::Loop;

print "Enter a file name: ";
my $dict_path = <STDIN>;
chomp($dict_path);

mce_loop_f {
   my ($mce, $chunk_ref, $chunk_id) = @_;

   foreach my $line ( @$chunk_ref ) {
      chomp $line;
      ## add your code here to process $line
   }

} $dict_path;

Если вы хотите указать количество рабочих и / или chunk_size, то естьЕсть два способа сделать это.

use MCE::Loop max_workers => 5, chunk_size => 300000;

Или ...

use MCE::Loop;

MCE::Loop::init {
   max_workers => 5,
   chunk_size  => 300000
};

Хотя для больших файлов предпочтительнее разбиение на фрагменты, можно сравнить время с разбиением на одну строку за раз.Можно пропустить первую строку внутри блока (закомментировано).Обратите внимание, что нет необходимости во внутреннем цикле for.$ chunk_ref по-прежнему является ссылкой на массив, содержащей 1 строку.Входной скаляр $ _ содержит строку, когда chunk_size равен 1, в противном случае указывает на $ chunk_ref.

use MCE::Loop;

MCE::Loop::init {
   max_workers => 5,
   chunk_size  => 1
};

print "Enter a file name: ";
my $dict_path = <STDIN>;
chomp($dict_path);

mce_loop_f {
 # my ($mce, $chunk_ref, $chunk_id) = @_;

   my $line = $_;
   ## add your code here to process $line or $_

} $dict_path;

Я надеюсь, что эта демонстрация была полезна для людей, желающих обрабатывать файл параллельно.

:) Марио

1 голос
/ 13 февраля 2012

Похоже, что этот случай можно сделать с модулем Parallel::ForkManager.

0 голосов
/ 16 января 2013

Другой подход: Вы также можете использовать user_tasks в MCE 1.2 + и создать две нескольких рабочих задач одна задача для чтения (поскольку это большой файл, вы также можете извлечь выгоду из параллельного чтения при сохранении поиска чтения файла) и одна задача для обработки и т. д.

В приведенном ниже коде все еще используется Thread ::Очередь для управления вашей буферной очередью.

В подпрограмме buildQueue есть элемент управления размером очереди, и он передает данные непосредственно в процесс менеджера '$ R_QUEUE, поскольку мы использовали потоки, поэтому он имеет доступв пространство памяти родителя.Если вы хотите вместо этого использовать вилки, вы все равно можете получить доступ к очереди через функцию обратного вызова.Но здесь я решил просто нажать на очередь.

Подсистема processQueue просто удалит из очереди все, что находится в очереди, до тех пор, пока не останется ничего ожидающего.

Подсистема task_endв каждой задаче процесс диспетчера запускается только один раз в конце каждой задачи, поэтому мы используем его, чтобы сигнализировать об остановке нашим рабочим процессам.

Очевидно, что существует большая свобода в том, как вы хотите разделить на частиваши данные работникам, чтобы вы могли принять решение о размере чанка или даже о том, как вбить ваши данные.

#!/usr/bin/env perl
use strict;
use warnings;
use threads;
use threads::shared;
use Thread::Queue;
use MCE;

my $R_QUEUE = Thread::Queue->new;
my $queue_workers = 8;
my $process_workers = 8;
my $chunk_size = 1;

print "Enter a file name: ";
my $input_file = <STDIN>;
chomp($input_file);

sub buildQueue {
    my ($self, $chunk_ref, $chunk_id) = @_;
    if ($R_QUEUE->pending() < 100) {
        $R_QUEUE->enqueue($chunk_ref);
        $self->sendto('stdout', "Queue Size: " . $R_QUEUE->pending ."\n");
    }
}

sub processQueue {
    my $self = shift;
    my $wid = $self->wid;
    while (my $buff = $R_QUEUE->dequeue) {
        $self->sendto('stdout', "Thread " . $wid . " got $$buff");
    }
}

my $mce = MCE->new(
    input_data => $input_file, # this could be a filepath or a file handle or even a scalar to treat like a file, check the documentation for more details.
    chunk_size => $chunk_size,
    use_slurpio => 1,

    user_tasks => [
        { # queueing task
            max_workers => $queue_workers,
            user_func => \&buildQueue,
            use_threads => 1, # we'll use threads to have access to the parent's variables in shared memory.
            task_end => sub { $R_QUEUE->enqueue( (undef) x $process_workers ) } # signal stop to our process workers when they hit the end of the queue. Thanks > Jack Maney!
        },
        { # process task
            max_workers => $process_workers,
            user_func => \&processQueue,
            use_threads => 1, # we'll use threads to have access to the parent's variables in shared memory
            task_end => sub { print "Finished processing!\n"; }
        }
    ]
);

$mce->run();

exit;
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...