Вопрос OP привлек мое внимание, потому что обработка чисел с ускорением, полученным благодаря многопоточности, является одной из главных задач в моем личном списке.
Я должен признать, что мой опыт работы с библиотекой Eigen оченьограничено.(Однажды я использовал декомпозицию 3 × 3 матриц вращения на углы Эйлера, которая очень умно решается в общем случае в библиотеке Eigen.)
Следовательно, я определил еще одну примерную задачу, состоящую из глупого подсчета значенийв примере набора данных.
Это выполняется несколько раз (с использованием одной и той же функции оценки):
- однопоточное (для получения значения для сравнения)
- запуск каждой подзадачи в дополнительном потоке (в довольно глупом подходе)
- запуск потоков с чередующимся доступом к образцам данных
- запуск потоков с разделенным доступом к образцам данных.
test-multi-threading.cc
:
#include <cstdint>
#include <cstdlib>
#include <chrono>
#include <iomanip>
#include <iostream>
#include <limits>
#include <thread>
#include <vector>
// a sample function to process a certain amount of data
template <typename T>
size_t countFrequency(
size_t n, const T data[], const T &begin, const T &end)
{
size_t result = 0;
for (size_t i = 0; i < n; ++i) result += data[i] >= begin && data[i] < end;
return result;
}
typedef std::uint16_t Value;
typedef std::chrono::high_resolution_clock Clock;
typedef std::chrono::microseconds MuSecs;
typedef decltype(std::chrono::duration_cast<MuSecs>(Clock::now() - Clock::now())) Time;
Time duration(const Clock::time_point &t0)
{
return std::chrono::duration_cast<MuSecs>(Clock::now() - t0);
}
std::vector<Time> makeTest()
{
const Value SizeGroup = 4, NGroups = 10000, N = SizeGroup * NGroups;
const size_t NThreads = std::thread::hardware_concurrency();
// make a test sample
std::vector<Value> sample(N);
for (Value &value : sample) value = (Value)rand();
// prepare result vectors
std::vector<size_t> results4[4] = {
std::vector<size_t>(NGroups, 0),
std::vector<size_t>(NGroups, 0),
std::vector<size_t>(NGroups, 0),
std::vector<size_t>(NGroups, 0)
};
// make test
std::vector<Time> times{
[&]() { // single threading
// make a copy of test sample
std::vector<Value> data(sample);
std::vector<size_t> &results = results4[0];
// remember start time
const Clock::time_point t0 = Clock::now();
// do experiment single-threaded
for (size_t i = 0; i < NGroups; ++i) {
results[i] = countFrequency(data.size(), data.data(),
(Value)(i * SizeGroup), (Value)((i + 1) * SizeGroup));
}
// done
return duration(t0);
}(),
[&]() { // multi-threading - stupid aproach
// make a copy of test sample
std::vector<Value> data(sample);
std::vector<size_t> &results = results4[1];
// remember start time
const Clock::time_point t0 = Clock::now();
// do experiment multi-threaded
std::vector<std::thread> threads(NThreads);
for (Value i = 0; i < NGroups;) {
size_t nT = 0;
for (; nT < NThreads && i < NGroups; ++nT, ++i) {
threads[nT] = std::move(std::thread(
[i, &results, &data, SizeGroup]() {
size_t result = countFrequency(data.size(), data.data(),
(Value)(i * SizeGroup), (Value)((i + 1) * SizeGroup));
results[i] = result;
}));
}
for (size_t iT = 0; iT < nT; ++iT) threads[iT].join();
}
// done
return duration(t0);
}(),
[&]() { // multi-threading - interleaved
// make a copy of test sample
std::vector<Value> data(sample);
std::vector<size_t> &results = results4[2];
// remember start time
const Clock::time_point t0 = Clock::now();
// do experiment multi-threaded
std::vector<std::thread> threads(NThreads);
for (Value iT = 0; iT < NThreads; ++iT) {
threads[iT] = std::move(std::thread(
[iT, &results, &data, NGroups, SizeGroup, NThreads]() {
for (Value i = iT; i < NGroups; i += NThreads) {
size_t result = countFrequency(data.size(), data.data(),
(Value)(i * SizeGroup), (Value)((i + 1) * SizeGroup));
results[i] = result;
}
}));
}
for (std::thread &threadI : threads) threadI.join();
// done
return duration(t0);
}(),
[&]() { // multi-threading - grouped
std::vector<Value> data(sample);
std::vector<size_t> &results = results4[3];
// remember start time
const Clock::time_point t0 = Clock::now();
// do experiment multi-threaded
std::vector<std::thread> threads(NThreads);
for (Value iT = 0; iT < NThreads; ++iT) {
threads[iT] = std::move(std::thread(
[iT, &results, &data, NGroups, SizeGroup, NThreads]() {
for (Value i = iT * NGroups / NThreads,
iN = (iT + 1) * NGroups / NThreads; i < iN; ++i) {
size_t result = countFrequency(data.size(), data.data(),
(Value)(i * SizeGroup), (Value)((i + 1) * SizeGroup));
results[i] = result;
}
}));
}
for (std::thread &threadI : threads) threadI.join();
// done
return duration(t0);
}()
};
// check results (must be equal for any kind of computation)
const unsigned nResults = sizeof results4 / sizeof *results4;
for (unsigned i = 1; i < nResults; ++i) {
size_t nErrors = 0;
for (Value j = 0; j < NGroups; ++j) {
if (results4[0][j] != results4[i][j]) {
++nErrors;
#ifdef _DEBUG
std::cerr
<< "results4[0][" << j << "]: " << results4[0][j]
<< " != results4[" << i << "][" << j << "]: " << results4[i][j]
<< "!\n";
#endif // _DEBUG
}
}
if (nErrors) std::cerr << nErrors << " errors in results4[" << i << "]!\n";
}
// done
return times;
}
int main()
{
std::cout << "std::thread::hardware_concurrency(): "
<< std::thread::hardware_concurrency() << '\n';
// heat up
std::cout << "Heat up...\n";
for (unsigned i = 0; i < 3; ++i) makeTest();
// repeat NTrials times
const unsigned NTrials = 10;
std::cout << "Measuring " << NTrials << " runs...\n"
<< " "
<< " | " << std::setw(10) << "Single"
<< " | " << std::setw(10) << "Multi 1"
<< " | " << std::setw(10) << "Multi 2"
<< " | " << std::setw(10) << "Multi 3"
<< '\n';
std::vector<double> sumTimes;
for (unsigned i = 0; i < NTrials; ++i) {
std::vector<Time> times = makeTest();
std::cout << std::setw(2) << (i + 1) << ".";
for (const Time &time : times) {
std::cout << " | " << std::setw(10) << time.count();
}
std::cout << '\n';
sumTimes.resize(times.size(), 0.0);
for (size_t j = 0; j < times.size(); ++j) sumTimes[j] += times[j].count();
}
std::cout << "Average Values:\n ";
for (const double &sumTime : sumTimes) {
std::cout << " | "
<< std::setw(10) << std::fixed << std::setprecision(1)
<< sumTime / NTrials;
}
std::cout << '\n';
std::cout << "Ratio:\n ";
for (const double &sumTime : sumTimes) {
std::cout << " | "
<< std::setw(10) << std::fixed << std::setprecision(3)
<< sumTime / sumTimes.front();
}
std::cout << '\n';
// done
return 0;
}
Скомпилировано и протестировано на cygwin64 на Windows 10:
$ g++ --version
g++ (GCC) 7.3.0
$ g++ -std=c++11 -O2 -o test-multi-threading test-multi-threading.cc
$ ./test-multi-threading
std::thread::hardware_concurrency(): 8
Heat up...
Measuring 10 runs...
| Single | Multi 1 | Multi 2 | Multi 3
1. | 384008 | 1052937 | 130662 | 138411
2. | 386500 | 1103281 | 133030 | 132576
3. | 382968 | 1078988 | 137123 | 137780
4. | 395158 | 1120752 | 138731 | 138650
5. | 385870 | 1105885 | 144825 | 129405
6. | 366724 | 1071788 | 137684 | 130289
7. | 352204 | 1104191 | 133675 | 130505
8. | 331679 | 1072299 | 135476 | 138257
9. | 373416 | 1053881 | 138467 | 137613
10. | 370872 | 1096424 | 136810 | 147960
Average Values:
| 372939.9 | 1086042.6 | 136648.3 | 136144.6
Ratio:
| 1.000 | 2.912 | 0.366 | 0.365
Я сделал то же самое на coliru.com,(Мне пришлось уменьшить циклы разогрева и размер образца, поскольку я превысил ограничение по времени с исходными значениями.):
g++ (GCC) 8.1.0
Copyright (C) 2018 Free Software Foundation, Inc.
This is free software; see the source for copying conditions. There is NO
warranty; not even for MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
std::thread::hardware_concurrency(): 4
Heat up...
Measuring 10 runs...
| Single | Multi 1 | Multi 2 | Multi 3
1. | 224684 | 297729 | 48334 | 39016
2. | 146232 | 337222 | 66308 | 59994
3. | 195750 | 344056 | 61383 | 63172
4. | 198629 | 317719 | 62695 | 50413
5. | 149125 | 356471 | 61447 | 57487
6. | 155355 | 322185 | 50254 | 35214
7. | 140269 | 316224 | 61482 | 53889
8. | 154454 | 334814 | 58382 | 53796
9. | 177426 | 340723 | 62195 | 54352
10. | 151951 | 331772 | 61802 | 46727
Average Values:
| 169387.5 | 329891.5 | 59428.2 | 51406.0
Ratio:
| 1.000 | 1.948 | 0.351 | 0.303
Live Demo on coliru
Интересно немного, что отношения на coliru (только с 4 потоками) даже лучше, чем на моем компьютере с (с 8 потоками).На самом деле, я не знаю, как это объяснить.Тем не менее, существует множество других различий в двух настройках, которые могут или не могут быть ответственными.По крайней мере, оба измерения показывают грубое ускорение 3 для 3 rd и 4 th , где 2 nd потребляет уникально каждое потенциальное ускорение (вероятно, запустив и присоединившись ко всем этим потокам).
Глядя на пример кода, вы поймете, что нет мьютекса или какой-либо другой явной блокировки.Это намеренно.Как я узнал (много, много лет назад), каждая попытка распараллеливания может вызвать определенную дополнительную нагрузку на связь (для параллельных задач, которые должны обмениваться данными).Если коммуникационные издержки становятся слишком большими, они просто потребляют преимущество в скорости параллелизма.Таким образом, наилучшее ускорение может быть достигнуто за счет:
- наименьших накладных расходов на связь, т. Е. Параллельные задачи работают с независимыми данными
- наименьших усилий для пост-объединения одновременно вычисляемых результатов.
В моем примере кода я
- подготовил все данные и хранилище перед запуском потоков,
- общие данные, которые считываются, никогда не изменяются во время работы потоков,
- данные, которые записываются так, как если бы они были локальными по отношению к потокам (никакие два потока не пишут по одному и тому же адресу данных)
- оценивают вычисленные результаты после объединения всех потоков.
Относительно 3. Я немного подумал, является ли это законным или нет, то есть предоставляется ли оно для данных, записанных в потоках, чтобы они правильно отображались в главном потоке после присоединения.(Тот факт, что что-то кажется работает нормально, иллюзорен в целом, но особенно иллюзорен в отношении многопоточности.)
cppreference.com дает следующие объяснения
В переполнении стека я обнаружил следующие связанные вопросы и ответы:
, который убедил меня, что все в порядке.
Однаконедостатком является то, что
- создание и объединение потоков требует дополнительных усилий (и это не так уж и дешево).
Альтернативным подходом может быть использование потокабассейн, чтобы преодолеть это.Я немного погуглил и нашел, например, ThreadPool Якоба Прогша на github .Однако, я думаю, что с пулом потоков проблема с блокировкой возвращается «в игре».
Будет ли это работать и для функций Eigen, зависит от того, как соотв.Собственные функции реализованы.Если в них есть доступ к глобальным переменным (которые становятся общими, когда одна и та же функция вызывается одновременно), это вызовет скачок данных.
Немного погуглив, я нашел следующий документ.
Eigen и многопоточность - Использование Eigen в многопоточном приложении :
В случае, если ваше собственное приложение является многопоточным, и несколько потоков выполняют вызовы Eigen, затем вы должны инициализировать Eigen , вызывая следующую подпрограмму перед созданием потоков:
#include <Eigen/Core>
int main(int argc, char** argv)
{
Eigen::initParallel();
...
}
Примечание
С Eigen 3.3 и полностью совместимый с C ++ 11 компилятор (т. Е. потокобезопасная инициализация статической локальной переменной ), тогда вызывать initParallel()
необязательно.
Предупреждение
обратите внимание, что все функции, генерирующие случайные матрицы, не реентерабельны и не поточнобезопасны.К ним относятся DenseBase :: Random () и DenseBase :: setRandom () , несмотря на вызов Eigen :: initParallel ().Это потому, что эти функции основаны на std :: rand, который не является входящим.Для поточно-ориентированного генератора случайных чисел мы рекомендуем использовать случайную функцию boost :: random или c ++ 11.