Расширения Rx: где находится Parallel.ForEach? - PullRequest
8 голосов
/ 19 декабря 2011

У меня есть фрагмент кода, который использует Parallel.ForEach, вероятно, основанный на старой версии расширений Rx или параллельной библиотеке задач. Я установил текущую версию расширений Rx, но не могу найти Parallel.ForEach. Я не использую какие-либо другие необычные вещи из библиотеки и просто хочу обрабатывать некоторые данные параллельно, как это:

Parallel.ForEach(records, ProcessRecord);

Я нашел этот вопрос , но я бы не хотел зависеть от старых версий Rx. Но я не смог найти что-то подобное для Rx, так каков текущий и самый прямой способ сделать это с использованием текущей версии Rx? В проекте используется .NET 3.5.

Ответы [ 2 ]

27 голосов
/ 19 декабря 2011

Нет необходимости делать все эти глупые гусиные вещи, если у вас есть Rx:

records.ToObservable()
    .SelectMany(x => Observable.Start(() => ProcessRecord(x), Scheduler.ThreadPoolScheduler))
    .ToList()
    .First();

(или, если вы хотите, чтобы порядок предметов сохранялся за счет эффективности):

records.ToObservable()
    .Select(x => Observable.Start(() => ProcessRecord(x), Scheduler.ThreadPoolScheduler))
    .Concat()
    .ToList()
    .First();

Или, если вы хотите ограничить количество предметов одновременно:

records.ToObservable()
    .Select(x => Observable.Defer(() => Observable.Start(() => ProcessRecord(x), Scheduler.ThreadPoolScheduler)))
    .Merge(5 /* at a time */)
    .ToList()
    .First();
1 голос
/ 19 декабря 2011

Вот простая замена:

class Parallel
{
    public static void ForEach<T>(IEnumerable<T> source, Action<T> body)
    {
        if (source == null)
        {
            throw new ArgumentNullException("source");
        }
        if (body == null)
        {
            throw new ArgumentNullException("body");
        }
        var items = new List<T>(source);
        var countdown = new CountdownEvent(items.Count);
        WaitCallback callback = state =>
        {
            try
            {
                body((T)state);
            }
            finally
            {
                countdown.Signal();
            }
        };
        foreach (var item in items)
        {
            ThreadPool.QueueUserWorkItem(callback, item);
        }
        countdown.Wait();
    }
}
...