Как указывает Рид, BlockingCollection - это хороший "ручной" способ перехода сюда. Недостатком является то, что вы должны сами управлять потребителями.
Еще один подход, который вы, возможно, захотите рассмотреть, который берет на себя большую часть работы по координации для подобных сценариев, заключается в изучении TPL Dataflow . В частности, в подобном сценарии вы можете просто использовать ActionBlock<T>
, а когда сообщение поступит из очереди, вы просто Post
получите новый фрагмент данных для ActionBlock<T>
, и он автоматически обработает его, используя рабочие потоки TPL под крышками. Это сделало бы ваш Engine
класс похожим на это:
ActionBlock<int> myActionBlock = new ActionBlock<int>(this.ProcessWorkItem);
void mq_ReceiveCompleted(object sender, ReceiveCompletedEventArgs e)
{
int input = (int)e.Message.Body;
// Post the data to the action block
this.myActionBlock.Post(input);
}
private void ProcessWorkItem(int workItemData)
{
// ActionBlock will hand each work item to this method for processing
}
Теперь, что касается управления параллелизмом или емкостью, вы можете легко управлять этими особенностями ActionBlock<T>
, передавая ExecutionDataflowBlockOptions
при построении ActionBlock<T>
. Допустим, я хочу убедиться, что у меня никогда не будет параллелизма больше четырех, и запретить производителю добавлять в очередь более ста элементов. Я бы просто сделал:
ActionBlock<int> myActionBlock = new ActionBlock<int>(
this.ProcessWorkItem,
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 4,
BoundedCapacity = 100
});