Как ускорить использование подпрограмм в многопоточном сценарии - PullRequest
2 голосов
/ 21 февраля 2011

У меня есть приложение, которое использует распараллеливание для обработки данных.

Основная программа находится на C #, а одна из процедур анализа данных - на внешней C ++ dll. Эта библиотека сканирует данные и вызывает обратный вызов каждый раз, когда в данных обнаруживается определенный сигнал. Данные должны быть собраны, отсортированы и затем сохранены в HD.

Вот моя первая простая реализация метода, вызванного обратным вызовом, и метода для сортировки и хранения данных:

// collection where saving found signals
List<MySignal> mySignalList = new List<MySignal>();

// method invoked by the callback
private void Collect(int type, long time)
{
    lock(locker) { mySignalList.Add(new MySignal(type, time)); }
}

// store signals to disk
private void Store()
{
    // sort the signals
    mySignalList.Sort();
    // file is a object that manages the writing of data to a FileStream
    file.Write(mySignalList.ToArray());
}

Данные состоят из двумерного массива (короткие [] [] данные) размером 10000 x n с переменной n. Я использую распараллеливание следующим образом:

Parallel.For(0, 10000, (int i) =>
{
    // wrapper for the external c++ dll
    ProcessData(data[i]);
}

Теперь для каждого из 10000 массивов, по моим оценкам, может быть запущено от 0 до 4 обратных вызовов. Я сталкиваюсь с узким местом и, учитывая, что ресурсы моего ЦП не используются чрезмерно, я предполагаю, что блокировка (вместе с тысячами обратных вызовов) является проблемой (я прав или может быть что-то еще?). Я пробовал коллекцию ConcurrentBag, но производительность все еще хуже (в соответствии с другими пользователями выводы ).

Я думал, что возможное решение для использования кода без блокировки будет иметь несколько коллекций. Тогда была бы необходима стратегия, чтобы каждый поток параллельного процесса работал над одной коллекцией. Коллекции могут быть, например, внутри словаря с идентификатором потока в качестве ключа, но я не знаю каких-либо средств .NET для этого (я должен знать идентификатор потока для инициализации словаря перед запуском распараллеливания). Может ли эта идея быть осуществимой, и, если да, существует ли какой-либо инструмент .NET для этого? Или, в качестве альтернативы, есть другая идея, чтобы ускорить процесс?

[EDIT] Я последовал предложению Рида Копси и использовал следующее решение (согласно профайлеру VS2010, до того, как бремя блокировки и добавления в список занимало 15% ресурсов, а сейчас только 1%):

// master collection where saving found signals
List<MySignal> mySignalList = new List<MySignal>();
// thread-local storage of data (each thread is working on its List<MySignal>)
ThreadLocal<List<MySignal>> threadLocal;

// analyze data
private void AnalizeData()
{
    using(threadLocal = new ThreadLocal<List<MySignal>>(() => 
        { return new List<MySignal>(); }))
    {
        Parallel.For<int>(0, 10000,
        () =>
        { return 0;},
        (i, loopState, localState) =>
        {
            // wrapper for the external c++ dll
            ProcessData(data[i]);
            return 0;
        },
        (localState) =>
        {
            lock(this)
            {
                // add thread-local lists to the master collection
                mySignalList.AddRange(local.Value);
                local.Value.Clear();
            }
        });
    }
}

// method invoked by the callback
private void Collect(int type, long time)
{
    local.Value.Add(new MySignal(type, time));
}

Ответы [ 4 ]

1 голос
/ 21 февраля 2011

Основные коллекции в C # не являются поточно-ориентированными.

Проблема, с которой вы столкнулись, связана с тем, что вы блокируете всю коллекцию только для вызова метода add().

Вы можете создать потокобезопасную коллекцию, которая блокирует только отдельные элементы внутри коллекции, а не всю коллекцию.

Давайте рассмотрим, например, связанный список .

Реализуйте метод add(item (or list)), который выполняет следующее:

  1. Коллекция замков.
  2. A = получить последний предмет.
  3. установить ссылку на последний элемент на новый элемент (или последний элемент в новом списке).
  4. заблокировать последний предмет (A).
  5. разблокировать коллекцию.
  6. добавить новые элементы / список в конец A.
  7. разблокировать заблокированный элемент.

Это заблокирует всю коллекцию для 3 простых задач при добавлении.

Затем, перебирая список, просто сделайте trylock() для каждого объекта. если он заблокирован, подождите, пока блокировка не освободится (таким образом, вы уверены, что add() закончен).
В C # вы можете сделать пустой блок lock() для объекта как trylock(). Так что теперь вы можете безопасно добавлять и перебирать список одновременно.

Аналогичные решения могут быть реализованы для других команд, если это необходимо.

1 голос
/ 21 февраля 2011

Вы не говорите, сколько «узких мест» вы встречаете.Но давайте посмотрим на блокировки.

На моей машине (четырехъядерный процессор, 2,4 ГГц) блокировка будет стоить около 70 наносекунд, если она не будет утверждена.Я не знаю, сколько времени требуется, чтобы добавить элемент в список, но я не могу себе представить, что это займет более нескольких микросекунд.Но давайте потребуем 100 микросекунд (я был бы очень удивлен, обнаружив, что это даже 10 микросекунд), чтобы добавить элемент в список, принимая во внимание конфликт блокировки.Таким образом, если вы добавляете 40 000 элементов в список, это 4 000 000 микросекунд или 4 секунды.И я ожидал бы, что одно ядро ​​будет привязано, если бы это было так.

Я не использовал ConcurrentBag, но я обнаружил, что производительность BlockingCollection очень хорошая.

Я подозреваю, однако, что ваше узкое место где-то еще.Вы сделали профилирование?

1 голос
/ 21 февраля 2011

считал, что возможное решение для использования кода без блокировки будет иметь несколько коллекций. Тогда была бы необходима стратегия, чтобы каждый поток параллельного процесса работал над одной коллекцией. Коллекции могут быть, например, внутри словаря с идентификатором потока в качестве ключа, но я не знаю каких-либо средств .NET для этого (я должен знать идентификатор потока для инициализации словаря перед запуском распараллеливания). Может ли эта идея быть осуществимой, и, если да, существует ли какой-либо инструмент .NET для этого? Или, в качестве альтернативы, есть другая идея, чтобы ускорить процесс?

Возможно, вы захотите использовать ThreadLocal<T> для хранения ваших коллекций. Это автоматически выделяет отдельную коллекцию для каждого потока.

При этом существуют перегрузки Parallel.For, которые работают с локальным состоянием и в конце имеют проход сбора. Это, возможно, позволит вам создать оболочку ProcessData, в которой каждое тело цикла работает со своей коллекцией, а затем рекомбинировать в конце. Это потенциально устранит необходимость в блокировке (поскольку каждый поток работает над своим набором данных) до фазы рекомбинации, которая происходит один раз для каждого потока (вместо одного раза для задачи, то есть: 10000 раз). Это может уменьшить количество блокировок, которые вы принимаете, с ~ 25000 (0-4 * 10000) до нескольких (зависит от системы и алгоритма, но в четырехъядерной системе, вероятно, около 10 в моем опыте).

Подробнее см. В моем блоге о агрегировании данных с Parallel.For / ForEach . Он демонстрирует перегрузки и объясняет, как они работают более подробно.

0 голосов
/ 21 февраля 2011

Любое встроенное решение для коллекции будет включать некоторую блокировку. Могут быть способы избежать этого, возможно, путем разделения реальных конструкций данных, которые читаются / записываются, но вам придется блокировать КУДА-ТО.

Также следует понимать, что Parallel.For () будет использовать пул потоков. Несмотря на простоту реализации, вы теряете детальный контроль над созданием / уничтожением потоков, а при запуске большой параллельной задачи у пула потоков возникают серьезные накладные расходы.

С концептуальной точки зрения я бы попробовал две вещи в тандеме, чтобы ускорить этот алгоритм:

  • Создавайте темы самостоятельно, используя класс Thread. Это освобождает вас от замедления планирования пула потоков; поток начинает обрабатывать (или ожидать времени ЦП), когда вы указываете его запустить, вместо того, чтобы пул потоков подавал запросы на потоки во внутреннюю работу в своем собственном темпе. Вы должны знать о количестве потоков, которые вы используете одновременно; Эмпирическое правило заключается в том, что преимущества многопоточности преодолеваются накладными расходами, когда количество активных потоков более чем в два раза превышает число «исполнительных блоков», доступных для выполнения потоков. Однако вы должны быть в состоянии спроектировать систему, которая учитывает это относительно просто.
  • Разделите коллекцию результатов, создав словарь коллекций результатов. Каждая коллекция результатов привязана к некоторому токену, который переносится потоком, выполняющим обработку, и передается обратному вызову. Словарь может иметь несколько элементов READ одновременно без блокировки, и поскольку каждый поток ЗАПИСЫВАЕТ на свою коллекцию в Словаре, не должно быть необходимости блокировать эти списки (и даже если вы заблокировали их, вы не будете блокировка других тем). В результате единственная коллекция, которая должна быть заблокирована таким образом, чтобы блокировать потоки, - это основной словарь, когда в него добавляется новая коллекция для нового потока. Это не должно случаться часто, если вы умны утилизировать токены.
...