Perl, как получать данные из URL-адресов параллельно? - PullRequest
10 голосов
/ 25 июля 2011

Мне нужно получить некоторые данные от многих провайдеров веб-данных, которые не предоставляют никаких услуг, поэтому я должен написать что-то вроде этого, используя, например, WWW :: Mechanize:

use WWW::Mechanize;
@urls = ('http://www.first.data.provider.com', 'http://www.second.data.provider.com', 'http://www.third.data.provider.com');
%results = {};
foreach my $url (@urls) {
 $mech = WWW::Mechanize->new();
 $mech->get($url);
 $mech->form_number(1);
 $mech->set_fields('user' => 'myuser', pass => 'mypass');
 $resp = $mech->submit();
 $results{$url} = parse($resp->content());
}
consume(%results);

Есть линекоторый (возможно, простой ;-) способ извлечения данных в общую переменную% результатов, одновременно , то есть: параллельно, от всех поставщиков?

Ответы [ 4 ]

25 голосов
/ 26 июля 2011

threads следует избегать в Perl. use threads в основном для эмуляция форка UNIX-стиля в Windows; кроме того, это бессмысленно.

(Если вам интересно, реализация делает этот факт очень ясным. В perl, интерпретатор является PerlInterpreter объектом. Способ threads работает, сделав кучу потоков, а затем создать совершенно новый PerlInterpreter объект в каждом потоке. Темы делятся абсолютно ничего, даже меньше, чем делают дочерние процессы; fork получает вас копирование при записи, но с threads все копирование выполняется на Perl пространство! Медленно!)

Если вы хотите сделать много вещей одновременно в одном и том же процессе, способ сделать это в Perl с помощью цикла событий, как EV , Событие , или POE , или с помощью Coro. (Вы можете также напишите свой код в терминах AnyEvent API, который позволит Вы используете любой цикл обработки событий. Это то, что я предпочитаю.) Разница между ними то, как вы пишете свой код.

AnyEvent (и EV, Event, POE и т. Д.) Заставляет вас писать свой код в callback-ориентированных стиль. Вместо управления, перетекающего сверху вниз, управление находится в стиль продолжения Функции не возвращают значения, они вызывают другие функции с их результатами. Это позволяет вам запустить много IO параллельные операции - когда данная операция ввода-вывода дала Результаты, ваша функция для обработки этих результатов будет вызвана. когда другая операция ввода-вывода завершена, эта функция будет вызвана. А также и так далее.

Недостатком этого подхода является то, что вы должны переписать код. Итак, есть модуль под названием Coro, который дает Perl настоящий (пользовательское пространство) потоки, которые позволят вам писать код сверху вниз, но все же быть неблокирующим. (Недостатком этого является то, что это сильно изменяет внутренности Perl. Но, похоже, работает очень хорошо.)

Итак, поскольку мы не хотим переписывать WWW :: Mechanize сегодня вечером мы собираемся использовать Коро. Coro поставляется с модулем под названием Coro :: LWP , что сделает все звонки на LWP be неблокирующая. Будет заблокирован текущий поток ("сопрограмма", в Coro lingo), но он не будет блокировать другие потоки. Это означает, что вы можете сделать тонна запросов одновременно и обрабатывает результаты по мере их поступления имеется в наличии. И Coro будет масштабироваться лучше, чем ваше сетевое соединение; каждая сопрограмма использует всего несколько килобайт памяти, поэтому легко иметь десятки тысяч их вокруг.

Имея это в виду, давайте посмотрим некоторый код. Вот программа, которая начинается три HTTP-запроса параллельно, и печатает длину каждого ответ. Это похоже на то, что вы делаете, минус фактическое обработка; но вы можете просто положить свой код, где мы вычисляем длина и она будет работать одинаково.

Начнем с обычного шаблона сценария Perl:

#!/usr/bin/env perl

use strict;
use warnings;

Затем мы загрузим специфичные для Coro модули:

use Coro;
use Coro::LWP;
use EV;

Коро использует закулисную петлю событий; он выберет один для вас, если Вы хотите, но мы просто укажем EV явно. Это лучшее событие цикл. * * 1 043

Затем мы загрузим модули, которые нам нужны для нашей работы, а именно:

use WWW::Mechanize;

Теперь мы готовы написать нашу программу. Для начала нам нужен список URL:

my @urls = (
    'http://www.google.com/',
    'http://www.jrock.us/',
    'http://stackoverflow.com/',
);

Затем нам нужна функция, которая порождает поток и выполняет нашу работу. Сделать Новая тема на Coro, вы называете async как async { body; of the thread; goes here }. Это создаст поток, запустит его и продолжить с остальной частью программы.

sub start_thread($) {
    my $url = shift;
    return async {
        say "Starting $url";
        my $mech = WWW::Mechanize->new;
        $mech->get($url);
        printf "Done with $url, %d bytes\n", length $mech->content;
    };
}

Так вот в чем суть нашей программы. Мы просто поместили нашу обычную программу LWP внутри асинхронно, и это будет магически неблокирующим. get блоков, но другие сопрограммы будут работать в ожидании получения данных из сети.

Теперь нам просто нужно запустить темы:

start_thread $_ for @urls;

И, наконец, мы хотим начать обработку событий:

EV::loop;

И это все. Когда вы запустите это, вы увидите вывод, например:

Starting http://www.google.com/
Starting http://www.jrock.us/
Starting http://stackoverflow.com/
Done with http://www.jrock.us/, 5456 bytes
Done with http://www.google.com/, 9802 bytes
Done with http://stackoverflow.com/, 194555 bytes

Как видите, запросы выполняются параллельно, а у вас не было прибегнуть к threads!

Обновление

В оригинальном сообщении вы упомянули, что хотите ограничить количество HTTP-запросов, выполняемых параллельно. Один из способов сделать это с помощью семафора, Coro :: Семафор в Coro.

Семафор подобен счетчику. Когда вы хотите использовать ресурс, который защищает семафор, вы «выключаете» семафор. Это уменьшает счетчик и продолжает запускать вашу программу. Но если счетчик равен нулю, когда вы пытаетесь выключить семафор, ваш поток / сопрограмма перейдет в спящий режим, пока не станет ненулевым. Когда счет снова увеличится, ваш поток проснется, спустится вниз по семафору и продолжит работу. Наконец, когда вы закончите использовать ресурс, который защищает семафор, вы «поднимете» семафор и дадите другим потокам возможность работать.

Это позволяет вам контролировать доступ к общему ресурсу, например, «делать HTTP-запросы».

Все, что вам нужно сделать, это создать семафор, которым ваши потоки HTTP-запросов будут делиться:

my $sem = Coro::Semaphore->new(5);

5 означает «давайте вызовем« вниз »5 раз, прежде чем заблокировать», или, другими словами, «пусть будет 5 одновременных HTTP-запросов».

Прежде чем мы добавим какой-либо код, давайте поговорим о том, что может пойти не так. Что-то плохое, что может произойти, это поток «вниз» - семафор, но никогда не «вверх» - когда это будет сделано. Тогда ничто не сможет использовать этот ресурс, и ваша программа, вероятно, ничего не сделает. Есть много способов, которыми это может произойти. Если вы написали какой-то код, например $sem->down; do something; $sem->up, вы можете чувствовать себя в безопасности, но что, если «сделать что-то» вызывает исключение? Тогда семафор будет оставлен, и это плохо.

К счастью, Perl упрощает создание объектов Guard , которые автоматически запускают код, когда переменная, содержащая объект, выходит из области видимости. Мы можем сделать код $sem->up, и тогда нам никогда не придется беспокоиться об удержании ресурса, когда мы не собираемся.

Coro :: Semaphore объединяет концепцию охраны, что означает, что вы можете сказать my $guard = $sem->guard, и это автоматически опустит семафор и повысит его, когда управление уйдет из области, в которой вы вызвали guard.

Имея это в виду, все, что нам нужно сделать, чтобы ограничить число параллельных запросов, это guard семафор в верхней части наших сопрограмм с использованием HTTP:

async {
    say "Waiting for semaphore";
    my $guard = $sem->guard;
    say "Starting";
    ...;
    return result;
}

Обращаясь к комментариям:

Если вы не хотите, чтобы ваша программа работала вечно, есть несколько вариантов. Один из них - запустить цикл обработки событий в другом потоке, а затем join в каждом рабочем потоке. Это также позволяет передавать результаты из потока в основную программу:

async { EV::loop };

# start all threads
my @running = map { start_thread $_ } @urls;

# wait for each one to return
my @results = map { $_->join } @running;

for my $result (@results) {
    say $result->[0], ': ', $result->[1];
}

Ваши темы могут возвращать результаты, такие как:

sub start_thread($) {
    return async {
        ...;
        return [$url, length $mech->content];
    }
}

Это один из способов собрать все ваши результаты в структуру данных. Если вы не хотите возвращать вещи, помните, что все сопрограммы разделяют состояние. Таким образом, вы можете положить:

my %results;

вверху вашей программы, и пусть каждая сопрограмма обновляет результаты:

async {
    ...;
    $results{$url} = 'whatever';
};

Когда все сопрограммы будут запущены, ваш хеш будет заполнен результатами. Вам нужно будет join каждый сопрограмм, чтобы знать, когда ответ готов.

Наконец, если вы делаете это как часть веб-службы, вам следует использовать веб-сервер с поддержкой сопрограмм, такой как Corona . Это будет запускать каждый HTTP-запрос в сопрограмме, позволяя вам обрабатывать несколько HTTP-запросов параллельно, в дополнение к возможности отправлять HTTP-запросы параллельно. Это позволит очень эффективно использовать память, процессор и сетевые ресурсы, а также будет довольно прост в обслуживании!

(Вы можете вырезать и вставить нашу программу сверху в сопрограмму, которая обрабатывает HTTP-запрос; можно создавать новые сопрограммы и join внутри сопрограммы.)

5 голосов
/ 25 июля 2011

Похоже, ParallelUserAgent - это то, что вы ищете.

4 голосов
/ 25 июля 2011

Ну, вы могли бы создать потоки для этого - в частности, смотрите perldoc perlthrtut и Thread :: Queue . Так что это может выглядеть примерно так.

use WWW::Mechanize;
use threads;
use threads::shared;
use Thread::Queue;
my @urls=(#whatever
);
my %results :shared;
my $queue=Thread::Queue->new();
foreach(@urls)
{
   $queue->enqueue($_);
}

my @threads=();
my $num_threads=16; #Or whatever...a pre-specified number of threads.

foreach(1..$num_threads)
{
    push @threads,threads->create(\&mechanize);
}

foreach(@threads)
{
   $queue->enqueue(undef);
}

foreach(@threads)
{
   $_->join();
}

consume(\%results);

sub mechanize
{
    while(my $url=$queue->dequeue)
    {
        my $mech=WWW::Mechanize->new();
        $mech->get($url);
        $mech->form_number(1);
        $mech->set_fields('user' => 'myuser', pass => 'mypass');
        $resp = $mech->submit();
        $results{$url} = parse($resp->content());
    }
}

Обратите внимание, что поскольку вы сохраняете результаты в хэше (вместо записи чего-либо в файл), вам не нужно блокировать , если только не существует опасность перезаписи значений. В этом случае вам нужно заблокировать %results, заменив

$results{$url} = parse($resp->content());

с

{
    lock(%results);
    $results{$url} = parse($resp->content());
}
3 голосов
/ 25 июля 2011

Попробуйте https://metacpan.org/module/Parallel::Iterator - на прошлой неделе увидел очень хорошую презентацию об этом, и одним из примеров был параллельный поиск URL-адресов - он также описан в примере с модулем. Это проще, чем использовать потоки вручную (хотя он использует форк внизу).

Насколько я могу судить, вы все равно сможете использовать WWW::Mechanize, но избегайте возиться с разделением памяти между потоками. Это модель более высокого уровня для этой задачи, и она может быть немного проще, оставляя основную логику механизированной процедуры @Jack Maney без изменений.

...