Приложение потоков неожиданно завершает работу - PullRequest
0 голосов
/ 09 сентября 2010

У меня мало скребущего приложения, и я пытаюсь добавить к нему многопоточность. Вот код (MyMech - это подкласс WWW :: Mechanize, используемый для обработки ошибок HTTP):

#!/usr/bin/perl

use strict;
use MyMech;
use File::Basename;
use File::Path;
use HTML::Entities;
use threads;
use threads::shared;
use Thread::Queue;
use List::Util qw( max sum );

my $page   = 1;
my %CONFIG = read_config();

my $mech = MyMech->new( autocheck => 1 );
$mech->quiet(0);

$mech->get( $CONFIG{BASE_URL} . "/site-map.php" );

my @championship_links =
  $mech->find_all_links( url_regex => qr/\d{4}-\d{4}\/$/ );

foreach my $championship_link (@championship_links) {

    my @threads;

    my $queue           = Thread::Queue->new;
    my $queue_processed = Thread::Queue->new;

    my $url = sprintf $championship_link->url_abs();

    print $url, "\n";

    next unless $url =~ m{soccer}i;

    $mech->get($url);

    my ( $last_round_loaded, $current_round ) =
      find_current_round( $mech->content() );

    unless ($last_round_loaded) {

        print "\tLoading rounds data...\n";

        $mech->submit_form(

            form_id => "leagueForm",
            fields  => {

                round => $current_round,
            },
        );
    }

    my @match_links =
      $mech->find_all_links( url_regex => qr/matchdetails\.php\?matchid=\d+$/ );

    foreach my $link (@match_links) {

        $queue->enqueue($link);
    }

    print "Starting printing thread...\n";

    my $printing_thread = threads->create(
        sub { printing_thread( scalar(@match_links), $queue_processed ) } )
      ->detach;

    push @threads, $printing_thread;

    print "Starting threads...\n";

    foreach my $thread_id ( 1 .. $CONFIG{NUMBER_OF_THREADS} ) {

        my $thread = threads->create(
            sub { scrape_match( $thread_id, $queue, $queue_processed ) } )
          ->join;
        push @threads, $thread;
    }

    undef $queue;
    undef $queue_processed;

    foreach my $thread ( threads->list() ) {

        if ( $thread->is_running() ) {

            print $thread->tid(), "\n";
        }
    }

    #sleep 5;
}

print "Finished!\n";

sub printing_thread {

    my ( $number_of_matches, $queue_processed ) = @_;

    my @fields =
      qw (
          championship
          year
          receiving_team
          visiting_team
          score
          average_home
          average_draw
          average_away
          max_home
          max_draw
          max_away
          date
          url
         );

    while ($number_of_matches) {

        if ( my $match = $queue_processed->dequeue_nb ) {

            open my $fh, ">>:encoding(UTF-8)", $CONFIG{RESULT_FILE} or die $!;

            print $fh join( "\t", @{$match}{@fields} ), "\n";
            close $fh;

            $number_of_matches--;
        }
    }

    threads->exit();
}

sub scrape_match {

    my ( $thread_id, $queue, $queue_processed ) = @_;

    while ( my $match_link = $queue->dequeue_nb ) {

        my $url = sprintf $match_link->url_abs();

        print "\t$url", "\n";

        my $mech = MyMech->new( autocheck => 1 );
        $mech->quiet(0);

        $mech->get($url);

        my $match = parse_match( $mech->content() );
        $match->{url} = $url;

        $queue_processed->enqueue($match);
    }

    return 1;
}

И у меня есть некоторые странные вещи с этим кодом. Иногда он запускается, но иногда завершается без ошибок (в точке ->detach). Я знаю, что @match_links содержит данные, но потоки не создаются, и они просто закрываются. Обычно он завершается после обработки второй записи $championship_link.

Может быть, я делаю что-то не так?

Обновление Вот код для find_current_round подпрограммы (но я уверен, что это не связано с вопросом):

sub find_current_round {

    my ($html) = @_;

    my ($select_html) = $html =~ m{

    <select\s+name="round"[^>]+>\s*
    (.+?)
    </select>
    }isx;

    my ( $option_html, $current_round ) = $select_html =~ m{

    (<option\s+value="\d+"(?:\s+ selected="selected")?>(\d+)</option>)\Z
    }isx;

    my ($last_round_loaded) = $option_html =~ m{selected};

    return ( $last_round_loaded, $current_round );
}

1 Ответ

0 голосов
/ 15 февраля 2015

Прежде всего - не используйте dequeue_nb (). Это плохая идея, потому что если очередь временно пуста, она вернет undef и ваш поток завершится.

Используйте вместо dequeue и end. dequeue заблокирует, но как только вы end свою очередь, время выйдет.

Вы также делаете некоторые странные вещи со своими потоками - я бы посоветовал вам редко хотеть detach обсуждений. Вы просто предполагаете, что ваш поток будет завершен до вашей программы, что не является хорошим планом.

Аналогично этому;

    my $thread = threads->create(
        sub { scrape_match( $thread_id, $queue, $queue_processed ) } )
      ->join;

Вы порождаете нить, а затем мгновенно присоединяетесь к ней. И так что join call будет ... блокировать ожидание выхода вашего потока. Вам не нужны потоки, чтобы сделать это ...

Вы также ограничиваете свои очереди в цикле foreach. Я не думаю, что это хороший план. Вместо этого я бы предложил - охватывать их внешне и создавать определенное количество «рабочих» потоков (и один «печатный» поток).

А потом просто кормите их через механизм очереди. В противном случае вы в конечном итоге создадите несколько экземпляров очереди, поскольку они имеют лексическую область видимости.

И как только вы закончите работу с очередями, введите $queue -> end, который завершит цикл while.

Вам также не нужно указывать нить $thread_id, потому что ... они уже есть. Попробуйте: threads -> self -> tid(); вместо.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...