Parallel.ForEach не в состоянии выполнить сообщения на долго работающем IEnumerable - PullRequest
0 голосов
/ 19 октября 2011

Почему Parallel.ForEach не завершит выполнение серии задач, пока MoveNext не вернет false?

У меня есть инструмент, который отслеживает комбинацию очередей MSMQ и Service Broker для входящих сообщений. Когда сообщение найдено, оно передает это сообщение соответствующему исполнителю.

Я завернул проверку сообщений в IEnumerable, чтобы я мог передать в метод Parallel.ForEach IEnumerable плюс делегат для запуска. Приложение предназначено для непрерывной работы с обработкой IEnumerator.MoveNext в цикле до тех пор, пока оно не сможет приступить к работе, а затем IEnumerator.Current, предоставив ему следующий элемент.

Поскольку MoveNext никогда не умрет, пока я не установлю CancelToken в значение true, это должно продолжаться вечно. Вместо этого я вижу, что как только Parallel.ForEach собрал все сообщения и MoveNext больше не возвращает «true», больше не выполняются никакие задачи. Вместо этого кажется, что поток MoveNext является единственным потоком, выполняющим какую-либо работу, пока он ожидает его возврата, а другие потоки (включая ожидающие и запланированные потоки) не выполняют никакой работы.

  • Есть ли способ заставить Parallel продолжать работать, пока он ожидает ответа от MoveNext?
  • Если нет, есть ли другой способ структурировать MoveNext, чтобы получить то, что я хочу? (когда он возвращает true, а затем Current возвращает нулевой объект, порождает много поддельных задач)
  • Бонусный вопрос: есть ли способ ограничить количество сообщений, которые Parallel выдает одновременно? Кажется, что он выполняет и планирует много сообщений одновременно (MaxDegreeOfParallelism, кажется, только ограничивает объем работы, выполняемой за один раз, но не мешает запускать большое количество сообщений для планирования)

Вот IEnumerator для того, что я написал (без какого-либо постороннего кода):

public class DataAccessEnumerator : IEnumerator<TransportMessage> 
{
    public TransportMessage Current
    {   get { return _currentMessage; } }

    public bool MoveNext()
    {
        while (_cancelToken.IsCancellationRequested == false)
        {
            TransportMessage current;
            foreach (var task in _tasks)
            {
                if (task.QueueType.ToUpper() == "MSMQ")
                    current = _msmq.Get(task.Name);
                else
                    current = _serviceBroker.Get(task.Name);

                if (current != null)
                {
                    _currentMessage = current;
                    return true;
                }
            }
            WaitHandle.WaitAny(new [] {_cancelToken.WaitHandle}, 500); 
        }

        return false; 
    }

    public DataAccessEnumerator(IDataAccess<TransportMessage> serviceBroker, IDataAccess<TransportMessage> msmq, IList<JobTask> tasks, CancellationToken cancelToken)
    {
        _serviceBroker = serviceBroker;
        _msmq = msmq;
        _tasks = tasks;
        _cancelToken = cancelToken;
    }

    private readonly IDataAccess<TransportMessage> _serviceBroker;
    private readonly IDataAccess<TransportMessage> _msmq;
    private readonly IList<JobTask> _tasks;
    private readonly CancellationToken _cancelToken;
    private TransportMessage _currentMessage;
}

Вот вызов Parallel.ForEach, где _queueAccess - это IEnumerable, который содержит вышеупомянутый IEnumerator, а RunJob обрабатывает сообщение TransportMessage, которое возвращается из этого IEnumerator:

var parallelOptions = new ParallelOptions
    {
        CancellationToken = _cancelTokenSource.Token,
        MaxDegreeOfParallelism = 8 
    };

Parallel.ForEach(_queueAccess, parallelOptions, x => RunJob(x));

Ответы [ 3 ]

3 голосов
/ 19 октября 2011

Мне кажется, что Parallel.ForEach не совсем подходит для того, что вы хотите сделать.Вместо этого я предлагаю использовать BlockingCollection<T> для создания очереди производителя / потребителя - создайте группу потоков / задач для обслуживания блокирующей коллекции и добавьте в нее рабочие элементы по мере их поступления.

1 голос
/ 20 октября 2011

Ваша проблема может быть связана с используемым Partitioner.

В вашем случае TPL выберет Chunk Partitioner, который возьмет несколько элементов из перечисления, прежде чем передать их для обработки.Количество элементов, взятых в каждом чанке, будет со временем увеличиваться.

Когда ваш метод MoveNext блокируется, TPL остается в ожидании следующего элемента и не будет обрабатывать элементы, которые он уже взял.

У вас есть несколько вариантов, чтобы это исправить:

1) Написать Partitioner, который всегда возвращает отдельные элементы.Не так сложно, как кажется.

2) Используйте TPL вместо Parallel.ForEach:

foreach ( var item in _queueAccess )
{
    var capturedItem = item;

    Task.Factory.StartNew( () => RunJob( capturedItem ) );
}

Второе решение немного меняет поведение.Цикл foreach завершится, когда будут созданы все Tasks, а не когда они завершатся.Если это проблема для вас, вы можете добавить CountdownEvent:

var ce = new CountdownEvent( 1 );

foreach ( var item in _queueAccess )
{
    ce.AddCount();

    var capturedItem = item;

    Task.Factory.StartNew( () => { RunJob( capturedItem ); ce.Signal(); } );
}

ce.Signal();
ce.Wait();
0 голосов
/ 19 октября 2011

Я не приложил усилий, чтобы убедиться в этом, но впечатление, которое я получил от обсуждений Parallel.ForEach, заключалось в том, что он вытянет все элементы из перечисляемого ими и примет соответствующие решения о том, как разделить их по всем потокам. Исходя из вашей проблемы, это кажется правильным.

Итак, чтобы сохранить большую часть вашего текущего кода, вы, вероятно, должны извлечь блокирующий код из итератора и поместить его в цикл вокруг вызова Parallel.ForEach (который использует итератор).

...