Parallel.Foreach + доходность возврата? - PullRequest
23 голосов
/ 07 декабря 2011

Я хочу обработать что-то, используя параллельный цикл, например:

public void FillLogs(IEnumerable<IComputer> computers)
{
    Parallel.ForEach(computers, cpt=>
    {
        cpt.Logs = cpt.GetRawLogs().ToList();
    });

}

Хорошо, все работает нормально.Но как это сделать, если я хочу, чтобы метод FillLogs возвращал IEnumerable?

public IEnumerable<IComputer> FillLogs(IEnumerable<IComputer> computers)
{
    Parallel.ForEach(computers, cpt=>
    {
        cpt.Logs = cpt.GetRawLogs().ToList();
        yield return cpt // KO, don't work
    });

}

EDIT

Кажется, что это невозможно ... но я использую что-то вродеэто:

public IEnumerable<IComputer> FillLogs(IEnumerable<IComputer> computers)
{
    return computers.AsParallel().Select(cpt => cpt);
}

Но куда я положил cpt.Logs = cpt.GetRawLogs().ToList(); инструкцию

Ответы [ 3 ]

13 голосов
/ 07 декабря 2011

Короткая версия - нет, это невозможно через блок итератора; более длинная версия, вероятно, включает синхронизированную очередь / очередь между потоком итератора вызывающей стороны (выполнение очереди) и параллельными рабочими (выполнение очереди); но, как примечание: журналы обычно связаны с IO, а распараллеливание вещей, связанных с IO, часто не очень хорошо работает.

Если вызывающему абоненту потребуется некоторое время, чтобы потреблять каждый, тогда может быть некоторая заслуга в подходе, который обрабатывает только один журнал за раз, но может сделать это при вызывающий абонент использует предыдущий журнал; то есть он начинается a Task для следующего элемента до yield и ожидает завершения после yield ... но это опять-таки довольно сложно. В качестве упрощенного примера:

static void Main()
{
    foreach(string s in Get())
    {
        Console.WriteLine(s);
    }
}

static IEnumerable<string> Get() {
    var source = new[] {1, 2, 3, 4, 5};
    Task<string> outstandingItem = null;
    Func<object, string> transform = x => ProcessItem((int) x);
    foreach(var item in source)
    {
        var tmp = outstandingItem;

        // note: passed in as "state", not captured, so not a foreach/capture bug
        outstandingItem = new Task<string>(transform, item);
        outstandingItem.Start();

        if (tmp != null) yield return tmp.Result;
    }
    if (outstandingItem != null) yield return outstandingItem.Result;
}
static string ProcessItem(int i)
{
    return i.ToString();
}
3 голосов
/ 07 декабря 2011

Я не хочу быть оскорбительным, но, возможно, не хватает понимания.Parallel.ForEach означает, что TPL будет запускать foreach в соответствии с доступным оборудованием в нескольких потоках.Но это означает, что ii можно выполнять эту работу параллельно!yield return дает вам возможность вывести некоторые значения из списка (или что-то еще) и вернуть их один за другим, когда они необходимы.Это предотвращает необходимость сначала найти все элементы, соответствующие условию, а затем выполнить итерацию по ним.Это действительно преимущество в производительности, но не может быть сделано параллельно.

0 голосов
/ 10 августа 2017

Как насчет

            Queue<string> qu = new Queue<string>();
            bool finished = false;
            Task.Factory.StartNew(() =>
            {
                Parallel.ForEach(get_list(), (item) =>
                {
                    string itemToReturn = heavyWorkOnItem(item);         
                    lock (qu)
                       qu.Enqueue(itemToReturn );                        
                });
                finished = true;
            });

            while (!finished)
            {
                lock (qu)
                    while (qu.Count > 0)
                        yield return qu.Dequeue();
                //maybe a thread sleep here?
            }

Edit: Я думаю, что это лучше:

        public static IEnumerable<TOutput> ParallelYieldReturn<TSource, TOutput>(this IEnumerable<TSource> source, Func<TSource, TOutput> func)
        {
            ConcurrentQueue<TOutput> qu = new ConcurrentQueue<TOutput>();
            bool finished = false;
            AutoResetEvent re = new AutoResetEvent(false);
            Task.Factory.StartNew(() =>
            {
                Parallel.ForEach(source, (item) =>
                {
                    qu.Enqueue(func(item));
                    re.Set();
                });
                finished = true;
                re.Set();
            });

            while (!finished)
            {
                re.WaitOne();
                while (qu.Count > 0)
                {
                    TOutput res;
                    if (qu.TryDequeue(out res))
                        yield return res;
                }
            }
        }   

Edit2: Я согласен с коротким Нет ответом. Этот код бесполезен; Вы не можете разорвать цикл доходности.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...