Perl: неблокирующий канал - только одно сообщение - PullRequest
0 голосов
/ 30 октября 2018

Несколько недель назад я задал вопрос о реализации неблокирующего канала один родитель-много-дочерний, на который умело ответил @mob here

Тем не менее, я заметил, что если дочерний элемент отправляет более одного сообщения перед выходом, родитель получает первое, только если он читает немного позже.

Пример кода:

use IO::Handle;
use POSIX ':sys_wait_h';
pipe(READER,WRITER);
WRITER->autoflush(1);

sub child_process {
    close READER;  # also a best but optional practice
    srand($$);
    my $id = 0;
        sleep 1 + 5*rand();
        $id++;
        print "Child Pid $$ sending message $id now...\n";
        print WRITER "$id:Child Pid $$ is sending this - Message 1\n";
        print WRITER "$id:Child Pid $$ is sending this - Message 2\n";
        exit 0;
}

if (fork() == 0) {
    child_process();
}

# parent
my ($rin,$rout) = ('');
vec($rin,fileno(READER),1) = 1;
while (1) {
     # non-blocking read on pipe
     my $read_avail = select($rout=$rin, undef, undef, 0.0);
     if ($read_avail < 0) {
         if (!$!{EINTR}) {
             warn "READ ERROR: $read_avail $!\n";
             last;
         }
     } elsif ($read_avail > 0) {
         chomp(my $line = <READER>);
         print "Parent Got $$: '$line'\n";
     } else {
            print STDERR "No input ... do other stuff\n";
     }
     sleep 5;
}
close WRITER;  # now it is safe to do this ...

Ожидаемый результат:

Я должен получить оба сообщения.

Что я получу: только первое сообщение

No input ... do other stuff
No input ... do other stuff
Child Pid 8594 sending message 1 now...
Parent Got 8593: '1:Child Pid 8594 is sending this - Message 1'
No input ... do other stuff

Предполагается, что это неблокирующее чтение, так что домой не может получить данные на следующей итерации? Это потому, что ребенок вышел? Я попытался сделать while (chomp(my $line = <READER>)) в родительском, но это блокирует, что у меня не может быть.

Ответы [ 3 ]

0 голосов
/ 31 октября 2018

Вы читаете только одну строку за итерацию, а не читаете все доступные данные в канале. Возможно, select () больше не указывает на то, что он больше доступен для чтения. Обратите внимание, что, поскольку вы разветвляетесь, вам также необходимо собрать подпроцесс с waitpid после его выхода (в режиме блокировки waitpid будет ожидать его выхода), это вернет код завершения дочернего процесса.

Я рекомендую использовать цикл обработки событий для управления каналами между процессами, так как он и его вспомогательные модули будут управлять всеми странными деталями как разветвления процесса, так и обмена данными. Вот как может выглядеть что-то, использующее IO :: Async .

use strict;
use warnings;
use IO::Async::Loop;
use IO::Async::Channel;
use IO::Async::Routine;

my $channel = IO::Async::Channel->new;

sub child_process {
    my $id = 0;
    sleep 1 + 5*rand();
    $id++;
    print "Child Pid $$ sending message $id now...\n";
    $channel->send(\"$id:Child Pid $$ is sending this - Message 1\n");
    $channel->send(\"$id:Child Pid $$ is sending this - Message 2\n");
}

my $loop = IO::Async::Loop->new;
my $f = $loop->new_future;
my $routine = IO::Async::Routine->new(
  channels_out => [$channel],
  code => \&child_process,
  on_return => sub { my $routine = shift; $f->done(@_) },
  on_die => sub { my $routine = shift; $f->fail(@_) },
);
$loop->add($routine);

$channel->configure(on_recv => sub {
  my ($channel, $ref) = @_;
  print "Parent Got: '$$ref'\n";
});

# wait for Future to complete (process finishes) or fail (process fails to start or dies)
my $exitcode = $f->get;
print "Child exited with exit code $exitcode\n";

Обратите внимание, что IO :: Async :: Channel - это просто абстракция вокруг IO :: Async :: Stream для отправки структур данных между процессами и IO :: Async :: Routine - это абстракция вокруг IO :: Async :: Process (или поток в системах Windows) для настройки каналов для разветвленного кода. IO :: Async :: Function дополнительно представляет собой высокоуровневую оболочку IO :: Async :: Routine, которая может управлять пулом разветвлений / потоков для многократного запуска подпрограммы с разными входными данными и получения возвращаемых значений. в родительском. Таким образом, есть много уровней, которые вы можете использовать в зависимости от того, насколько глубоко вы хотите погрузиться.

0 голосов
/ 31 октября 2018

Хорошо, похоже, я вижу выгоду от первой рекомендации @ Grinnz использовать четко определенные рамки. Я думал, что мне нужен трехколесный велосипед, но похоже, что я медленно строю BMW из гаек и болтов.

@ моб и предложения @ grinnz были верны. Это был случай буфера / против / без буфера.

chomp(my @lines = <READER>);
seek READER, 0, 1;

не работает. Он запирает.

Этот рецепт кулинарной книги работает, но я подправлю / протестирую его завтра ( source ). Пока все хорошо:

use IO::Handle;
use POSIX ':sys_wait_h';
use Symbol qw(qualify_to_ref);
use IO::Select;
pipe(READER,WRITER);
WRITER->autoflush(1);

sub sysreadline(*;$) {
    my($handle, $timeout) = @_;
    $handle = qualify_to_ref($handle, caller( ));
    my $infinitely_patient = (@_ == 1 || $timeout < 0);
    my $start_time = time( );
    my $selector = IO::Select->new( );
    $selector->add($handle);
    my $line = "";
SLEEP:
    until (at_eol($line)) {
        unless ($infinitely_patient) {
            return $line if time( ) > ($start_time + $timeout);
        }
        # sleep only 1 second before checking again
        next SLEEP unless $selector->can_read(1.0);
INPUT_READY:
        while ($selector->can_read(0.0)) {
            my $was_blocking = $handle->blocking(0);
CHAR:       while (sysread($handle, my $nextbyte, 1)) {
                $line .= $nextbyte;
                last CHAR if $nextbyte eq "\n";
            }
            $handle->blocking($was_blocking);
            # if incomplete line, keep trying
            next SLEEP unless at_eol($line);
            last INPUT_READY;
        }
    }
    return $line;
}
sub at_eol($) { $_[0] =~ /\n\z/ }

sub child_process {
    close READER;  # also a best but optional practice
    srand($$);
    my $id = 0;
        sleep 1 + 5*rand();
        $id++;
        print "Child Pid $$ sending message $id now...\n";
        print WRITER "$id:Child Pid $$ is sending this - Message 1\n";
        print WRITER "$id:Child Pid $$ is sending this - Message 2\n";
        exit 0;
}

if (fork() == 0) {
    child_process();
}

# parent
my ($rin,$rout) = ('');
vec($rin,fileno(READER),1) = 1;
while (1) {
     # non-blocking read on pipe
     while ((my $read_avail = select($rout=$rin, undef, undef, 0.0)) !=0) 
     {
        if ($read_avail < 0) {
                 if (!$!{EINTR}) {
                 warn "READ ERROR: $read_avail $!\n";
                last;
                }
        }
        elsif ($read_avail > 0) {
         chomp(my $line = sysreadline(READER));
         print "Parent Got $$: '$line'\n";
         print "END MESSAGE\n";
       }
     }
     print STDERR "input queue empty...\n";
     print "Sleeping main for 5...\n";
     sleep 5;
}
close WRITER;  # now it is safe to do this ...
0 голосов
/ 31 октября 2018

Похоже, вы смешиваете буферизованный и небуферизованный ввод / вывод. <READER>readline(READER)) - буферизованные операции ввода. При первом вызове readline для дескриптора файла Perl попытается прочитать до 8K данных из дескриптора, сохранив большую их часть в буфере памяти. В следующий раз, когда вы вызовете readline для того же дескриптора файла, Perl попытается вернуть данные в буфер, прежде чем попытаться снова прочитать больше данных из файла. Это для эффективности.

select - это операция для небуферизованного ввода-вывода. Он сообщает вам, ожидает ли ввод самого дескриптора файла, но не может увидеть, ожидают ли данные в буфере.

Неуклюжей альтернативой будет использование sysread или getc для извлечения данных из канала. Это неудобно, потому что вам придется разбивать ввод самостоятельно на отдельные строки.

... if ($read_avail > 0) {
    my $n = sysread READER, my $lines, 16384;
    chomp($lines);
    my @lines = split /\n/, $lines;
    print "Parent Got $$: '$_'\n" for @lines;
} ...

Что может сработать, так это чтение из дескриптора файла в контексте списка.

chomp(my @lines = <READER>);
seek READER, 0, 1;

должен прочитать все доступные данные из буфера и дескриптора файла, и теоретически он оставит ваш буфер пустым, поэтому следующий вызов <READER> будет похож на небуферизованное чтение. (Оператор seek очищает условие EOF для файлового дескриптора, так что вы можете читать из файлового дескриптора позже, когда поступит больше ввода).

(ETA: нет, это не сработает. Это просто заблокирует READER, пока ребенок не закроет свой конец канала)


Документы для select имеют это предупреждение

ПРЕДУПРЕЖДЕНИЕ: Не следует пытаться смешивать буферизованный ввод / вывод (например, «read» или <FH>) с «select», за исключением случаев, разрешенных POSIX, и даже тогда только в системах POSIX. Вы должны использовать "sysread" вместо этого.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...