Чтобы увидеть, что на самом деле происходит, давайте перепишем программу, чтобы (что-то вроде) покрыть то, что делает react
.Я собираюсь игнорировать многочисленные детали, которые не имеют большого значения для рассматриваемого вопроса.
Чтобы сделать вещи немного более компактными, я просто перепишу этот сегмент предоставленной программы:
react {
whenever signal(SIGINT, |%scheduler) {
say "Got signal";
exit;
}
whenever Supply.from-list($*IN.lines, |%scheduler) {
say "Got line";
exit if $++ == 1 ;
}
}
Прежде всего, react { ... }
действительно похож на await supply { ... }
- то есть он касается блока supply { }
и await
s его конца.
await supply {
whenever signal(SIGINT, |%scheduler) {
say "Got signal";
exit;
}
whenever Supply.from-list($*IN.lines, |%scheduler) {
say "Got line";
exit if $++ == 1 ;
}
}
Ночто такое supply
блок?По сути, supply
(и так react
) предлагают:
- Контроль параллелизма, поэтому он будет обрабатывать по одному сообщению за раз (поэтому нам нужна какая-то блокировка; он использует
Lock::Async
для этого) - Завершение и распространение ошибок (в явном обмане мы будем использовать
Promise
для реализации этого, потому что мы действительно хотим только часть react
; реальная вещь производитв результате Supply
, что мы можем emit
значений в) - Автоматическое завершение, когда нет ожидающих подписок (мы будем отслеживать их в
SetHash
)
Таким образом,мы могли бы переписать программу примерно так:
await do {
# Concurency control
my $lock = Lock::Async.new;
# Completion/error conveyance
my $done = Promise.new;
# What's active?
my %active is SetHash;
# An implementation a bit like that behind the `whenever` keyword, but with
# plenty of things that don't matter for this question missing.
sub whenever-impl(Supply $s, &block) {
# Tap the Supply
my $tap;
$s.tap:
# When it gets tapped, add the tap to our active set.
tap => {
$tap = $_;
%active{$_} = True;
},
# Run the handler for any events
{ $lock.protect: { block($_) } },
# When this one is done, remove it from the %active list; if it's
# the last thing, we're done overall.
done => {
$lock.protect: {
%active{$tap}:delete;
$done.keep() unless %active;
}
},
# If there's an async error, close all taps and pass it along.
quit => {
$lock.protect: -> $err {
.close for %active.keys;
$done.quit($err);
}
}
}
# We hold the lock while doing initial setup, so you can rely on having
# done all initialization before processing a first message.
$lock.protect: {
whenever-impl signal(SIGINT, |%scheduler), {
say "Got signal";
exit;
}
whenever-impl Supply.from-list($*IN.lines, |%scheduler), {
say "Got line";
exit if $++ == 1 ;
}
}
$done
}
Обратите внимание, что здесь нет ничего общего с планировщиками или циклами событий;supply
или react
не заботится о том, от кого приходит сообщение, он просто заботится о своей собственной целостности, которая обеспечивается с помощью Lock::Async
.Также обратите внимание, что он также не вводит параллелизма: это действительно просто управляющая конструкция параллелизма.
Как правило, каждый использует supply
и react
с источниками данных, где вы tap
их и сразу получаетезадний контроль.Затем мы переходим к настройке дополнительных блоков whenever
, выпадаем из фазы настройки, и блокировка доступна для любых сообщений, которые мы получаем.Это поведение - то, что вы получаете практически со всеми расходными материалами, с которыми вы обычно сталкиваетесь.Это в случае с signal(...)
.Это также тот случай, когда вы даете Supply.from-list(...)
явный планировщик, передавая $*SCHEDULER
;в таком случае он планирует цикл, который читает из $*IN
в пуле, и немедленно возвращает управление обратно.
Проблема возникает, когда мы сталкиваемся с чем-то, что не ведет себя так.Если мы нажмем Supply.from-list($*IN.lines)
, по умолчанию будет выполнено чтение из $*IN
в текущем потоке, чтобы получить значение emit
, потому что Supply.from-list
использует CurrentThreadScheduler
в качестве значения по умолчанию.И что это делает?Просто запустите код, который его просят запустить немедленно!
Однако это оставляет нам еще одну загадку.Если Lock::Async
не является реентерабельным, то если мы:
- Получим блокировку для выполнения настройки
- Вызовите
tap
на Supply.from-list(...)
, который работает синхронно и пытается emit
значение - Попробуйте получить блокировку, чтобы мы могли обработать значение
Тогда мы просто зашли бы в тупик, потому что мы пытаемся получить не входящего в системузамок, который уже удержан - нами.Действительно, если вы запустите мой десугар программы здесь, это именно то, что происходит: она зависает.Однако оригинальный код не зависает;это ведет себя немного неловко.Что дает?
Одна из вещей, которую делает реальная реализация, обнаруживает такие случаи на этапе установки;затем он принимает продолжение и возобновляет его после завершения фазы установки.Это означает, что мы можем делать такие вещи, как:
my $primes = supply {
.emit for ^Inf .grep(*.is-prime);
}
react {
whenever $primes { .say }
whenever Promise.in(3) { done }
}
И у нас все получится.Я не буду воспроизводить это забавное здесь, но это должно быть возможно при достаточно хитром использовании gather
/ take
.