Параллельная итерация в C #? - PullRequest
7 голосов
/ 07 февраля 2009

Есть ли способ сделать итерацию в стиле foreach над параллельными перечислимыми в C #? Я знаю, что для списков подписчиков можно использовать обычный цикл for, повторяющий int в диапазоне индексов, но я действительно предпочитаю от foreach до for по ряду причин.

Бонусные баллы, если он работает в C # 2.0

Ответы [ 6 ]

11 голосов
/ 22 ноября 2009
BlockingCollection

.NET 4 делает это довольно легко. Создайте BlockingCollection, верните его .GetConsumingEnumerable () в перечисляемом методе. Тогда foreach просто добавляет в коллекцию блокировок.

* 1003 Е.Г. *

private BlockingCollection<T> m_data = new BlockingCollection<T>();

public IEnumerable<T> GetData( IEnumerable<IEnumerable<T>> sources )
{
    Task.Factory.StartNew( () => ParallelGetData( sources ) );
    return m_data.GetConsumingEnumerable();
}

private void ParallelGetData( IEnumerable<IEnumerable<T>> sources )
{
    foreach( var source in sources )
    {
        foreach( var item in source )
        {
            m_data.Add( item );
        };
    }

    //Adding complete, the enumeration can stop now
    m_data.CompleteAdding();
}

Надеюсь, это поможет. Кстати, я опубликовал блог об этом прошлой ночью

Andre

9 голосов
/ 07 февраля 2009

Краткий ответ, нет. foreach работает только с одним перечислимым за раз.

Однако, если вы объедините свои параллельные перечислимые числа в один, вы можете foreach поверх объединенных. Я не знаю ни о каком простом встроенном способе сделать это, но должно работать следующее (хотя я не проверял это):

public IEnumerable<TSource[]> Combine<TSource>(params object[] sources)
{
    foreach(var o in sources)
    {
        // Choose your own exception
        if(!(o is IEnumerable<TSource>)) throw new Exception();
    }

    var enums =
        sources.Select(s => ((IEnumerable<TSource>)s).GetEnumerator())
        .ToArray();

    while(enums.All(e => e.MoveNext()))
    {
        yield return enums.Select(e => e.Current).ToArray();
    }
}

Тогда вы можете foreach свыше возвращаемого перечислимого:

foreach(var v in Combine(en1, en2, en3))
{
    // Remembering that v is an array of the type contained in en1,
    // en2 and en3.
}
3 голосов
/ 19 декабря 2009

Я написал реализацию EachParallel () из библиотеки .NET4 Parallel. Он совместим с .NET 3.5: Параллельный цикл ForEach в C # 3.5 Использование:

string[] names = { "cartman", "stan", "kenny", "kyle" };
names.EachParallel(name =>
{
    try
    {
        Console.WriteLine(name);
    }
    catch { /* handle exception */ }
});

Реализация:

/// <summary>
/// Enumerates through each item in a list in parallel
/// </summary>
public static void EachParallel<T>(this IEnumerable<T> list, Action<T> action)
{
    // enumerate the list so it can't change during execution
    list = list.ToArray();
    var count = list.Count();

    if (count == 0)
    {
        return;
    }
    else if (count == 1)
    {
        // if there's only one element, just execute it
        action(list.First());
    }
    else
    {
        // Launch each method in it's own thread
        const int MaxHandles = 64;
        for (var offset = 0; offset < list.Count() / MaxHandles; offset++)
        {
            // break up the list into 64-item chunks because of a limitiation             // in WaitHandle
            var chunk = list.Skip(offset * MaxHandles).Take(MaxHandles);

            // Initialize the reset events to keep track of completed threads
            var resetEvents = new ManualResetEvent[chunk.Count()];

            // spawn a thread for each item in the chunk
            int i = 0;
            foreach (var item in chunk)
            {
                resetEvents[i] = new ManualResetEvent(false);
                ThreadPool.QueueUserWorkItem(new WaitCallback((object data) =>
                {
                    int methodIndex = (int)((object[])data)[0];

                    // Execute the method and pass in the enumerated item
                    action((T)((object[])data)[1]);

                    // Tell the calling thread that we're done
                    resetEvents[methodIndex].Set();
                }), new object[] { i, item });
                i++;
            }

            // Wait for all threads to execute
            WaitHandle.WaitAll(resetEvents);
        }
    }
}
3 голосов
/ 07 февраля 2009

Ответ Zooba хорош, но вы также можете посмотреть ответы на "Как перебирать два массива одновременно" .

1 голос
/ 15 ноября 2014

Если вы хотите придерживаться основ - я переписал принятый в настоящее время ответ более простым способом:

    public static IEnumerable<TSource[]> Combine<TSource> (this IEnumerable<IEnumerable<TSource>> sources)
    {
        var enums = sources
            .Select (s => s.GetEnumerator ())
            .ToArray ();

        while (enums.All (e => e.MoveNext ())) {
            yield return enums.Select (e => e.Current).ToArray ();
        }
    }

    public static IEnumerable<TSource[]> Combine<TSource> (params IEnumerable<TSource>[] sources)
    {
        return sources.Combine ();
    }
0 голосов
/ 07 февраля 2009

Будет ли это работать для вас?

public static class Parallel
{
    public static void ForEach<T>(IEnumerable<T>[] sources,
                                  Action<T> action)
    {
        foreach (var enumerable in sources)
        {
            ThreadPool.QueueUserWorkItem(source => {
                foreach (var item in (IEnumerable<T>)source)
                    action(item);
            }, enumerable);
        }
    }
}

// sample usage:
static void Main()
{
    string[] s1 = { "1", "2", "3" };
    string[] s2 = { "4", "5", "6" };
    IEnumerable<string>[] sources = { s1, s2 };
    Parallel.ForEach(sources, s => Console.WriteLine(s));
    Thread.Sleep(0); // allow background threads to work
}

Для C # 2.0 вам необходимо преобразовать лямбда-выражения, приведенные выше, в делегаты.

Примечание. Этот служебный метод использует фоновые потоки. Возможно, вы захотите изменить его, чтобы использовать потоки переднего плана, и, возможно, вы захотите дождаться окончания всех потоков. Если вы сделаете это, я предлагаю вам создать sources.Length - 1 потоков и использовать текущий исполняющий поток для последнего (или первого) источника.

(Я хотел бы включить ожидание завершения потоков в своем коде, но мне жаль, что я пока не знаю, как это сделать. Я думаю, вам следует использовать a WaitHandle Thread.Join().)

...