Почему Parallel.ForEach не завершит выполнение серии задач, пока MoveNext не вернет false?
У меня есть инструмент, который отслеживает комбинацию очередей MSMQ и Service Broker для входящих сообщений. Когда сообщение найдено, оно передает это сообщение соответствующему исполнителю.
Я завернул проверку сообщений в IEnumerable, чтобы я мог передать в метод Parallel.ForEach IEnumerable плюс делегат для запуска. Приложение предназначено для непрерывной работы с обработкой IEnumerator.MoveNext в цикле до тех пор, пока оно не сможет приступить к работе, а затем IEnumerator.Current, предоставив ему следующий элемент.
Поскольку MoveNext никогда не умрет, пока я не установлю CancelToken в значение true, это должно продолжаться вечно. Вместо этого я вижу, что как только Parallel.ForEach собрал все сообщения и MoveNext больше не возвращает «true», больше не выполняются никакие задачи. Вместо этого кажется, что поток MoveNext является единственным потоком, выполняющим какую-либо работу, пока он ожидает его возврата, а другие потоки (включая ожидающие и запланированные потоки) не выполняют никакой работы.
- Есть ли способ заставить Parallel продолжать работать, пока он ожидает ответа от MoveNext?
- Если нет, есть ли другой способ структурировать MoveNext, чтобы получить то, что я хочу? (когда он возвращает true, а затем Current возвращает нулевой объект, порождает много поддельных задач)
- Бонусный вопрос: есть ли способ ограничить количество сообщений, которые Parallel выдает одновременно? Кажется, что он выполняет и планирует много сообщений одновременно (MaxDegreeOfParallelism, кажется, только ограничивает объем работы, выполняемой за один раз, но не мешает запускать большое количество сообщений для планирования)
Вот IEnumerator для того, что я написал (без какого-либо постороннего кода):
public class DataAccessEnumerator : IEnumerator<TransportMessage>
{
public TransportMessage Current
{ get { return _currentMessage; } }
public bool MoveNext()
{
while (_cancelToken.IsCancellationRequested == false)
{
TransportMessage current;
foreach (var task in _tasks)
{
if (task.QueueType.ToUpper() == "MSMQ")
current = _msmq.Get(task.Name);
else
current = _serviceBroker.Get(task.Name);
if (current != null)
{
_currentMessage = current;
return true;
}
}
WaitHandle.WaitAny(new [] {_cancelToken.WaitHandle}, 500);
}
return false;
}
public DataAccessEnumerator(IDataAccess<TransportMessage> serviceBroker, IDataAccess<TransportMessage> msmq, IList<JobTask> tasks, CancellationToken cancelToken)
{
_serviceBroker = serviceBroker;
_msmq = msmq;
_tasks = tasks;
_cancelToken = cancelToken;
}
private readonly IDataAccess<TransportMessage> _serviceBroker;
private readonly IDataAccess<TransportMessage> _msmq;
private readonly IList<JobTask> _tasks;
private readonly CancellationToken _cancelToken;
private TransportMessage _currentMessage;
}
Вот вызов Parallel.ForEach, где _queueAccess - это IEnumerable, который содержит вышеупомянутый IEnumerator, а RunJob обрабатывает сообщение TransportMessage, которое возвращается из этого IEnumerator:
var parallelOptions = new ParallelOptions
{
CancellationToken = _cancelTokenSource.Token,
MaxDegreeOfParallelism = 8
};
Parallel.ForEach(_queueAccess, parallelOptions, x => RunJob(x));