мастер-рабочий сервер с AnyEvent - PullRequest
1 голос
/ 12 июля 2011

Мне нужно написать серверную программу, которая будет открывать и поддерживать 5 активных Telnet, SSH и различные соединения с сервером, в то время как она слушает вызовы JSONRPC на другой стороне.

У меня нет проблем с открытием и обслуживанием 5 соединений и прослушиванием запросов:

# workers:
open_myconnections( 1..5 );
my $w1 = AnyEvent->timer (interval => $seconds, cb => sub { keep_conn_alive(1) });
my $w2 = AnyEvent->timer (interval => $seconds, cb => sub { keep_conn_alive(2) });
...

# now listen for requests
use AnyEvent::JSONRPC::Lite;
# master: 
my $server = jsonrpc_server '127.0.0.1', '4423';
$server->reg_cb(
    queue_up => sub {
        my ($res_cv, @params) = @_;
        my $res = send_params_to_connection_queue( @params ); 
        $res_cv->result($res);
    },
);

Но сейчас я застрял, пытаясь найти лучший способ (то есть неблокирующий , AnyEvent способ) для распределения очереди среди 5 рабочих, что и делает моя функция send_params_to_connection_queue().

Любое предложение модуля приветствуется, хотя я стараюсь избегать использования POE , так как предполагается, что это очень крошечный сервер и, если только нет другого разумного вариантасуществует, я передам.

Ответы [ 2 ]

3 голосов
/ 12 июля 2011

На YAPC :: NA 2011 я узнал об очень хорошей системе очередей под названием ZeroMQ. С привязками Perl на Прямые привязки Perl и Связывания AnyEvent .


                                                |----- ssh worker
JSONRPC-Listener-EventLoop---->QueueingSystem---|----- telnet worker
                                                |----- etc.
2 голосов
/ 15 июня 2013

Только вы знаете, что такое «лучший», но есть общие правила, как это сделать, и, возможно, это ваш настоящий вопрос.

Обычно вам нужна очередь, скажем, массив, в котором хранятся все еще не отправленные запросы:

my @queue;

затем, каждый раз, когда вы хотите выполнить запрос, вы сначала помещаете данные в очередь, а затем вызываете функцию «планировщика»:

sub do_req { my ($ data, $ cb) = @_; push @queue, [$ data, $ cb]; планировщик; }

Цель функции расписания - выбрать работника и уволить следующий запрос:

sub scheduler { @queue или возврат; # ничего не делать с пустой очередью

  for my $worker (@workers) {
     if ("$worker is free") {
        my ($data, $cb) = @{ shift @queue };
        $worker->doit (sub {
           &scheduler;
           $cb->(@_); # call original callback
        });
        return;
     }
  }

}

Для этого нужно найти свободного работника, отправить ему запрос («doit») и когда он заканчивается, он снова вызывает планировщик.

От вас зависит, как вы решите, свободен ли работник. Например, вы всегда мог выбрать работника с наименьшим количеством невыполненных запросов. Или вы можете выбрать работника, у которого, скажем, менее 5 невыполненных запросов. Или какой путь лучше.

Важно то, что когда работник может стать свободным (потому что работа завершается), вы снова вызываете планировщик, чтобы поставить в очередь следующее задание.

Таким образом, вы можете наложить произвольные ограничения на работников, не забывая при этом о работах, которые вы должны выполнить, но не можете отправить из-за этих ограничений.

В этой технике много вариатов, но основная идея всегда то же самое: сначала вещи в очереди, затем есть функция, которую вы вызываете после в очереди и после выполнения задания, которое затем следует распределить в очереди работа.

Во время разговора, есть два события, когда вам может понадобиться отправить запрос: когда новая работа / запрос создается / ставится в очередь, и когда невыполненный работа выполнена.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...