Как Parallel.ForEach
обрабатывает отмену
Ваши наблюдения верны.Но все ведет себя нормально.Поскольку свойство ParallelOptions.CancellationToken
установлено, Parallel.ForEach
выбрасывает OperationCanceledException
, когда CancellationToken.IsCancellationRequested
оценивается как true.
Все каркасные классы, которые поддерживают отмену, ведут себя так (например, Task.Run
).Перед выполнением любых дорогостоящих распределений ресурсов (дорогостоящих в памяти или времени) платформа проверяет токен отмены несколько раз во время выполнения с целью повышения эффективности.Например, Parallel.ForEach
должен выполнять многие из этих дорогостоящих распределений ресурсов из-за всего управления потоками.Перед каждым этапом выделения (например, инициализация, порождение рабочих потоков или разветвление, применение разделителя, вызов действия и т. Д.) CancellationToken.IsCancelRequested
оценивается снова.
Последний внутренний шаг Parallel.ForEach
присоединяется к потокам перед созданием ParallelLoopResult
(возвращаемое значение Parallel.ForEach
).Перед этой операцией CancellationToken.IsCancellationRequested
снова оценивается.Поскольку вы отменили выполнение Parallel.ForEach
во время выполнения Thread.Sleep(5000)
, вам придется ждать максимальный промежуток в 5 секунд, пока каркас не перепроверит это свойство и не сможет выбросить OperationCanceledException
.Вы можете проверить это.Пройдет x / 1000 секунд Thread.Sleep(x)
, пока не появится MessageBox
.
Другой шанс отменить Parallel.ForEach
делегирован потребителю.Весьма вероятно, что действие потребителя является долгосрочным и поэтому требует отмены до того, как будет достигнут конец Parallel.ForEach
.Как вы знаете, преждевременная отмена может быть вызвана (неоднократно) вызовом CancellationToken.ThrowIfCancellationRequested()
, что на этот раз заставит CancellationToken
бросить OperationCanceledException
(а не Parallel.ForEach
).
ОтветитьВаш последний вопрос, почему вы увидите только one MessageBox
: в вашем особом случае вы уже заметили, что слишком медленно нажимаете кнопку отмены, пока код не достигнет CancellationToken.ThrowIfCancellationRequested()
, но можете нажать еепрежде чем нить проснется от сна.Поэтому Parallel.ForEach
выдает исключение (до присоединения к потокам и создания ParallelLoopResult
).Итак, one исключение.Но даже если вы достаточно быстры, чтобы отменить цикл до достижения CancellationToken.ThrowIfCancellationRequested()
, все равно будет отображаться только один MessageBox
, так как цикл прерывает все потоки, как только выдается неперехваченное исключение.Чтобы позволить каждому потоку генерировать исключение, вы должны перехватить каждое и накапливать их, прежде чем выбросить их, завернутые в AggregateException
.См. Документы Microsoft: как обрабатывать исключения в параллельных циклах для получения более подробной информации.
Изменить, чтобы ответить на следующий вопрос:
Для Q2 я только что понял, что каждый поток имеет свой собственный стек, поэтому он не будет знать, что он окружен блоком try catch, поэтому есть только одно исключение (выбрасываемое основным потоком), насколько я понимаюправильно?
Вы правы, когда говорите, что у каждого потока есть свой выделенный стек вызовов.Но когда вы пишете код, который должен выполняться одновременно, в куче для каждого потока создается копия всех локальных объектов.Это также верно для try-catch
блоков.Catch
указывает компилятору определить обработчик (указатель инструкции), который затем регистрируется в таблице обработчиков исключений инструкцией try
.Таблица управляется ОС.Таблица исключений отображает каждый обработчик на исключение.Каждое исключение сопоставляется со стеком вызовов.Таким образом, исключения и обработчики перехвата ограничены явным стеком вызовов.Поскольку обработчик имеет доступ к локальной памяти потока, он также должен быть копией.Это означает, что каждый поток «знает» о своих catch
обработчиках.
Из-за выделенных стеков вызовов и исключительного отображения исключения в стек вызовов и обработчика перехвата в исключение (и, следовательно, также в стек вызовов) любое исключение, выброшенное в области видимости потока (стек вызовов), не может быть перехвачено снаружиобъем потока (при использовании Thread
).Область действия в данном случае означает адресное пространство, которое оно описывает стеком вызовов (с его кадрами вызова).Если он не попадет непосредственно в сам поток, это приведет к сбою приложения.Task
(когда ожидается использование Task.Wait
или await
), наоборот, проглатывает все исключения и заключает их в AggregateException
.
Исключение, выданное DoParallel()
, не будет перехвачено:
try
{
Thread thread = new Thread(() => DoParallel());
thread.Start();
}
catch (Exception ex)
{
// Unreachable code
}
Но в следующих двух примерах оба обработчика catch
вызываются для обработки исключения:
try
{
await Task.Run(() => DoParallel());
}
catch (AggregateException ex)
{
// Reachable code
}
или
try
{
var task = new Task(() => DoParallel());
task.Start();
task.Wait();
}
catch (AggregateException ex)
{
// Reachable code
}
В последних двух примерах используется параллельная библиотека задач - TPL , которая использует SynchronizationContext
, чтобы позволить потокам совместно использовать контекст и, следовательно, например, распространятьисключения между потоками.Поскольку Parallel.ForEach
использует Task.Wait()
( TPL ), он может перехватить исключение рабочего потока (если вы еще не перехватили его в своем действии), выполнить некоторую очистку (отменить другие рабочие потоки).и распоряжение внутренними ресурсами), а затем, наконец, распространить OperationCanceledException
во внешнюю область.
Таким образом, поскольку выдается исключение,
- ОС прерывает приложение и проверяет таблицу исключений на наличие потенциального обработчика, который был сопоставлен с этим потоком директивой
try
. - Он находит один и восстанавливает контекст для выполнения обработчика
catch
(в вашем случае следующий обработчик catch
является внутренним обработчиком Parallel.ForEach
).Приложение все еще остановлено - другие потоки все еще припаркованы. - Этот обработчик
Parallel.ForEach
выполняет очистку и завершает другие потоки до , приложение продолжается и, следовательно, до любой из рабочих потоков может сам генерировать дополнительные исключения. - Приложение продолжает выполнение, выполняя обработчик
Parallel.ForEach
catch
. - Приложение снова останавливается, ищавнешняя область (потребительская область
Parallel.ForEach
) * обработчик catch
. Если ни один не был зарегистрирован с использованием try
, приложение завершит работу с ошибкой.
Вот почемувсегда один исключение выдается Parallel.ForEach
.
Редактировать, чтобы ответить на дополнительный вопрос Q3:
теперь я могу нажать кнопку отмены, прежде чем рабочий поток достигнет ThrowIfCancellationRequested (), но я все еще получаю только одно исключение, генерируемое основным потоком.Но я нажал кнопку cancal, токен был настроен на отмену, поэтому, когда вторичный рабочий поток достигает parOpts.CancellationToken.ThrowIfCancellationRequested (); разве не должно ли оно также генерировать исключение?и это исключение не может быть обработано перехватом try в основном потоке (каждый поток имеет свой собственный стек), поэтому я должен получить необработанное исключение для остановки приложения, но это не так, я просто получаю одно исключение, генерируемое основным потокоми это исключение выдается основным или рабочим потоком
для следующего сценария:
try
{
Parallel.ForEach(files, parOpts, currentFile =>
{
Thread.Sleep(5000);
parOpts.CancellationToken.ThrowIfCancellationRequested();
});
}
catch (OperationCanceledException ex)
{
MessageBox.Show("Caught");
}
Поскольку в этом сценарии вы можете отменить Parallel.ForEach
перед нимпо завершении, генерируется исключение в рабочем потоке (который выполняет ваш делегат действия), в момент выполнения CancellationToken.ThrowIfCancellationRequested()
.Под капотом метод CancellationToken.ThrowIfCancellationRequested()
выглядит просто так:
public void ThrowIfCancellationRequested()
{
if (IsCancellationRequested)
ThrowOperationCanceledException();
}
// Throws an OCE; separated out to enable better inlining of ThrowIfCancellationRequested
private void ThrowOperationCanceledException()
{
throw new OperationCanceledException(Environment.GetResourceString("OperationCanceled"), this);
}
Как я уже говорил, Parallel.ForEach
использует Task
и Task.Wait() (_TPL_)
для обработки потоков и поэтому использует SynchronizationContext
.В сценарии TPL (или SynchronizationContext
) контексты потоков являются общими и больше не изолируются (в отличие от потоков Thread
).Это позволяет Parallel.ForEach
перехватывать исключения, создаваемые дочерними потоками.
Это означает, что внутри Parallel.ForEach
нет необработанных исключений, поскольку, как вы можете прочитать в пошаговом объяснении потока исключений, Parallel.ForEach
внутренне перехватывает все исключения (возможно из-за TPL ) выполнить очистку и удаление выделенных ресурсов, а затем, наконец, перебросить OperationCanceledException
.
При проверке стека вызовов исключения из вашего примера кода Q3 вы увидите, что источником является рабочий поток, а не «основной» поток Parallel.ForEach
. Вы только что перехватили исключение в основном потоке, поскольку он содержит обработчик catch
, ближайший к источнику - рабочий поток. Из-за этого основной поток может завершиться без отмены.
Parallel.ForEach и темы
Я думаю, что ваша концепция ошибочна:
... основной поток также выполняет операторы в Parallel.ForEach, не так ли? У меня есть опечатка в посте, там только две активные темы, а не три. строка [] просто имеет два элемента, поэтому первичный поток обрабатывает «первый», а один рабочий поток обрабатывает «два» ...
Это не правда. Чтобы было понятно: массив в вашем первоначальном примере содержит две строки, которые должны имитировать рабочую нагрузку, верно? Основной поток - это поток, который вы создали для выполнения цикла Parallel.ForEach
с использованием Task.Factory.StartNew(() => ProcessFiles());
. Это обычная практика для поддержания отзывчивости потока пользовательского интерфейса в течение длительного времени Parallel.ForEach
. Следовательно, Parallel.ForEach
выполняется в основном потоке, а может создать два рабочих потока - по одному для каждой загрузки (или строки). Может , потому что Parallel.ForEach
на самом деле использует задач , которые поддерживаются потоков . Максимальное количество потоков ограничено числом процессоров и TaskScheduler
. Из-за оптимизации производительности, выполняемой платформой, фактическое число задач не должно совпадать с количеством повторяющихся элементов или значением MaxDegreeOfParallelism
.
Метод Parallel.ForEach
может использовать больше задач, чем потоков в течение срока его выполнения, поскольку существующие задачи завершаются и заменяются новыми задачами. Это дает базовому объекту TaskScheduler
возможность добавлять, изменять или удалять потоки, обслуживающие цикл.
может решить выполнить делегаты действия в меньшем количестве потоков, чем позволяет MaxDegreeOfParallelism
. (источник: Документы Microsoft: Parallel.ForEach )
Обобщать и суммировать
Предполагая, что свойство ParallelOptions.CancellationToken
установлено, возможны два сценария:
Первый сценарий: вы вызвали CancellationToken.ThrowIfCancellationRequested()
в своем делегате действия после запроса отмены , но до Parallel.ForEach
внутренне оценивает CancellationToken.IsCancellationRequested
, Теперь, если вы окружите свой код действия try-catch
, то никакое исключение не покинет рабочий поток. Если такого try-catch
нет, Parallel.ForEach
внутренне перехватит это исключение (для некоторой очистки). Это было бы в основном потоке. Это исключение затем повторно генерируется после того, как Parallel.ForEach
распорядился выделенными ресурсами. Поскольку вы вызвали CancellationToken.ThrowIfCancellationRequested()
на рабочем месте, источником по-прежнему является этот рабочий поток. Помимо запроса на отмену, любое исключение может остановить выполнение Parallel.ForEach
в любое время.
Второй сценарий: вы не явно вызываете CancellationToken.ThrowIfCancellationRequested()
в вашем делегате действия, или произошла отмена после CancellationToken.ThrowIfCancellationRequested()
, метод был вызван, затем в следующий раз Parallel.ForEach
внутренне проверит CancellationToken.IsCancelRequested
, исключение будет выдано Parallel.ForEach
. Parallel.ForEach
всегда оценивает CancellationToken.IsCancelRequested
перед выделением каких-либо ресурсов. Поскольку Parallel.ForEach
выполняется в первичном потоке, источником этого исключения, конечно же, будет первичный поток. Помимо запроса на отмену, любое исключение может остановить выполнение Parallel.ForEach
в любое время.
Если свойство ParallelOptions.CancellationToken
не установлено, то внутренние Parallel.ForEach
оценки CancellationToken.IsCancelRequested
не будут выполняться. В случае запроса CancellationToken.Cancel()
, Parallel.ForEach
не может реагировать и продолжит свою ресурсоемкую работу, , если не возникнет исключение, вызванное вызовом CancellationToken.ThrowIfCancellationRequested()
. Помимо запроса на отмену, любое исключение может остановить выполнение Parallel.ForEach
в любое время.