Parallel.ForEach - Изящное Гашение - PullRequest
14 голосов
/ 12 января 2011

По вопросу ожидания завершения задач и синхронизации потоков.

В настоящее время у меня есть итерация, которую я вложил в Parallel.ForEach. В приведенном ниже примере в комментариях я поставил несколько вопросов о том, как лучше всего справиться с постепенным завершением цикла (.NET 4.0);

private void myFunction()
    {

        IList<string> iListOfItems = new List<string>();
        // populate iListOfItems

        CancellationTokenSource cts = new CancellationTokenSource();

        ParallelOptions po = new ParallelOptions();
        po.MaxDegreeOfParallelism = 20; // max threads
        po.CancellationToken = cts.Token;

        try
        {
            var myWcfProxy = new myWcfClientSoapClient();

            if (Parallel.ForEach(iListOfItems, po, (item, loopsate) =>
            {
                try
                {
                    if (_requestedToStop)
                        loopsate.Stop();
                    // long running blocking WS call, check before and after
                    var response = myWcfProxy.ProcessIntervalConfiguration(item);
                    if (_requestedToStop)
                        loopsate.Stop();

                    // perform some local processing of the response object
                }
                catch (Exception ex)
                {
                    // cannot continue game over.
                    if (myWcfProxy.State == CommunicationState.Faulted)
                    {
                        loopsate.Stop();
                        throw;
                    }
                }

                // else carry on..
                // raise some events and other actions that could all risk an unhanded error.

            }
            ).IsCompleted)
            {
                RaiseAllItemsCompleteEvent();
            }
        }
        catch (Exception ex)
        {
            // if an unhandled error is raised within one of the Parallel.ForEach threads, do all threads in the
            // ForEach abort? or run to completion? Is loopsate.Stop (or equivalent) called as soon as the framework raises an Exception?
            // Do I need to call cts.Cancel here?

            // I want to wait for all the threads to terminate before I continue at this point. Howe do we achieve that?

            // do i need to call cts.Dispose() ?

            MessageBox.Show(Logging.FormatException(ex));
        }
        finally
        {

            if (myWcfProxy != null)
            {
            // possible race condition with the for-each threads here unless we wait for them to terminate.
                if (myWcfProxy.State == System.ServiceModel.CommunicationState.Faulted)
                    myWcfProxy.Abort();

                myWcfProxy.Close();
            }

            // possible race condition with the for-each threads here unless we wait for them to terminate.
            _requestedToStop = false;

        }

    }

Любая помощь будет наиболее ценной. В документации MSDN говорится о ManualResetEventSlim и cancellationToken.WaitHandle. но не уверен, как связать их с этим, похоже, с трудом разбирается в примерах MSDN, так как большинство из них не применимы.

1 Ответ

8 голосов
/ 13 января 2011

Ниже приведен пример кода, который может ответить на ваш вопрос. Суть в том, что вы получаете параллелизм форка / соединения с Parallel.ForEach, поэтому вам не нужно беспокоиться о состоянии гонки вне параллельной задачи (вызывающий поток блокируется до тех пор, пока задачи не будут завершены, успешно или нет). Вы просто хотите убедиться, что используете переменную LoopState (второй аргумент лямбда-выражения) для управления состоянием вашего цикла.

Если любая итерация цикла вызвала необработанное исключение, общий цикл вызовет исключение AggregateException, пойманное в конце.

Другие ссылки, в которых упоминается эта тема:

Parallel.ForEach генерирует исключение при обработке очень больших наборов данных

http://msdn.microsoft.com/en-us/library/dd460720.aspx

Ограничивает ли Parallel.ForEach количество активных потоков?

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.ServiceModel;

namespace Temp
{
    public class Class1
    {
        private class MockWcfProxy
        {
            internal object ProcessIntervalConfiguration(string item)
            {
                return new Object();
            }

            public CommunicationState State { get; set; }
        }

        private void myFunction()
        {

            IList<string> iListOfItems = new List<string>();
            // populate iListOfItems

            CancellationTokenSource cts = new CancellationTokenSource();

            ParallelOptions po = new ParallelOptions();
            po.MaxDegreeOfParallelism = 20; // max threads
            po.CancellationToken = cts.Token;

            try
            {
                var myWcfProxy = new MockWcfProxy();

                if (Parallel.ForEach(iListOfItems, po, (item, loopState) =>
                    {
                        try
                        {
                            if (loopState.ShouldExitCurrentIteration || loopState.IsExceptional)
                                loopState.Stop();

                            // long running blocking WS call, check before and after
                            var response = myWcfProxy.ProcessIntervalConfiguration(item);

                            if (loopState.ShouldExitCurrentIteration || loopState.IsExceptional)
                                loopState.Stop();

                            // perform some local processing of the response object
                        }
                        catch (Exception ex)
                        {
                            // cannot continue game over.
                            if (myWcfProxy.State == CommunicationState.Faulted)
                            {
                                loopState.Stop();
                                throw;
                            }

                            // FYI you are swallowing all other exceptions here...
                        }

                        // else carry on..
                        // raise some events and other actions that could all risk an unhanded error.
                    }
                ).IsCompleted)
                {
                    RaiseAllItemsCompleteEvent();
                }
            }
            catch (AggregateException aggEx)
            {
                // This section will be entered if any of the loops threw an unhandled exception.  
                // Because we re-threw the WCF exeption above, you can use aggEx.InnerExceptions here 
                // to see those (if you want).
            }
            // Execution will not get to this point until all of the iterations have completed (or one 
            // has failed, and all that were running when that failure occurred complete).
        }

        private void RaiseAllItemsCompleteEvent()
        {
            // Everything completed...
        }
    }
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...