асинхронная обратная связь websocket во время длительного процесса - PullRequest
0 голосов
/ 01 октября 2018

Я пытаюсь реализовать обратную связь на веб-странице, которая позволяет пользователю начать длинный процесс с листа Excel (зрение, да ...).Для каждой строки данных время обработки составляет около 1 секунды, а общая длина данных составляет от 40 до 100 элементов, поэтому общее время обработки может быть больше минуты.

Я отображаю предварительный просмотрданных на странице, начиная процесс через веб-сокет и хотел бы показать прогрессию от того же веб-сокета.

Сама обработка выполняется внешним пакетом, а сложность страницы минимальна, поэтому яобернул его в Lite отдельный файл.

Моя проблема в том, что длительная обработка, запущенная на маршруте веб-сокета, блокирует обратную связь до ее завершения, и все события progession отправляются одновременно вконец.Насколько я понимаю, это связано с циклом событий Mojolicious, и я должен начать обработку отдельно, чтобы избежать зависания обработки веб-сокета.

Обратите внимание, что я пробовал отдельный канал обратной связи с EventSourceвыдвинуть клиенту некоторый прогресс во время обработки, но он показывает то же самое завершение сразу в конце.

Вот мой упрощенный код, я использую sleep() для имитации длинного процесса.Я начинаю с

perl mojo_notify_ws.pl daemon

Не могли бы вы предложить, как изменить маршрут веб-сокета для обеспечения обратной связи в реальном времени?

use Mojolicious::Lite;
use Mojo::JSON qw(encode_json decode_json j);

use Data::Dumper;

$|++;

any '/' => sub {
    my $c = shift;
    $c->render('index');
};

my $peer;
websocket '/go' => sub {
    use Carp::Always;
    my $ws = shift;

    $peer = $ws->tx;
    app->log->debug(sprintf 'Client connected: %s', Dumper $peer->remote_address);

    # do not subscribe to 'text' else 'json' won't work
    #$ws->on(text => sub {
    #    my ($ws, $msg) = @_;
    #    app->log->debug("Received text from websocket: `$msg`");
    #        });

    # $peer->send('{"type": "test"}');
    # say 'default inactivity timeout='. (p $ws->inactivity_timeout());
    $ws->inactivity_timeout(120);

    $ws->on(json => sub {
        my ($ws, $msg) = @_;
        app->log->debug('Received from websocket:', Dumper(\$msg));
        unless($msg){
            app->log->debug('Received empty message? WTF?!');
            return;
        }
        my $prompt = $msg->{cmd};
        return unless $prompt;
        app->log->debug(sprintf 'Received: `%s`', $prompt // 'empty??');

        # simulate
        my $loop = Mojo::IOLoop->singleton;

#        $loop->subprocess( sub {
#            my $sp = shift;

        for my $cell (1..3) {
            # $loop->delay( sub {
                app->log->debug("sending cell $cell");
                my $payload = {
                        type => 'ticket',
                        cell => $cell,
                        result => $cell % 2 ? 'OK' : 'NOK'
                };
                $ws->send( { json => $payload } );
                sleep(2);
                # $loop->timer(2, sub {say 'we have waited 2 secs!';})->wait;
            # });
        };

#        }, sub {} );#subprocess

        app->log->debug('sending end of process ->websocket');
        $ws->send({json => { type => 'end' } });
    });

    $ws->on(finish => sub {
        my ($ws, $code, $reason) = @_;
        $reason = '' unless defined $reason;
        app->log->debug("Client disconnected: $code ($reason)");
    });

    app->log->debug('Reached end of ws route definition');
};

app->start;

__DATA__

@@ index.html.ep
<html>
    <head>
    <script type="text/javascript" src="https://cdnjs.cloudflare.com/ajax/libs/jquery/3.3.1/jquery.js"></script>
    <script>
var timerID = 0; 
function keepAlive(ws) { 
    var timeout = 20000;  
    if (ws.readyState == ws.OPEN) {  
        ws.send('ping');  
    }  
    timerId = setTimeout(function(){keepAlive(ws);}, timeout);  
}  
function cancelKeepAlive() {  
    if (timerId) {  
        clearTimeout(timerId);  
    }  
}

function flagCell(cell, result){
    var id='#CELL_' + cell;
    var cell = $(id);
    if(cell) {
        if (result=='OK') {
            cell.css('color', 'green');
            cell.text('⯲');
        } else {
            cell.css('color','red');
            cell.text('✘');
        }
    }
}

function process(){
    //debugger;
    console.log('Opening WebSocket');
    var ws = new WebSocket('<%= url_for('go')->to_abs %>');

    ws.onopen = function (){
        console.log('Websocket Open');
        //keepAlive(ws);
        ws.send(JSON.stringify({cmd: "let's go Perl"}));
    };
    //incoming
    ws.onmessage = function(evt){
        var data = JSON.parse(evt.data);
        console.log('WS received '+JSON.stringify(data));
        if (data.type == 'ticket') {
            console.log('Server has send a status');
            console.log('Cell:'+data.cell + ' res:' + data.result);

            flagCell(data.cell, data.result);
        } else if (data.type == 'end') {
            console.log('Server has finished.');
            //cancelKeepAlive();
            ws.close();
        } else {
            console.log('Unknown message:' + evt.data);
        }
    };
    ws.onerror = function (evt) {
        console.log('ws error:', evt.data);
    }
    ws.onclose = function (evt) {
        if(evt.wasClean) {
            console.log('Connection closed cleanly');
        } else {
            console.log('Connection reseted');
        }
        console.log('Code:'+ evt.code + ' Reason:' + evt.reason);
    }
}

    </script>
    </head>
    <body>
        <button type=button id='upload' onclick="process();">Go</button><br>
        <div style='font-family:sans;'>
            <table border="1px">
              <tr><td id="CELL_1">&nbsp;</td><td>Foo</td></tr>
              <tr><td id="CELL_2">&nbsp;</td><td>Bar</td></tr>
              <tr><td id="CELL_3">&nbsp;</td><td>Baz</td></tr>
            </table>
        </div>
    </body>
</html>

РЕДАКТИРОВАТЬ:

Гриннц предоставил подходящее решение, но для протокола вот моя попытка с Mojo::IOLoop::Subprocess обратным вызовом, но тогда у меня вообще нет отзывов.Я работаю в Linux, и Subprocess, кажется, разветвляется, , и родительский процесс, кажется, немедленно завершает веб-сокет edit: no: В конце концов я обнаружил, что $ws->send() находится нанеправильное место, так как оно должно быть помещено во второй sub{}, который запускается на родительской стороне, а не в первый, который запускается в дочернем процессе.Этот код должен быть реорганизован таким образом, чтобы иметь одну subprocess для каждой итерации цикла и последний шаг для уведомления о завершении.

Вот измененный on(json)

$ws->on(json => sub {
    my ($ws, $msg) = @_;
    app->log->debug('Received from websocket:', Dumper(\$msg));
    unless($msg){
        app->log->debug('Received empty message? WTF?!');
        return;
    }
    my $prompt = $msg->{cmd};
    return unless $prompt;
    app->log->debug(sprintf 'Received: `%s`', $prompt // '<empty??>');

    # my $loop = Mojo::IOLoop->singleton;
    my $subprocess = Mojo::IOLoop::Subprocess->new;
    app->log->debug("we are pid $$");
    $subprocess->run( 
        sub {
            my $sp = shift;
            for my $cell (1..3) {
                app->log->debug("starting process for cell $cell in pid $$");     
                sleep(2);
                app->log->debug("sending cell $cell to ws");
                my $payload = {
                    type => 'ticket',
                    cell => $cell,
                    result => $cell % 2 ? 'OK' : 'NOK'
                };
                $ws->send( { json => $payload } ); # FIXME: actually this line is in the wrong place
                                                   # and should be in the second sub{}
            };
        },
        sub {
            my ($sp, $err, @results) = @_; 
            $ws->reply->exception($err) and return if $err;
            app->log->debug('sending end of process ->websocket');
            $ws->send({json => { type => 'end' } });
        });  
    # Start event loop if necessary
    $subprocess->ioloop->start unless $subprocess->ioloop->is_running;       
});

и соответствующий журнал:

[Wed Oct  3 19:51:58 2018] [debug] Received: `let's go Perl`
[Wed Oct  3 19:51:58 2018] [debug] we are pid 8898
[Wed Oct  3 19:51:58 2018] [debug] Client disconnected: 1006 ()
[Wed Oct  3 19:51:58 2018] [debug] starting process for cell 1 in pid 8915
[Wed Oct  3 19:52:00 2018] [debug] sending cell 1 to ws
[Wed Oct  3 19:52:00 2018] [debug] starting process for cell 2 in pid 8915
[Wed Oct  3 19:52:02 2018] [debug] sending cell 2 to ws
[Wed Oct  3 19:52:02 2018] [debug] starting process for cell 3 in pid 8915
[Wed Oct  3 19:52:04 2018] [debug] sending cell 3 to ws
[Wed Oct  3 19:52:04 2018] [debug] sending end of process ->websocket
[Wed Oct  3 19:52:04 2018] [debug] Client disconnected: 1005 ()

Я также экспериментировал с Mojo::IOLoop->delay, чтобы сгенерировать сложную последовательность шагов, аналогичную решению Promise, но в конце он также отправляет все уведомления сразу:

$ws->on(json => sub {
    my ($ws, $msg) = @_;
    app->log->debug('Received from websocket:', Dumper(\$msg));
    unless($msg){
        app->log->debug('Received empty message? WTF?!');
        return;
    }
    my $prompt = $msg->{cmd};
    return unless $prompt;
    app->log->debug(sprintf 'Received: `%s`', $prompt // '<empty??>');

    app->log->debug("we are pid $$");

    my @steps;
    for my $cell (1..3) {
        push @steps, 
            sub {
                app->log->debug("subprocess for cell pid $cell");
                # my $sp = shift;
                my $delay = shift;
                sleep(2);
                app->log->debug("end of sleep for cell $cell");
                $delay->pass($cell % 2 ? 'OK' : 'NOK');
            },
            sub {
                my $delay = shift;
                my $result = shift;

                app->log->debug("sending cell $cell from pid $$ - result was $result");
                my $payload = {
                    type => 'ticket',
                    cell => $cell,
                    result => $result
            };
            $ws->send( { json => $payload } );
            $delay->pass;    
        };
    }

    # add final step to notify end of processing
    push @steps, sub {
        my $delay = shift;
        app->log->debug('sending end of process ->websocket');
        $ws->send({json => { type => 'end' } });
        $delay->pass;
    };

    my $delay = Mojo::IOLoop::Delay->new;
    app->log->debug("Starting delay...");
    $delay->steps( @steps );
    app->log->debug("After the delay");

});

Ответы [ 3 ]

0 голосов
/ 03 октября 2018

Невозможно волшебным образом сделать неблокирующий код Perl. Вот почему ваша операция блокировки задерживает ответы веб-сокетов и цикл обработки событий.

Один подпроцесс не будет работать дляэто потому, что только исходный рабочий процесс, обработавший запрос, может ответить на веб-сокет, а подпроцессы могут вернуться только один раз.Однако вы можете использовать подпроцесс для подготовки каждого ответа, который вы хотите отправить.Однако использование вами подпроцессов не совсем корректно.

Первая подпрограмма, переданная подпроцессу, выполняется в форке и, таким образом, не блокирует основной процесс.Вторая подпрограмма выполняется в родительском элементе после завершения подпроцесса и получает возвращаемое значение первой подпрограммы.Здесь вам нужно отправить свои ответы.

Любой код вне этого будет выполнен до того, как подпроцесс даже будет запущен, потому что это асинхронный код, вам нужно упорядочить логику с помощью обратных вызовов.Вы можете использовать обещания , чтобы упростить сложное секвенирование.

use Mojo::Promise;

$ws->on(json => sub {
    my ($ws, $msg) = @_;
    app->log->debug('Received from websocket:', Dumper(\$msg));
    unless($msg){
        app->log->debug('Received empty message? WTF?!');
        return;
    }
    my $prompt = $msg->{cmd};
    return unless $prompt;
    app->log->debug(sprintf 'Received: `%s`', $prompt // 'empty??');

    my $promise = Mojo::Promise->new->resolve; # starting point
    # attach follow-up code for each cell, returning a new promise representing the whole chain so far
    for my $cell (1..3) {
        $promise = $promise->then(sub {
            my $promise = Mojo::Promise->new;
            Mojo::IOLoop->subprocess(sub {
                app->log->debug("sending cell $cell");
                sleep(2);
                my $payload = {
                        type => 'ticket',
                        cell => $cell,
                        result => $cell % 2 ? 'OK' : 'NOK'
                };
                return $payload;
            }, sub {
                my ($sp, $err, $payload) = @_;
                return $promise->reject($err) if $err; # indicates subprocess died
                $ws->send( { json => $payload }, sub { $promise->resolve } );
            });

            # here, the subprocess has not been started yet
            # it will be started when this handler returns to the event loop
            # then the second callback will run once the subprocess exits
            return $promise;
        };
    }

    # chain from last promise
    $promise->then(sub {
        app->log->debug('sending end of process ->websocket');
        $ws->send({json => { type => 'end' } });
    })->catch(sub {
        my $err = shift;
        # you can send or log something here to indicate an error occurred in one of the subprocesses
    });
});

Некоторые другие параметры, которые можно использовать более подробно, если они будут уместны: Mojo :: IOLoop :: ReadWriteFork, который позволил бы вам запустить только один подпроцесс и непрерывно получать от него STDOUT (вам нужно было бы самостоятельно сериализовать свою полезную нагрузку, чтобы отправить ее на STDOUT, как в Mojo :: JSON);или обычный подпроцесс, который отправляет информацию о статусе родителю через внешнего посредника pub / sub, к которому могут подключиться оба процесса, например Postgres , Redis или Mercury (также потребует сериализации).

0 голосов
/ 12 апреля 2019

Я внес небольшое изменение в ваш обновленный пример, чтобы он работал как положено.Вы можете использовать функцию progress модуля Subprocess, чтобы гарантировать, что правильные данные отправляются через веб-сокет асинхронно из длинного подпроцесса.

Код теперь работает, как и ожидалось, состояние таблицы обновляется на стороне клиента каждый раз, когда подпроцесс проходит итерацию.

Соответствующая часть исходного кода выглядит следующим образом:

$ws->on(
    json => sub {
        my ( $ws, $msg ) = @_;
        app->log->debug( 'Received from websocket:', Dumper( \$msg ) );
        unless ($msg) {
            app->log->debug('Received empty message? WTF?!');
            return;
        }
        my $prompt = $msg->{cmd};
        return unless $prompt;
        app->log->debug( sprintf 'Received: `%s`', $prompt // '<empty??>' );

        # my $loop = Mojo::IOLoop->singleton;
        my $subprocess = Mojo::IOLoop::Subprocess->new;
        app->log->debug("we are pid $$");
        $subprocess->run(
            sub {
                my $sp = shift;
                for my $cell ( 1 .. 3 ) {
                    app->log->debug(
                        "starting process for cell $cell in pid $$");
                    sleep(2);
                    app->log->debug("sending cell $cell to ws");
                    my $payload = {
                        type   => 'ticket',
                        cell   => $cell,
                        result => $cell % 2 ? 'OK' : 'NOK'
                    };
                    $sp->progress($payload);
                }
            },
            sub {
                my ( $sp, $err, @results ) = @_;

                #$ws->send( { json => $payload } );
                $ws->reply->exception($err) and return if $err;
                app->log->debug('sending end of process ->websocket');
                $ws->send( { json => { type => 'end' } } );
            }
        );

        # Start event loop if necessary
        $subprocess->on(
            progress => sub {
                my ( $subprocess, $payload ) = @_;
                $ws->send( { json => $payload } );
            }
        );
        $subprocess->ioloop->start unless $subprocess->ioloop->is_running;
    }
);
0 голосов
/ 02 октября 2018

Вы можете использовать поток вместо подпроцесса для выполнения работы.После создания потока вам нужен цикл, который обновляет прогресс через websocket.

Если вы справляетесь с критическими рабочими нагрузками, которые действительно должны быть завершены при любых обстоятельствах (веб-сокет пропал, сеть не работает и т. Д.), Вы должны делегировать его другому демону, который сохраняется и сообщает свое состояние через файл или файл.socket.

Если это не критичная рабочая нагрузка, и вы можете легко перезапустить ее, это может быть шаблон для вас.

# Insert this at module header
# use threads;
# use Thread::Queue;

my $queue  = Thread::Queue->new();
my $worker = threads->create(sub {
  # dummy workload. do your work here
  my $count = 60;
  for (1..$count) {
    sleep 1;
    $queue->enqueue($_/$count);
  }

  # undef to signal end of work
  $queue->enqueue(undef);

  return;
});

# blocking dequeuing ends when retrieving an undef'd value
while(defined(my $item = $queue->dequeue)) {
  # update progress via websocket
  printf("%f %\n", $item);
}

# join thread
$worker->join;
...