Ниже приведен пример кода, который может ответить на ваш вопрос. Суть в том, что вы получаете параллелизм форка / соединения с Parallel.ForEach, поэтому вам не нужно беспокоиться о состоянии гонки вне параллельной задачи (вызывающий поток блокируется до тех пор, пока задачи не будут завершены, успешно или нет). Вы просто хотите убедиться, что используете переменную LoopState (второй аргумент лямбда-выражения) для управления состоянием вашего цикла.
Если любая итерация цикла вызвала необработанное исключение, общий цикл вызовет исключение AggregateException, пойманное в конце.
Другие ссылки, в которых упоминается эта тема:
Parallel.ForEach генерирует исключение при обработке очень больших наборов данных
http://msdn.microsoft.com/en-us/library/dd460720.aspx
Ограничивает ли Parallel.ForEach количество активных потоков?
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.ServiceModel;
namespace Temp
{
public class Class1
{
private class MockWcfProxy
{
internal object ProcessIntervalConfiguration(string item)
{
return new Object();
}
public CommunicationState State { get; set; }
}
private void myFunction()
{
IList<string> iListOfItems = new List<string>();
// populate iListOfItems
CancellationTokenSource cts = new CancellationTokenSource();
ParallelOptions po = new ParallelOptions();
po.MaxDegreeOfParallelism = 20; // max threads
po.CancellationToken = cts.Token;
try
{
var myWcfProxy = new MockWcfProxy();
if (Parallel.ForEach(iListOfItems, po, (item, loopState) =>
{
try
{
if (loopState.ShouldExitCurrentIteration || loopState.IsExceptional)
loopState.Stop();
// long running blocking WS call, check before and after
var response = myWcfProxy.ProcessIntervalConfiguration(item);
if (loopState.ShouldExitCurrentIteration || loopState.IsExceptional)
loopState.Stop();
// perform some local processing of the response object
}
catch (Exception ex)
{
// cannot continue game over.
if (myWcfProxy.State == CommunicationState.Faulted)
{
loopState.Stop();
throw;
}
// FYI you are swallowing all other exceptions here...
}
// else carry on..
// raise some events and other actions that could all risk an unhanded error.
}
).IsCompleted)
{
RaiseAllItemsCompleteEvent();
}
}
catch (AggregateException aggEx)
{
// This section will be entered if any of the loops threw an unhandled exception.
// Because we re-threw the WCF exeption above, you can use aggEx.InnerExceptions here
// to see those (if you want).
}
// Execution will not get to this point until all of the iterations have completed (or one
// has failed, and all that were running when that failure occurred complete).
}
private void RaiseAllItemsCompleteEvent()
{
// Everything completed...
}
}
}