Как сбалансировать и распараллелить обработку нескольких последовательных потоков данных - PullRequest
3 голосов
/ 06 марта 2012

У меня есть один поток данных, который должен быть обработан как можно быстрее. Единый поток содержит данные до 200 источников. Не все источники выдают одинаковый объем данных, и скорость может отличаться.

В качестве начальной попытки я решил создать 10 (в зависимости от спецификации сервера, двухъядерный процессор), долго выполняющиеся задачи. Каждая задача будет читать из BlockCollection. Прежде чем начать, я создал карту, чтобы по мере получения данных во входящем потоке я знал, к какой BlockingCollection добавить эти источники данных.

Проблема, я думаю, заключается в том, что я не знаю заранее, какой источник будет производить наибольшее количество данных, и, действительно, это может измениться со временем. Я видел, что некоторые коллекции были очень пустыми, в то время как другие получали гораздо больше обновлений.

Если у меня доступно 8 аппаратных потоков, и я создал около 10 очередей, а задачи не связаны с потоком (опять же, не уверен, верно ли это для TaskCreationOptions.LongRunning), то даже если одна очередь не занята, другая занятая очередь не может использовать резервный поток, так как в теории я мог бы обработать часть данных вне последовательности.

Было бы лучше, если бы я просто создавал коллекцию задач и блокировок для каждого источника, тогда TPL сможет наилучшим образом использовать доступные потоки, поскольку данные максимально разделены?

Моя другая альтернатива состояла в том, чтобы как-то потренироваться на прошлой статистике и различной внешней / человеческой информации, как наилучшим образом распределить источники среди конечного набора BlockingCollections / Tasks, а затем скорректировать сопоставление во времени.

Надеюсь, я достаточно хорошо объяснил свой сценарий.

Я использую класс, который инкапсулирует BlockingCollection и Task

У меня есть то, что можно визуализировать как чередование более 40 потоков, которые в случае разделения обрабатываются одновременно (если каждый поток хранится в своей собственной последовательности), но потоков намного больше, чем доступных аппаратных потоков.

РЕДАКТИРОВАТЬ - Попытка уточнить мой запрос

Чтобы попытаться уточнить, что я ищу. В настоящее время я эффективно делю источники на подгруппы и выделяю каждой группе свою очередь. Мой вопрос действительно: сколько групп создать? Если у меня есть 200 источников, я должен создать 200 групп (то есть 200 заданий и коллекций блокировок), а затем позволить TPL бегать, как сумасшедший, распределяя потоки, где это возможно, так как каждая задача получает свое время процессора. Или мне лучше выделить 1 группу для каждого аппаратного потока?

Ответы [ 3 ]

1 голос
/ 08 марта 2012

Я бы лично использовал поток данных TPL здесь и просто определил бы ActionBlock<T>, который представляет вашу работу и связал бы BufferBlock<T> "перед" ней, чтобы предотвратить перенасыщение различными производителями.Затем все, что вы делаете, это публикуете на BufferBlock<T> из ваших различных источников (производителей) и убедитесь, что вы загрузили и протестировали / сконфигурировали параметры вашего блока (BoundedCapacity, MaxDegreeOfParallelism, MaxMessagesPerTask и т. Д.) Соответственно и позволилиTPL Dataflow работает своим волшебством.Снимает всю тяжесть с ваших рук.

0 голосов
/ 06 марта 2012

Если вы создали одну Task и одну очередь для каждого источника, это должно работать, пока этот Task завершается, когда очередь пуста. Вы наверняка будете знать, что порядок элементов в одном источнике поддерживается, и вы должны иметь полную загрузку ЦП, когда это необходимо. Но если все потоки в настоящее время обрабатываются, а новые данные поступают из низкочастотного источника, вы можете долго ждать, пока эти данные будут обработаны.

Если это проблема для вас, вы должны сами управлять точным порядком обработки и не полагаться на Task s. Для этого у вас может быть одна глобальная очередь, а затем локальная очередь для каждого источника. Для поддержания порядка может быть не более одного элемента данных в глобальной очереди или обрабатываемой в данный момент. Когда обработка элемента завершена, элемент перемещается из правильной локальной очереди в глобальную очередь, если это возможно. Таким образом, вы должны получить более справедливый порядок обработки данных.

Код может выглядеть так:

class SourcesManager<T>
{
    private readonly BlockingCollection<Tuple<T, Source<T>>> m_queue =
        new BlockingCollection<Tuple<T, Source<T>>>();

    public Source<T> CreateSource()
    {
        return new Source<T>(m_queue);
    }

    // blocks if no items are available and Complete() hasn't been called
    public bool TryProcess(Action<T> action)
    {
        Tuple<T, Source<T>> tuple;
        if (m_queue.TryTake(out tuple, Timeout.Infinite))
        {
            action(tuple.Item1);
            tuple.Item2.TryDequeue();
            return true;
        }

        return false;
    }

    public void Complete()
    {
        m_queue.CompleteAdding();
    }
}

class Source<T>
{
    private readonly Queue<T> m_localQueue = new Queue<T>();
    private readonly BlockingCollection<Tuple<T, Source<T>>> m_managerQueue;
    private volatile bool m_managerHasData = false;

    internal Source(BlockingCollection<Tuple<T, Source<T>>> managerQueue)
    {
        m_managerQueue = managerQueue;
    }

    public void Enqueue(T data)
    {
        lock (m_localQueue)
        {
            if (!m_managerHasData)
            {
                m_managerQueue.Add(Tuple.Create(data, this));
                m_managerHasData = true;
            }
            else
                m_localQueue.Enqueue(data);
        }
    }

    internal bool TryDequeue()
    {
        lock (m_localQueue)
        {
            if (m_localQueue.Count == 0)
            {
                m_managerHasData = false;
                return false;
            }

            m_managerQueue.Add(Tuple.Create(m_localQueue.Dequeue(), this));
            return true;
        }
    }
}
0 голосов
/ 06 марта 2012

Я верю, что трубопроводный подход поможет вам.

Шаблон конвейера использует параллельные задачи и параллельные очереди для обработки последовательности входных значений.Каждая задача реализует этап конвейера, а очереди действуют как буферы, которые позволяют этапам конвейера выполняться одновременно, даже если значения обрабатываются по порядку.Вы можете думать о программных конвейерах как об аналогах сборочных линий на фабрике, где каждый элемент сборочной линии строится поэтапно.Частично собранный элемент передается с одного этапа сборки на другой.Выходы сборочной линии располагаются в том же порядке, что и входы.

См. Следующие документы и примеры MSDN:

...