Как использовать IObservable / IObserver с ConcurrentQueue или ConcurrentStack - PullRequest
5 голосов
/ 13 июня 2010

Я понял, что когда я пытаюсь обрабатывать элементы в параллельной очереди, используя несколько потоков, в то время как несколько потоков могут помещать элементы в нее, идеальным решением было бы использование Reactive Extensions с параллельными структурами данных.

Мой оригинальный вопрос по адресу:

При использовании ConcurrentQueue при попытке параллельного запуска очереди из очереди

Поэтому мне любопытно, есть ли способ получитьLINQ (или PLINQ) запрос, который будет непрерывно отключаться по мере поступления в него элементов.

Я пытаюсь заставить это работать так, чтобы я мог иметь n производителей, выдвигающих в очередь, и ограниченныйколичество потоков для обработки, поэтому я не перегружаю базу данных.

Если бы я мог использовать инфраструктуру Rx, то я ожидаю, что я мог бы просто запустить ее, и если 100 элементов будут помещены в пределах 100 мс, то 20потоки, которые являются частью запроса PLINQ, будут просто обрабатываться через очередь.

Есть три технологии, которые я пытаюсьg для совместной работы:

  1. Rx Framework (Reactive LINQ)
  2. PLING
  3. System.Collections.Concurrent структуры

Ответы [ 2 ]

6 голосов
/ 06 января 2011

Дрю прав, я думаю, что ConcurrentQueue, хотя он звучит идеально для работы, на самом деле является базовой структурой данных, которую использует BlockingCollection. Кажется, мне тоже очень далеко. Проверьте главу 7 этой книги * http://www.amazon.co.uk/Parallel-Programming-Microsoft-NET-Decomposition/dp/0735651590/ref=sr_1_1?ie=UTF8&qid=1294319704&sr=8-1 и это объяснит, как использовать BlockingCollection и иметь несколько производителей и нескольких потребителей, каждый из которых снимает «очередь». Вы захотите взглянуть на метод «GetConsumingEnumerable ()» и, возможно, просто вызовите .ToObservable () для этого.

* остальная часть книги довольно средняя.

редактирование:

Вот пример программы, которая, я думаю, делает то, что вы хотите?

class Program
{
    private static ManualResetEvent _mre = new ManualResetEvent(false);
    static void Main(string[] args)
    {
        var theQueue = new BlockingCollection<string>();
        theQueue.GetConsumingEnumerable()
            .ToObservable(Scheduler.TaskPool)
            .Subscribe(x => ProcessNewValue(x, "Consumer 1", 10000000));

        theQueue.GetConsumingEnumerable()
            .ToObservable(Scheduler.TaskPool)
            .Subscribe(x => ProcessNewValue(x, "Consumer 2", 50000000));

        theQueue.GetConsumingEnumerable()
            .ToObservable(Scheduler.TaskPool)
            .Subscribe(x => ProcessNewValue(x, "Consumer 3", 30000000));


        LoadQueue(theQueue, "Producer A");
        LoadQueue(theQueue, "Producer B");
        LoadQueue(theQueue, "Producer C");

        _mre.Set();

        Console.WriteLine("Processing now....");

        Console.ReadLine();
    }

    private static void ProcessNewValue(string value, string consumerName, int delay)
    {
        Thread.SpinWait(delay);
        Console.WriteLine("{1} consuming {0}", value, consumerName);
    }

    private static void LoadQueue(BlockingCollection<string> target, string prefix)
    {
        var thread = new Thread(() =>
                                    {
                                        _mre.WaitOne();
                                        for (int i = 0; i < 100; i++)
                                        {
                                            target.Add(string.Format("{0} {1}", prefix, i));
                                        }
                                    });
        thread.Start();
    }
}
3 голосов
/ 01 декабря 2010

Я не знаю, как лучше всего добиться этого с помощью Rx, но я бы порекомендовал просто использовать BlockingCollection<T> и модель производитель-потребитель .Ваш основной поток добавляет элементы в коллекцию, которая по умолчанию использует ConcurrentQueue<T> внизу.Затем у вас есть отдельное Task, которое вы ускоряете перед тем, которое использует Parallel::ForEach вместо BlockingCollection<T> для обработки столько элементов из коллекции, сколько имеет смысл для системы одновременно.Теперь вы, вероятно, также захотите изучить использование GetConsumingPartitioner метода библиотеки ParallelExtensions, чтобы быть наиболее эффективным, поскольку разделитель по умолчанию создаст больше служебных данных, чем вы хотите в этом случае.Вы можете прочитать больше об этом из этого сообщения в блоге .

Когда основная тема закончена, вы звоните CompleteAdding на BlockingCollection<T> и Task::Wait на Task, который вы развернули, чтобы дождаться, пока все потребители завершат обработку всех элементов в коллекции.

...