Как я могу создать постоянную обработку "Поток", используя TPL в C # 4 - PullRequest
5 голосов
/ 10 февраля 2012

Я не уверен, возможно ли следующее, но я бы хотел вызвать несколько действий в Paralell в удушающем режиме, но сохранить непрерывность потока обработки, не возвращаясь к использованию таймеров или цикла /циклы сна.

До сих пор у меня получалось, что он загружает большую партию входных данных из какого-то источника ... и затем обрабатывает их параллельным образом и циклически повторяет, как показано ниже.

я хотел бы знать следующее: есть ли в TPL конструкция, которую я мог бы использовать, чтобы она всегда работала ... чтобы я мог заменить "Timer Elapsing" и "Database Polling" на MessageQueue Receededevent.

Ниже приведена грубая версия того, что я хотел бы достичь ... есть и другие способы, которыми я могу воспользоваться, но я хочу знать, является ли этот тип Pattern встроенным в TPL.

internal class Engine
{
    private MessageQueue mq;
    private Queue<int> myInternalApplicationQueue;

    public Engine()
    {
        //Message Queue to get new task inputs from
        mq = new MessageQueue();
        mq.ReceiveCompleted += new ReceiveCompletedEventHandler(mq_ReceiveCompleted);

        // internal Queue to put them in.
        myInternalApplicationQueue = new Queue<int>();
    }

    void mq_ReceiveCompleted(object sender, ReceiveCompletedEventArgs e)
    {
        //On MQ Receive, pop the input in a queue in my app
        int input = (int) e.Message.Body;

        myInternalApplicationQueue.Enqueue(input);
    }

    public void StartWorking()
    {
        //Once this gets called, it doesn't stop... it just keeps processing/watching that queue
        //processing the tasks as fast as it's allowed while the app is running.
        var options = new ParallelOptions() { MaxDegreeOfParallelism = 3 };
        Parallel.KeepWorkingOnQueue<int>(myInternalApplicationQueue, options, DoWork);
        //       ^^^^^^^^^^^^^^^^^^ <----- THIS GUY
    }

}

Ответы [ 2 ]

5 голосов
/ 10 февраля 2012

Вы можете использовать BlockingCollection<T> для обработки этого типа операции, которая фактически является сценарием производителя / потребителя.

По сути, вы настроите BlockingCollection<T> и будете использовать его в качестве «производителя». Затем у вас будет три (или любое число) из потребительских задач (которые часто задаются как долго выполняющиеся задачи), которые обрабатывают элементы (вызывая blockingCollection.GetConsumingEnumerable() в стандартном цикле foreach).

Затем вы добавляете предметы по мере необходимости в коллекцию, и они будут постоянно обрабатываться. Когда вы закончите, вы позвоните BlockingCollection<T>.CompleteAdding, что приведет к завершению циклов foreach и остановке всего процесса.

В качестве дополнительного примечания - обычно вы не хотите использовать Parallel.ForEach на GetConsumingEnumerable() из BlockingCollection<T> - по крайней мере, если вы сами не разберетесь с разбиением. Как правило, лучше использовать несколько задач и выполнять каждую из них последовательно. Причина в том, что схема секционирования по умолчанию в Parallel.ForEach вызовет проблемы (она ожидает, пока не будет доступен «кусок» данных, вместо немедленной обработки элементов, и «чанки» со временем становятся все больше и больше).

2 голосов
/ 12 февраля 2012

Как указывает Рид, BlockingCollection - это хороший "ручной" способ перехода сюда. Недостатком является то, что вы должны сами управлять потребителями.

Еще один подход, который вы, возможно, захотите рассмотреть, который берет на себя большую часть работы по координации для подобных сценариев, заключается в изучении TPL Dataflow . В частности, в подобном сценарии вы можете просто использовать ActionBlock<T>, а когда сообщение поступит из очереди, вы просто Post получите новый фрагмент данных для ActionBlock<T>, и он автоматически обработает его, используя рабочие потоки TPL под крышками. Это сделало бы ваш Engine класс похожим на это:

ActionBlock<int> myActionBlock = new ActionBlock<int>(this.ProcessWorkItem);

void mq_ReceiveCompleted(object sender, ReceiveCompletedEventArgs e)      
{      
    int input = (int)e.Message.Body;

    // Post the data to the action block
    this.myActionBlock.Post(input);
}

private void ProcessWorkItem(int workItemData)
{
    // ActionBlock will hand each work item to this method for processing
}

Теперь, что касается управления параллелизмом или емкостью, вы можете легко управлять этими особенностями ActionBlock<T>, передавая ExecutionDataflowBlockOptions при построении ActionBlock<T>. Допустим, я хочу убедиться, что у меня никогда не будет параллелизма больше четырех, и запретить производителю добавлять в очередь более ста элементов. Я бы просто сделал:

ActionBlock<int> myActionBlock = new ActionBlock<int>(
                                     this.ProcessWorkItem, 
                                     new ExecutionDataflowBlockOptions
                                     {
                                         MaxDegreeOfParallelism = 4,
                                         BoundedCapacity = 100
                                     });
...