Одновременная обработка только n элементов с использованием Task Parallel Library - PullRequest
8 голосов
/ 04 августа 2011

Все это происходит в службе Windows.

У меня есть Queue<T> (на самом деле ConcurrentQueue<T>) предметы, ожидающие обработки. Но я не хочу обрабатывать только по одному за раз, я хочу обрабатывать n элементов одновременно, где n - это настраиваемое целое число.

Как мне сделать это с помощью параллельной библиотеки задач?

Я знаю, что TPL будет разделять коллекции от имени разработчика для одновременной обработки, но не уверен, что это функция, которая мне нужна. Я новичок в многопоточности и TPL.

Ответы [ 3 ]

4 голосов
/ 04 августа 2011

Вот одна идея, которая предполагает создание метода расширения для TaskFactory.

public static class TaskFactoryExtension
{
    public static Task StartNew(this TaskFactory target, Action action, int parallelism)
    {
        var tasks = new Task[parallelism];
        for (int i = 0; i < parallelism; i++)
        {
            tasks[i] = target.StartNew(action);
        }
        return target.StartNew(() => Task.WaitAll(tasks));
    }
}

Тогда ваш код вызова будет выглядеть следующим образом.

ConcurrentQueue<T> queue = GetQueue();
int n = GetDegreeOfParallelism();
var task = Task.Factory.StartNew(
  () =>
  {
    T item;
    while (queue.TryDequeue(out item))
    {
      ProcessItem(item);
    }
  }, n);
task.Wait(); // Optionally wait for everything to finish.

Вот еще одна идея, использующая Parallel.ForEach. Проблема с этим подходом состоит в том, что ваши степени параллелизма не обязательно должны соблюдаться. Вы указываете только максимально допустимую сумму, а не абсолютную сумму.

ConcurrentQueue<T> queue = GetQueue();
int n = GetDegreeOfParallelism();
Parallel.ForEach(queue, new ParallelOptions { MaxDegreeOfParallelism = n },
  (item) =>
  {
    ProcessItem(item);    
  });
4 голосов
/ 04 августа 2011

Используйте BlockingCollection<T> вместо ConcurrentQueue<T>, тогда вы можете запустить любое количество пользовательских потоков и использовать Take метод BlockingCollection.если коллекция пуста, метод Take автоматически заблокирует в потоке вызывающей стороны ожидание добавления элементов, в противном случае потоки будут использовать все элементы очереди параллельно.Однако, как ваш вопрос упомянул об использовании TPL, выясняется, что у Parallel.ForEach есть некоторые проблемы при использовании с BlockingCollection check этой публикацией для получения более подробной информации.так что вы должны сами управлять созданием своих потребительских потоков.new Thread(/*consumer method*/) или new Task() ...

1 голос
/ 16 августа 2012

Я бы также рекомендовал использовать BlockingCollection вместо непосредственного использования ConcurrentQueue.

Вот пример:

public class QueuingRequestProcessor
{
  private BlockingCollection<MyRequestType> queue;

  public void QueuingRequestProcessor(int maxConcurrent)
  {
    this.queue = new BlockingCollection<MyRequestType>(maxConcurrent);

    Task[] consumers = new Task[maxConcurrent];

    for (int i = 0; i < maxConcurrent; i++)
    {
      consumers[i] = Task.Factory.StartNew(() =>
      {
        // Will wait when queue is empty, until CompleteAdding() is called
        foreach (var request in this.queue.GetConsumingEnumerable())
        {
          Process(request);
        }
      });
    }
  }

  public void Add(MyRequest request)
  {
    this.queue.Add(request);
  }

  public void Stop()
  {
    this.queue.CompleteAdding();
  }

  private void Process(MyRequestType request)
  {
    // Do your processing here
  }
}

Обратите внимание, что maxConcurrent в конструкторе определяетсколько запросов будет обработано одновременно.

...