Умный способ поделиться данными файла между потоками в Perl - PullRequest
2 голосов
/ 27 октября 2011

Я ищу надежный (и, возможно, умный) способ обмена данными файлов между переменным числом потоков в Perl. Я надеюсь на установку, в которой основной поток читает строки из файла, в то время как другие рабочие потоки обрабатывают отдельные записи.

До сих пор я пытался использовать Thread :: Queue, и мне не очень повезло с ними. Когда я достигаю конца файла, большинство потоков застряли в заблокированном состоянии, ожидая чтения данных из него, как только основной поток завершит чтение. Таким образом, потоки несколько застряли в подвешенном состоянии, и join () не может вернуть их обратно.

При использовании неблокирующего доступа к очередям потоки имеют тенденцию застревать в узком цикле «попытка получить данные, нет их неопределенных, попытаться получить данные ...», что приводит к загромождению ЦП и не работает Каждый поток обычно получает хотя бы несколько итераций этого узкого цикла, даже если есть только один рабочий поток. Бросок в sleep () не слишком помог, потому что он принимает только целочисленные значения (sleep (0) бесполезен, а sleep (1) слишком медленный).

Оптимально, я бы хотел, чтобы можно было делиться потоком входного файла, чтобы каждый поток блокировал его, читал строку из него, а затем разблокировал, но совместное использование глобусов запрещено / не поддерживается. Сначала я загрузил бы весь файл в память, но, учитывая, что в нем содержится 40 миллионов записей (в качестве оценки нижнего уровня), это неосуществимо.

Так что это то место, где вы можете прийти. Мне нужен удобный способ реализовать настройку считывателя / процессора между основным потоком и рабочими потоками, которая не тратит излишнее потребление ресурсов процессора и оставляет потоки в соединении ( ) -able состояние, как только читатель достигает конца файла.

Большое спасибо за любую помощь или идеи!

Ответы [ 2 ]

4 голосов
/ 27 октября 2011

Этот маленький тест сработал для меня. (Я никогда раньше не использовал нити, но в прошлом делал то же самое с вилкой и трубами). Поэтому, в основном, нужно сказать вашим нитям закончить, прежде чем просить их присоединиться, для этого я вставляю undef в очередь.

#!/usr/bin/env perl

use strict;
use warnings;

use threads;
use Thread::Queue;

use constant MAX_THREADS => 5;

sub process_data
{
    my( $q ) = @_;
    while( defined( my $data = $q->dequeue() ) )
    {
        print "Thread[".threads->tid()."]: Processing data($data)\n";
    }

    print "Thread[".threads->tid()."]: Got end message\n";
} # END process_data

# Main program
{
    my @threads;
    my $q = Thread::Queue->new();
    foreach ( 1 .. MAX_THREAD )
    {
        push( @threads, async { process_data($q) } );
    }

    while( my $line = <STDIN> )
    {
        chop( $line );
        $q->enqueue( $line );
    }

    foreach my $thread ( @threads )
    {
        $q->enqueue( undef );
    }

    foreach my $thread ( @threads )
    {
        $thread->join();
    }
}
2 голосов
/ 27 октября 2011

Чтение с основным потоком, а затем использование сопрограмм для обработки строк:

use strict;
use warnings;
use Coro;

my $sem = Coro::Semaphore->new(10); # maximum of ten semaphores
while my $line ( <$FILE> ) {
    $sem->down;
    async {
        dostuff($line);
        $sem->up;
    };
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...