У меня есть приложение, которое использует распараллеливание для обработки данных.
Основная программа находится на C #, а одна из процедур анализа данных - на внешней C ++ dll. Эта библиотека сканирует данные и вызывает обратный вызов каждый раз, когда в данных обнаруживается определенный сигнал. Данные должны быть собраны, отсортированы и затем сохранены в HD.
Вот моя первая простая реализация метода, вызванного обратным вызовом, и метода для сортировки и хранения данных:
// collection where saving found signals
List<MySignal> mySignalList = new List<MySignal>();
// method invoked by the callback
private void Collect(int type, long time)
{
lock(locker) { mySignalList.Add(new MySignal(type, time)); }
}
// store signals to disk
private void Store()
{
// sort the signals
mySignalList.Sort();
// file is a object that manages the writing of data to a FileStream
file.Write(mySignalList.ToArray());
}
Данные состоят из двумерного массива (короткие [] [] данные) размером 10000 x n с переменной n. Я использую распараллеливание следующим образом:
Parallel.For(0, 10000, (int i) =>
{
// wrapper for the external c++ dll
ProcessData(data[i]);
}
Теперь для каждого из 10000 массивов, по моим оценкам, может быть запущено от 0 до 4 обратных вызовов. Я сталкиваюсь с узким местом и, учитывая, что ресурсы моего ЦП не используются чрезмерно, я предполагаю, что блокировка (вместе с тысячами обратных вызовов) является проблемой (я прав или может быть что-то еще?). Я пробовал коллекцию ConcurrentBag, но производительность все еще хуже (в соответствии с другими пользователями выводы ).
Я думал, что возможное решение для использования кода без блокировки будет иметь несколько коллекций. Тогда была бы необходима стратегия, чтобы каждый поток параллельного процесса работал над одной коллекцией. Коллекции могут быть, например, внутри словаря с идентификатором потока в качестве ключа, но я не знаю каких-либо средств .NET для этого (я должен знать идентификатор потока для инициализации словаря перед запуском распараллеливания). Может ли эта идея быть осуществимой, и, если да, существует ли какой-либо инструмент .NET для этого? Или, в качестве альтернативы, есть другая идея, чтобы ускорить процесс?
[EDIT]
Я последовал предложению Рида Копси и использовал следующее решение (согласно профайлеру VS2010, до того, как бремя блокировки и добавления в список занимало 15% ресурсов, а сейчас только 1%):
// master collection where saving found signals
List<MySignal> mySignalList = new List<MySignal>();
// thread-local storage of data (each thread is working on its List<MySignal>)
ThreadLocal<List<MySignal>> threadLocal;
// analyze data
private void AnalizeData()
{
using(threadLocal = new ThreadLocal<List<MySignal>>(() =>
{ return new List<MySignal>(); }))
{
Parallel.For<int>(0, 10000,
() =>
{ return 0;},
(i, loopState, localState) =>
{
// wrapper for the external c++ dll
ProcessData(data[i]);
return 0;
},
(localState) =>
{
lock(this)
{
// add thread-local lists to the master collection
mySignalList.AddRange(local.Value);
local.Value.Clear();
}
});
}
}
// method invoked by the callback
private void Collect(int type, long time)
{
local.Value.Add(new MySignal(type, time));
}