Что происходит, когда разные планировщики потоков используются в одном и том же реактивном блоке? - PullRequest
10 голосов
/ 22 мая 2019

Это дополнительный вопрос к зависит ли сигнал () в порядке реагирования блока? .

Следующий код с использованием планировщика по умолчанию $*SCHEDULER позволяет пользователю немедленно выйти, нажав CTRL-C в следующем цикле событий:

use v6;
my %scheduler;
my $use-default-scheduler = True;
if $use-default-scheduler {
    %scheduler = scheduler => $*SCHEDULER;
}    
react {
    whenever signal(SIGINT, |%scheduler) {
        say "Got signal";
        exit;
    }
    whenever Supply.from-list($*IN.lines, |%scheduler) {
        say "Got line";
        exit if $++ == 1 ;
    }
}

Меня интересует, что произойдет, если я использую два разных планировщика потоков в одном цикле react? Если я использую планировщик потоков по умолчанию Supply.from-list() вместо $*SCHEDULER, установив $use-default-scheduler = False в приведенном выше коде. Теперь пользователь не может немедленно выйти из блока react, нажав CTRL-C. Если он нажимает CTRL-C, программа просто зависает, пока не будет нажата кнопка ввода.

Так что на самом деле здесь происходит? react фокусируется только на одном цикле событий за раз? (Здесь я представляю два цикла событий: один для планировщика по умолчанию, используемого в первом whenever для сигнала SIGINT, а другой для источника $*IN.lines). Таким образом, react теперь фокусируется на планировщике from-list() для $*IN.lines, но каким-то образом SIGINT был проигнорирован в этом цикле событий? То есть нажатие CTRL-C не меняет состояние блока react?

1 Ответ

10 голосов
/ 25 мая 2019

Чтобы увидеть, что на самом деле происходит, давайте перепишем программу, чтобы (что-то вроде) покрыть то, что делает react.Я собираюсь игнорировать многочисленные детали, которые не имеют большого значения для рассматриваемого вопроса.

Чтобы сделать вещи немного более компактными, я просто перепишу этот сегмент предоставленной программы:

react {
    whenever signal(SIGINT, |%scheduler) {
        say "Got signal";
        exit;
    }
    whenever Supply.from-list($*IN.lines, |%scheduler) {
        say "Got line";
        exit if $++ == 1 ;
    }
}

Прежде всего, react { ... } действительно похож на await supply { ... } - то есть он касается блока supply { } и await s его конца.

await supply {
    whenever signal(SIGINT, |%scheduler) {
        say "Got signal";
        exit;
    }
    whenever Supply.from-list($*IN.lines, |%scheduler) {
        say "Got line";
        exit if $++ == 1 ;
    }
}

Ночто такое supply блок?По сути, supply (и так react) предлагают:

  • Контроль параллелизма, поэтому он будет обрабатывать по одному сообщению за раз (поэтому нам нужна какая-то блокировка; он использует Lock::Async для этого)
  • Завершение и распространение ошибок (в явном обмане мы будем использовать Promise для реализации этого, потому что мы действительно хотим только часть react; реальная вещь производитв результате Supply, что мы можем emit значений в)
  • Автоматическое завершение, когда нет ожидающих подписок (мы будем отслеживать их в SetHash)

Таким образом,мы могли бы переписать программу примерно так:

await do {
    # Concurency control
    my $lock = Lock::Async.new;
    # Completion/error conveyance
    my $done = Promise.new;
    # What's active?
    my %active is SetHash;

    # An implementation a bit like that behind the `whenever` keyword, but with
    # plenty of things that don't matter for this question missing.
    sub whenever-impl(Supply $s, &block) {
        # Tap the Supply
        my $tap;
        $s.tap:
            # When it gets tapped, add the tap to our active set.
            tap => {
                $tap = $_; 
                %active{$_} = True;
            },
            # Run the handler for any events
            { $lock.protect: { block($_) } },
            # When this one is done, remove it from the %active list; if it's
            # the last thing, we're done overall.
            done => {
                $lock.protect: {
                    %active{$tap}:delete;
                    $done.keep() unless %active;
                }
            },
            # If there's an async error, close all taps and pass it along.
            quit => {
                $lock.protect: -> $err {
                    .close for %active.keys;
                    $done.quit($err);
                }
            }
    }

    # We hold the lock while doing initial setup, so you can rely on having
    # done all initialization before processing a first message.
    $lock.protect: {
        whenever-impl signal(SIGINT, |%scheduler), {
            say "Got signal";
            exit;
        }
        whenever-impl Supply.from-list($*IN.lines, |%scheduler), {
            say "Got line";
            exit if $++ == 1 ;
        }
    }

    $done
}

Обратите внимание, что здесь нет ничего общего с планировщиками или циклами событий;supply или react не заботится о том, от кого приходит сообщение, он просто заботится о своей собственной целостности, которая обеспечивается с помощью Lock::Async.Также обратите внимание, что он также не вводит параллелизма: это действительно просто управляющая конструкция параллелизма.

Как правило, каждый использует supply и react с источниками данных, где вы tap их и сразу получаетезадний контроль.Затем мы переходим к настройке дополнительных блоков whenever, выпадаем из фазы настройки, и блокировка доступна для любых сообщений, которые мы получаем.Это поведение - то, что вы получаете практически со всеми расходными материалами, с которыми вы обычно сталкиваетесь.Это в случае с signal(...).Это также тот случай, когда вы даете Supply.from-list(...) явный планировщик, передавая $*SCHEDULER;в таком случае он планирует цикл, который читает из $*IN в пуле, и немедленно возвращает управление обратно.

Проблема возникает, когда мы сталкиваемся с чем-то, что не ведет себя так.Если мы нажмем Supply.from-list($*IN.lines), по умолчанию будет выполнено чтение из $*IN в текущем потоке, чтобы получить значение emit, потому что Supply.from-list использует CurrentThreadScheduler в качестве значения по умолчанию.И что это делает?Просто запустите код, который его просят запустить немедленно!

Однако это оставляет нам еще одну загадку.Если Lock::Async не является реентерабельным, то если мы:

  1. Получим блокировку для выполнения настройки
  2. Вызовите tap на Supply.from-list(...), который работает синхронно и пытается emit значение
  3. Попробуйте получить блокировку, чтобы мы могли обработать значение

Тогда мы просто зашли бы в тупик, потому что мы пытаемся получить не входящего в системузамок, который уже удержан - нами.Действительно, если вы запустите мой десугар программы здесь, это именно то, что происходит: она зависает.Однако оригинальный код не зависает;это ведет себя немного неловко.Что дает?

Одна из вещей, которую делает реальная реализация, обнаруживает такие случаи на этапе установки;затем он принимает продолжение и возобновляет его после завершения фазы установки.Это означает, что мы можем делать такие вещи, как:

my $primes = supply {
    .emit for ^Inf .grep(*.is-prime);
}
react {
    whenever $primes { .say }
    whenever Promise.in(3) { done }
}

И у нас все получится.Я не буду воспроизводить это забавное здесь, но это должно быть возможно при достаточно хитром использовании gather / take.

...