Как Parallel.ForEach обрабатывает отмены или ThrowIfCancellationRequested () и исключения - PullRequest
1 голос
/ 27 июня 2019

Я создал приложение WPF, чтобы посмотреть, как работает TPL, и меня смущает вывод, который у меня был.Ниже приведен мой код:

// Two buttons, 'Process' button and 'Cancel' button
public partial class MainWindow : Window 
{
   private CancellationTokenSource cancelToken = new CancellationTokenSource();
   public MainWindow()
   {
      InitializeComponent();
   }
   //...

   private void cmdProcess_Click(object sender, EventArgs e)  // Sequence A
   {
      Task.Factory.StartNew(() => ProcessFiles()); 
   }

    private void cmdCancel_Click(object sender, EventArgs e)   //Sequence B
   {
      cancelToken.Cancel();  
   }

   private void ProcessFiles() 
   {
      ParallelOptions parOpts = new ParallelOptions();
      parOpts.CancellationToken = cancelToken.Token;
      parOpts.MaxDegreeOfParallelism = System.Environment.ProcessorCount;

      string[] files = { "first", "second" };
      try
      {
         Parallel.ForEach(files, parOpts, currentFile =>
         {
            parOpts.CancellationToken.ThrowIfCancellationRequested();  //Sequence C
            Thread.Sleep(5000);
         });
      }
      catch (OperationCanceledException ex)
      { 
         MessageBox.Show("Caught");
      }
   }

}

Когда я нажимаю кнопку click, а затем быстро нажимаю кнопку cancel, я получаю сообщение «Поймано», которое появляется только один раз, а не дважды.

Допустим, идентификатор основного потока равен 1, а рабочие потоки - 2 и 3, поэтому у меня есть два вопроса:

Q1 - когда я нажимаю кнопку cancel, рабочие потоки 2 и 3 ужевыполнено 'parOpts.CancellationToken.ThrowIfCancellationRequested ();'(конечно, мой щелчок мыши не может быть таким же быстрым, как выполнение потока).Когда они выполняют ThrowIfCancellationRequested, cancelToken не был отменен, что означает, что кнопка отмены не была нажата для потока 2 и потока 3. Так почему же эти рабочие потоки по-прежнему выдают исключение?

Q2 - почемуполучить только одно всплывающее окно сообщения, не должно ли быть два, один для потока 2 и один для потока 3?

Q3 - я изменил Parallel.ForEach следующим образом:

try
{
   Parallel.ForEach(files, parOpts, currentFile =>
   {
      Thread.Sleep(5000);
      parOpts.CancellationToken.ThrowIfCancellationRequested(); 

   });
}
catch (OperationCanceledException ex)
{ 
   MessageBox.Show("Caught");
}

теперь я могу нажать кнопку отмены, прежде чем рабочий поток достигнет ThrowIfCancellationRequested (), но я все еще получаю только одно исключение, выдаваемоеосновная нить.Но я нажал кнопку cancal, токен был настроен на отмену, поэтому, когда вспомогательный рабочий поток достигает значения parOpts.CancellationToken.ThrowIfCancellationRequested();, не должен ли он также выдавать исключение?и это исключение не может быть обработано перехватом try в основном потоке (каждый поток имеет свой собственный стек), поэтому я должен получить необработанное исключение для остановки приложения, но это не так, я просто получаю одно исключение, генерируемое основным потокоми это исключение выдается первичным потоком или рабочим потоком?

Q4-я изменяю код следующим образом:

private void ProcessFilesz()
{
    ParallelOptions parOpts = new ParallelOptions();
    parOpts.CancellationToken = cancelToken.Token;
    parOpts.MaxDegreeOfParallelism = System.Environment.ProcessorCount;

    cancelToken.Cancel(); // cancel here
    string[] files = { "first", "second" };
    try
    {
        Parallel.ForEach(files, parOpts, currentFile =>
        {
            MessageBox.Show("Underline Thread is " + Thread.CurrentThread.ManagedThreadId.ToString());
            parOpts.CancellationToken.ThrowIfCancellationRequested();
        });

    }
    catch (OperationCanceledException ex)
    {
        MessageBox.Show("catch");
    }
}

снова есть что-то странное, всплывающее окно сообщения отсутствует, хотятокен настроен на отмену, но оператор MessageBox.Show(...) находится над оператором parOpts.CancellationToken.ThrowIfCancellationRequested();, поэтому сначала следует выполнить MessageBox.Show(), но почему он вообще не выполняется?или CLR поднимает parOpts.CancellationToken.ThrowIfCancellationRequested(); наверх, чтобы неявно быть первым оператором?

Q5 - я изменил код следующим образом:

try
{
   Parallel.ForEach(files, parOpts, currentFile =>
   {
      Thread.Sleep(5000); // I pressed the cancel button on the main thread when the worker thread is sleeping
   });
}
catch (OperationCanceledException ex)
{
   MessageBox.Show("Caught");
}

, поэтому у меня достаточно времени, чтобы нажать кнопку отмены, тамэто одно "поймать" сообщение, но почему все еще есть исключение?Теперь я понимаю, что Parallel.ForEach проверяет CancellationToken.IsCancellationRequested` перед всеми дорогостоящими операциями с ресурсом. Означает ли это, что Parallel.ForEach будет проверять IsCancellationRequested после того, как все операторы внутри были выполнены?Я имею в виду Parallel.ForEach будет проверять IsCancellationRequested дважды, один перед первым оператором и один после последнего оператора?

1 Ответ

3 голосов
/ 27 июня 2019

Как Parallel.ForEach обрабатывает отмену

Ваши наблюдения верны.Но все ведет себя нормально.Поскольку свойство ParallelOptions.CancellationToken установлено, Parallel.ForEach выбрасывает OperationCanceledException, когда CancellationToken.IsCancellationRequested оценивается как true.

Все каркасные классы, которые поддерживают отмену, ведут себя так (например, Task.Run).Перед выполнением любых дорогостоящих распределений ресурсов (дорогостоящих в памяти или времени) платформа проверяет токен отмены несколько раз во время выполнения с целью повышения эффективности.Например, Parallel.ForEach должен выполнять многие из этих дорогостоящих распределений ресурсов из-за всего управления потоками.Перед каждым этапом выделения (например, инициализация, порождение рабочих потоков или разветвление, применение разделителя, вызов действия и т. Д.) CancellationToken.IsCancelRequested оценивается снова.

Последний внутренний шаг Parallel.ForEach присоединяется к потокам перед созданием ParallelLoopResult (возвращаемое значение Parallel.ForEach).Перед этой операцией CancellationToken.IsCancellationRequested снова оценивается.Поскольку вы отменили выполнение Parallel.ForEach во время выполнения Thread.Sleep(5000), вам придется ждать максимальный промежуток в 5 секунд, пока каркас не перепроверит это свойство и не сможет выбросить OperationCanceledException.Вы можете проверить это.Пройдет x / 1000 секунд Thread.Sleep(x), пока не появится MessageBox.

Другой шанс отменить Parallel.ForEach делегирован потребителю.Весьма вероятно, что действие потребителя является долгосрочным и поэтому требует отмены до того, как будет достигнут конец Parallel.ForEach.Как вы знаете, преждевременная отмена может быть вызвана (неоднократно) вызовом CancellationToken.ThrowIfCancellationRequested(), что на этот раз заставит CancellationToken бросить OperationCanceledException (а не Parallel.ForEach).

ОтветитьВаш последний вопрос, почему вы увидите только one MessageBox: в вашем особом случае вы уже заметили, что слишком медленно нажимаете кнопку отмены, пока код не достигнет CancellationToken.ThrowIfCancellationRequested(), но можете нажать еепрежде чем нить проснется от сна.Поэтому Parallel.ForEach выдает исключение (до присоединения к потокам и создания ParallelLoopResult).Итак, one исключение.Но даже если вы достаточно быстры, чтобы отменить цикл до достижения CancellationToken.ThrowIfCancellationRequested(), все равно будет отображаться только один MessageBox, так как цикл прерывает все потоки, как только выдается неперехваченное исключение.Чтобы позволить каждому потоку генерировать исключение, вы должны перехватить каждое и накапливать их, прежде чем выбросить их, завернутые в AggregateException.См. Документы Microsoft: как обрабатывать исключения в параллельных циклах для получения более подробной информации.


Изменить, чтобы ответить на следующий вопрос:

Для Q2 я только что понял, что каждый поток имеет свой собственный стек, поэтому он не будет знать, что он окружен блоком try catch, поэтому есть только одно исключение (выбрасываемое основным потоком), насколько я понимаюправильно?

Вы правы, когда говорите, что у каждого потока есть свой выделенный стек вызовов.Но когда вы пишете код, который должен выполняться одновременно, в куче для каждого потока создается копия всех локальных объектов.Это также верно для try-catch блоков.Catch указывает компилятору определить обработчик (указатель инструкции), который затем регистрируется в таблице обработчиков исключений инструкцией try.Таблица управляется ОС.Таблица исключений отображает каждый обработчик на исключение.Каждое исключение сопоставляется со стеком вызовов.Таким образом, исключения и обработчики перехвата ограничены явным стеком вызовов.Поскольку обработчик имеет доступ к локальной памяти потока, он также должен быть копией.Это означает, что каждый поток «знает» о своих catch обработчиках.

Из-за выделенных стеков вызовов и исключительного отображения исключения в стек вызовов и обработчика перехвата в исключение (и, следовательно, также в стек вызовов) любое исключение, выброшенное в области видимости потока (стек вызовов), не может быть перехвачено снаружиобъем потока (при использовании Thread).Область действия в данном случае означает адресное пространство, которое оно описывает стеком вызовов (с его кадрами вызова).Если он не попадет непосредственно в сам поток, это приведет к сбою приложения.Task (когда ожидается использование Task.Wait или await), наоборот, проглатывает все исключения и заключает их в AggregateException.

Исключение, выданное DoParallel(), не будет перехвачено:

try 
{
  Thread thread = new Thread(() => DoParallel());
  thread.Start();
}
catch (Exception ex)
{
  // Unreachable code
}

Но в следующих двух примерах оба обработчика catch вызываются для обработки исключения:

try 
{
  await Task.Run(() => DoParallel());
}
catch (AggregateException ex)
{
  // Reachable code
}

или

try 
{
  var task = new Task(() => DoParallel());
  task.Start();
  task.Wait();
}
catch (AggregateException ex)
{
  // Reachable code
}

В последних двух примерах используется параллельная библиотека задач - TPL , которая использует SynchronizationContext, чтобы позволить потокам совместно использовать контекст и, следовательно, например, распространятьисключения между потоками.Поскольку Parallel.ForEach использует Task.Wait() ( TPL ), он может перехватить исключение рабочего потока (если вы еще не перехватили его в своем действии), выполнить некоторую очистку (отменить другие рабочие потоки).и распоряжение внутренними ресурсами), а затем, наконец, распространить OperationCanceledException во внешнюю область.

Таким образом, поскольку выдается исключение,

  • ОС прерывает приложение и проверяет таблицу исключений на наличие потенциального обработчика, который был сопоставлен с этим потоком директивой try.
  • Он находит один и восстанавливает контекст для выполнения обработчика catch (в вашем случае следующий обработчик catch является внутренним обработчиком Parallel.ForEach).Приложение все еще остановлено - другие потоки все еще припаркованы.
  • Этот обработчик Parallel.ForEach выполняет очистку и завершает другие потоки до , приложение продолжается и, следовательно, до любой из рабочих потоков может сам генерировать дополнительные исключения.
  • Приложение продолжает выполнение, выполняя обработчик Parallel.ForEach catch.
  • Приложение снова останавливается, ищавнешняя область (потребительская область Parallel.ForEach) * обработчик catch.
  • Если ни один не был зарегистрирован с использованием try, приложение завершит работу с ошибкой.

    Вот почемувсегда один исключение выдается Parallel.ForEach.


Редактировать, чтобы ответить на дополнительный вопрос Q3:

теперь я могу нажать кнопку отмены, прежде чем рабочий поток достигнет ThrowIfCancellationRequested (), но я все еще получаю только одно исключение, генерируемое основным потоком.Но я нажал кнопку cancal, токен был настроен на отмену, поэтому, когда вторичный рабочий поток достигает parOpts.CancellationToken.ThrowIfCancellationRequested (); разве не должно ли оно также генерировать исключение?и это исключение не может быть обработано перехватом try в основном потоке (каждый поток имеет свой собственный стек), поэтому я должен получить необработанное исключение для остановки приложения, но это не так, я просто получаю одно исключение, генерируемое основным потокоми это исключение выдается основным или рабочим потоком

для следующего сценария:

try
{
   Parallel.ForEach(files, parOpts, currentFile =>
   {
      Thread.Sleep(5000);
      parOpts.CancellationToken.ThrowIfCancellationRequested(); 

   });
}
catch (OperationCanceledException ex)
{ 
   MessageBox.Show("Caught");
}

Поскольку в этом сценарии вы можете отменить Parallel.ForEach перед нимпо завершении, генерируется исключение в рабочем потоке (который выполняет ваш делегат действия), в момент выполнения CancellationToken.ThrowIfCancellationRequested().Под капотом метод CancellationToken.ThrowIfCancellationRequested() выглядит просто так:

public void ThrowIfCancellationRequested()
{
  if (IsCancellationRequested) 
    ThrowOperationCanceledException();
}

// Throws an OCE; separated out to enable better inlining of ThrowIfCancellationRequested
private void ThrowOperationCanceledException()
{
  throw new OperationCanceledException(Environment.GetResourceString("OperationCanceled"), this);
}

Как я уже говорил, Parallel.ForEach использует Task и Task.Wait() (_TPL_) для обработки потоков и поэтому использует SynchronizationContext.В сценарии TPL (или SynchronizationContext) контексты потоков являются общими и больше не изолируются (в отличие от потоков Thread).Это позволяет Parallel.ForEach перехватывать исключения, создаваемые дочерними потоками.

Это означает, что внутри Parallel.ForEach нет необработанных исключений, поскольку, как вы можете прочитать в пошаговом объяснении потока исключений, Parallel.ForEach внутренне перехватывает все исключения (возможно из-за TPL ) выполнить очистку и удаление выделенных ресурсов, а затем, наконец, перебросить OperationCanceledException.

При проверке стека вызовов исключения из вашего примера кода Q3 вы увидите, что источником является рабочий поток, а не «основной» поток Parallel.ForEach. Вы только что перехватили исключение в основном потоке, поскольку он содержит обработчик catch, ближайший к источнику - рабочий поток. Из-за этого основной поток может завершиться без отмены.


Parallel.ForEach и темы

Я думаю, что ваша концепция ошибочна:

... основной поток также выполняет операторы в Parallel.ForEach, не так ли? У меня есть опечатка в посте, там только две активные темы, а не три. строка [] просто имеет два элемента, поэтому первичный поток обрабатывает «первый», а один рабочий поток обрабатывает «два» ...

Это не правда. Чтобы было понятно: массив в вашем первоначальном примере содержит две строки, которые должны имитировать рабочую нагрузку, верно? Основной поток - это поток, который вы создали для выполнения цикла Parallel.ForEach с использованием Task.Factory.StartNew(() => ProcessFiles());. Это обычная практика для поддержания отзывчивости потока пользовательского интерфейса в течение длительного времени Parallel.ForEach. Следовательно, Parallel.ForEach выполняется в основном потоке, а может создать два рабочих потока - по одному для каждой загрузки (или строки). Может , потому что Parallel.ForEach на самом деле использует задач , которые поддерживаются потоков . Максимальное количество потоков ограничено числом процессоров и TaskScheduler. Из-за оптимизации производительности, выполняемой платформой, фактическое число задач не должно совпадать с количеством повторяющихся элементов или значением MaxDegreeOfParallelism.

Метод Parallel.ForEach может использовать больше задач, чем потоков в течение срока его выполнения, поскольку существующие задачи завершаются и заменяются новыми задачами. Это дает базовому объекту TaskScheduler возможность добавлять, изменять или удалять потоки, обслуживающие цикл. может решить выполнить делегаты действия в меньшем количестве потоков, чем позволяет MaxDegreeOfParallelism. (источник: Документы Microsoft: Parallel.ForEach )


Обобщать и суммировать

Предполагая, что свойство ParallelOptions.CancellationToken установлено, возможны два сценария:

Первый сценарий: вы вызвали CancellationToken.ThrowIfCancellationRequested() в своем делегате действия после запроса отмены , но до Parallel.ForEach внутренне оценивает CancellationToken.IsCancellationRequested , Теперь, если вы окружите свой код действия try-catch, то никакое исключение не покинет рабочий поток. Если такого try-catch нет, Parallel.ForEach внутренне перехватит это исключение (для некоторой очистки). Это было бы в основном потоке. Это исключение затем повторно генерируется после того, как Parallel.ForEach распорядился выделенными ресурсами. Поскольку вы вызвали CancellationToken.ThrowIfCancellationRequested() на рабочем месте, источником по-прежнему является этот рабочий поток. Помимо запроса на отмену, любое исключение может остановить выполнение Parallel.ForEach в любое время.

Второй сценарий: вы не явно вызываете CancellationToken.ThrowIfCancellationRequested() в вашем делегате действия, или произошла отмена после CancellationToken.ThrowIfCancellationRequested(), метод был вызван, затем в следующий раз Parallel.ForEach внутренне проверит CancellationToken.IsCancelRequested, исключение будет выдано Parallel.ForEach. Parallel.ForEach всегда оценивает CancellationToken.IsCancelRequested перед выделением каких-либо ресурсов. Поскольку Parallel.ForEach выполняется в первичном потоке, источником этого исключения, конечно же, будет первичный поток. Помимо запроса на отмену, любое исключение может остановить выполнение Parallel.ForEach в любое время.

Если свойство ParallelOptions.CancellationToken не установлено, то внутренние Parallel.ForEach оценки CancellationToken.IsCancelRequested не будут выполняться. В случае запроса CancellationToken.Cancel(), Parallel.ForEach не может реагировать и продолжит свою ресурсоемкую работу, , если не возникнет исключение, вызванное вызовом CancellationToken.ThrowIfCancellationRequested(). Помимо запроса на отмену, любое исключение может остановить выполнение Parallel.ForEach в любое время.

...