Собственная библиотека с многопоточностью C ++ 11 - PullRequest
0 голосов
/ 08 октября 2018

У меня есть код для вычисления модели смеси Гаусса с максимизацией ожидания, чтобы идентифицировать кластеры из заданной выборки входных данных.

Часть кода повторяет вычисление такой модели для числаиспытаний Ntrials (один независимый от другого, но с использованием тех же входных данных), чтобы, наконец, выбрать лучшее решение (одно, максимизирующее общую вероятность из модели).Эта концепция может быть обобщена на многие другие алгоритмы кластеризации (например, k-means).

Я хочу распараллелить часть кода, которая должна повторяться Ntrials раз посредством многопоточности сC ++ 11, так что каждый поток будет выполнять одну пробную версию.

Пример кода, предполагая, что ввод Eigen::ArrayXXd sample из (Ndimensions x Npoints) может иметь тип:

    double bestTotalModelProbability = 0;
    Eigen::ArrayXd clusterIndicesFromSample(Npoints);
    clusterIndicesFromSample.setZero();

    for (int i=0; i < Ntrials; i++)
    {
         totalModelProbability = computeGaussianMixtureModel(sample);


         // Check if this trial is better than the previous one.
         // If so, update the results (cluster index for each point
         // in the sample) and keep them.

         if totalModelProbability > bestTotalModelProbability
         {
             bestTotalModelProbability = totalModelProbability;
             ...
             clusterIndicesFromSample = obtainClusterMembership(sample);
         }
    }

где я передаю эталонное значение sample (Eigen :: Ref), а не сам образец в обе функции: computeGaussianMixtureModel () и receiveClusterMembership () .

Myкод в значительной степени основан на собственном массиве, и задачи, которые я принимаю, могут учитывать измерения порядка 10-100 и 500-1000 различных точек выборки.Я ищу несколько примеров для создания многопоточной версии этого кода с использованием массивов Eigen и std: thread из C ++ 11, но не могу ничего найти вокруг, и я пытаюсь создать хотя бы несколько простых примеров для манипулирования массивами Eigen..

Я даже не уверен, что Eigen можно использовать в std :: thread в C ++ 11.Может ли кто-нибудь помочь мне хотя бы с одним простым примером понять синтакс?Я использую clang ++ в качестве компилятора в Mac OSX на процессоре с 6 ядрами (12 потоков).

1 Ответ

0 голосов
/ 09 октября 2018

Вопрос OP привлек мое внимание, потому что обработка чисел с ускорением, полученным благодаря многопоточности, является одной из главных задач в моем личном списке.

Я должен признать, что мой опыт работы с библиотекой Eigen оченьограничено.(Однажды я использовал декомпозицию 3 × 3 матриц вращения на углы Эйлера, которая очень умно решается в общем случае в библиотеке Eigen.)

Следовательно, я определил еще одну примерную задачу, состоящую из глупого подсчета значенийв примере набора данных.

Это выполняется несколько раз (с использованием одной и той же функции оценки):

  1. однопоточное (для получения значения для сравнения)
  2. запуск каждой подзадачи в дополнительном потоке (в довольно глупом подходе)
  3. запуск потоков с чередующимся доступом к образцам данных
  4. запуск потоков с разделенным доступом к образцам данных.

test-multi-threading.cc:

#include <cstdint>
#include <cstdlib>
#include <chrono>
#include <iomanip>
#include <iostream>
#include <limits>
#include <thread>
#include <vector>

// a sample function to process a certain amount of data
template <typename T>
size_t countFrequency(
  size_t n, const T data[], const T &begin, const T &end)
{
  size_t result = 0;
  for (size_t i = 0; i < n; ++i) result += data[i] >= begin && data[i] < end;
  return result;
}

typedef std::uint16_t Value;
typedef std::chrono::high_resolution_clock Clock;
typedef std::chrono::microseconds MuSecs;
typedef decltype(std::chrono::duration_cast<MuSecs>(Clock::now() - Clock::now())) Time;

Time duration(const Clock::time_point &t0)
{
  return std::chrono::duration_cast<MuSecs>(Clock::now() - t0);
}

std::vector<Time> makeTest()
{
  const Value SizeGroup = 4, NGroups = 10000, N = SizeGroup * NGroups;
  const size_t NThreads = std::thread::hardware_concurrency();
  // make a test sample
  std::vector<Value> sample(N);
  for (Value &value : sample) value = (Value)rand();
  // prepare result vectors
  std::vector<size_t> results4[4] = {
    std::vector<size_t>(NGroups, 0),
    std::vector<size_t>(NGroups, 0),
    std::vector<size_t>(NGroups, 0),
    std::vector<size_t>(NGroups, 0)
  };
  // make test
  std::vector<Time> times{
    [&]() { // single threading
      // make a copy of test sample
      std::vector<Value> data(sample);
      std::vector<size_t> &results = results4[0];
      // remember start time
      const Clock::time_point t0 = Clock::now();
      // do experiment single-threaded
      for (size_t i = 0; i < NGroups; ++i) {
        results[i] = countFrequency(data.size(), data.data(),
          (Value)(i * SizeGroup), (Value)((i + 1) * SizeGroup));
      }
      // done
      return duration(t0);
    }(),
    [&]() { // multi-threading - stupid aproach
      // make a copy of test sample
      std::vector<Value> data(sample);
      std::vector<size_t> &results = results4[1];
      // remember start time
      const Clock::time_point t0 = Clock::now();
      // do experiment multi-threaded
      std::vector<std::thread> threads(NThreads);
      for (Value i = 0; i < NGroups;) {
        size_t nT = 0;
        for (; nT < NThreads && i < NGroups; ++nT, ++i) {
          threads[nT] = std::move(std::thread(
            [i, &results, &data, SizeGroup]() {
              size_t result = countFrequency(data.size(), data.data(),
                (Value)(i * SizeGroup), (Value)((i + 1) * SizeGroup));
              results[i] = result;
            }));
        }
        for (size_t iT = 0; iT < nT; ++iT) threads[iT].join();
      }
      // done
      return duration(t0);
    }(),
    [&]() { // multi-threading - interleaved
      // make a copy of test sample
      std::vector<Value> data(sample);
      std::vector<size_t> &results = results4[2];
      // remember start time
      const Clock::time_point t0 = Clock::now();
      // do experiment multi-threaded
      std::vector<std::thread> threads(NThreads);
      for (Value iT = 0; iT < NThreads; ++iT) {
        threads[iT] = std::move(std::thread(
          [iT, &results, &data, NGroups, SizeGroup, NThreads]() {
            for (Value i = iT; i < NGroups; i += NThreads) {
              size_t result = countFrequency(data.size(), data.data(),
                (Value)(i * SizeGroup), (Value)((i + 1) * SizeGroup));
              results[i] = result;
            }
          }));
      }
      for (std::thread &threadI : threads) threadI.join();
      // done
      return duration(t0);
    }(),
    [&]() { // multi-threading - grouped
      std::vector<Value> data(sample);
      std::vector<size_t> &results = results4[3];
      // remember start time
      const Clock::time_point t0 = Clock::now();
      // do experiment multi-threaded
      std::vector<std::thread> threads(NThreads);
      for (Value iT = 0; iT < NThreads; ++iT) {
        threads[iT] = std::move(std::thread(
          [iT, &results, &data, NGroups, SizeGroup, NThreads]() {
            for (Value i = iT * NGroups / NThreads,
              iN = (iT + 1) * NGroups / NThreads; i < iN; ++i) {
              size_t result = countFrequency(data.size(), data.data(),
                (Value)(i * SizeGroup), (Value)((i + 1) * SizeGroup));
              results[i] = result;
            }
          }));
      }
      for (std::thread &threadI : threads) threadI.join();
      // done
      return duration(t0);
    }()
  };
  // check results (must be equal for any kind of computation)
  const unsigned nResults = sizeof results4 / sizeof *results4;
  for (unsigned i = 1; i < nResults; ++i) {
    size_t nErrors = 0;
    for (Value j = 0; j < NGroups; ++j) {
      if (results4[0][j] != results4[i][j]) {
        ++nErrors;
#ifdef _DEBUG
        std::cerr
          << "results4[0][" << j << "]: " << results4[0][j]
          << " != results4[" << i << "][" << j << "]: " << results4[i][j]
          << "!\n";
#endif // _DEBUG
      }
    }
    if (nErrors) std::cerr << nErrors << " errors in results4[" << i << "]!\n";
  }
  // done
  return times;
}

int main()
{
  std::cout << "std::thread::hardware_concurrency(): "
    << std::thread::hardware_concurrency() << '\n';
  // heat up
  std::cout << "Heat up...\n";
  for (unsigned i = 0; i < 3; ++i) makeTest();
  // repeat NTrials times
  const unsigned NTrials = 10;
  std::cout << "Measuring " << NTrials << " runs...\n"
    << "   "
    << " | " << std::setw(10) << "Single"
    << " | " << std::setw(10) << "Multi 1"
    << " | " << std::setw(10) << "Multi 2"
    << " | " << std::setw(10) << "Multi 3"
    << '\n';
  std::vector<double> sumTimes;
  for (unsigned i = 0; i < NTrials; ++i) {
    std::vector<Time> times = makeTest();
    std::cout << std::setw(2) << (i + 1) << ".";
    for (const Time &time : times) {
      std::cout << " | " << std::setw(10) << time.count();
    }
    std::cout << '\n';
    sumTimes.resize(times.size(), 0.0);
    for (size_t j = 0; j < times.size(); ++j) sumTimes[j] += times[j].count();
  }
  std::cout << "Average Values:\n   ";
  for (const double &sumTime : sumTimes) {
    std::cout << " | "
      << std::setw(10) << std::fixed << std::setprecision(1)
      << sumTime / NTrials;
  }
  std::cout << '\n';
  std::cout << "Ratio:\n   ";
  for (const double &sumTime : sumTimes) {
    std::cout << " | "
      << std::setw(10) << std::fixed << std::setprecision(3)
      << sumTime / sumTimes.front();
  }
  std::cout << '\n';
  // done
  return 0;
}

Скомпилировано и протестировано на cygwin64 на Windows 10:

$ g++ --version
g++ (GCC) 7.3.0

$ g++ -std=c++11 -O2 -o test-multi-threading test-multi-threading.cc

$ ./test-multi-threading
std::thread::hardware_concurrency(): 8
Heat up...
Measuring 10 runs...
    |     Single |    Multi 1 |    Multi 2 |    Multi 3
 1. |     384008 |    1052937 |     130662 |     138411
 2. |     386500 |    1103281 |     133030 |     132576
 3. |     382968 |    1078988 |     137123 |     137780
 4. |     395158 |    1120752 |     138731 |     138650
 5. |     385870 |    1105885 |     144825 |     129405
 6. |     366724 |    1071788 |     137684 |     130289
 7. |     352204 |    1104191 |     133675 |     130505
 8. |     331679 |    1072299 |     135476 |     138257
 9. |     373416 |    1053881 |     138467 |     137613
10. |     370872 |    1096424 |     136810 |     147960
Average Values:
    |   372939.9 |  1086042.6 |   136648.3 |   136144.6
Ratio:
    |      1.000 |      2.912 |      0.366 |      0.365

Я сделал то же самое на coliru.com,(Мне пришлось уменьшить циклы разогрева и размер образца, поскольку я превысил ограничение по времени с исходными значениями.):

g++ (GCC) 8.1.0
Copyright (C) 2018 Free Software Foundation, Inc.
This is free software; see the source for copying conditions.  There is NO
warranty; not even for MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.

std::thread::hardware_concurrency(): 4
Heat up...
Measuring 10 runs...
    |     Single |    Multi 1 |    Multi 2 |    Multi 3
 1. |     224684 |     297729 |      48334 |      39016
 2. |     146232 |     337222 |      66308 |      59994
 3. |     195750 |     344056 |      61383 |      63172
 4. |     198629 |     317719 |      62695 |      50413
 5. |     149125 |     356471 |      61447 |      57487
 6. |     155355 |     322185 |      50254 |      35214
 7. |     140269 |     316224 |      61482 |      53889
 8. |     154454 |     334814 |      58382 |      53796
 9. |     177426 |     340723 |      62195 |      54352
10. |     151951 |     331772 |      61802 |      46727
Average Values:
    |   169387.5 |   329891.5 |    59428.2 |    51406.0
Ratio:
    |      1.000 |      1.948 |      0.351 |      0.303

Live Demo on coliru

Интересно немного, что отношения на coliru (только с 4 потоками) даже лучше, чем на моем компьютере с (с 8 потоками).На самом деле, я не знаю, как это объяснить.Тем не менее, существует множество других различий в двух настройках, которые могут или не могут быть ответственными.По крайней мере, оба измерения показывают грубое ускорение 3 для 3 rd и 4 th , где 2 nd потребляет уникально каждое потенциальное ускорение (вероятно, запустив и присоединившись ко всем этим потокам).

Глядя на пример кода, вы поймете, что нет мьютекса или какой-либо другой явной блокировки.Это намеренно.Как я узнал (много, много лет назад), каждая попытка распараллеливания может вызвать определенную дополнительную нагрузку на связь (для параллельных задач, которые должны обмениваться данными).Если коммуникационные издержки становятся слишком большими, они просто потребляют преимущество в скорости параллелизма.Таким образом, наилучшее ускорение может быть достигнуто за счет:

  • наименьших накладных расходов на связь, т. Е. Параллельные задачи работают с независимыми данными
  • наименьших усилий для пост-объединения одновременно вычисляемых результатов.

В моем примере кода я

  1. подготовил все данные и хранилище перед запуском потоков,
  2. общие данные, которые считываются, никогда не изменяются во время работы потоков,
  3. данные, которые записываются так, как если бы они были локальными по отношению к потокам (никакие два потока не пишут по одному и тому же адресу данных)
  4. оценивают вычисленные результаты после объединения всех потоков.

Относительно 3. Я немного подумал, является ли это законным или нет, то есть предоставляется ли оно для данных, записанных в потоках, чтобы они правильно отображались в главном потоке после присоединения.(Тот факт, что что-то кажется работает нормально, иллюзорен в целом, но особенно иллюзорен в отношении многопоточности.)

cppreference.com дает следующие объяснения

  • для std::thread::thread()

    Завершение вызова конструктора синхронизируется с (как определено в std::memory_order) начало вызова копии f в новом потоке выполнения.

  • для std::thread::join()

    Завершение потока, обозначенного *this , синхронизируется с соответствующим успешным возвратом из join () .

В переполнении стека я обнаружил следующие связанные вопросы и ответы:

, который убедил меня, что все в порядке.

Однаконедостатком является то, что

  • создание и объединение потоков требует дополнительных усилий (и это не так уж и дешево).

Альтернативным подходом может быть использование потокабассейн, чтобы преодолеть это.Я немного погуглил и нашел, например, ThreadPool Якоба Прогша на github .Однако, я думаю, что с пулом потоков проблема с блокировкой возвращается «в игре».

Будет ли это работать и для функций Eigen, зависит от того, как соотв.Собственные функции реализованы.Если в них есть доступ к глобальным переменным (которые становятся общими, когда одна и та же функция вызывается одновременно), это вызовет скачок данных.

Немного погуглив, я нашел следующий документ.

Eigen и многопоточность - Использование Eigen в многопоточном приложении :

В случае, если ваше собственное приложение является многопоточным, и несколько потоков выполняют вызовы Eigen, затем вы должны инициализировать Eigen , вызывая следующую подпрограмму перед созданием потоков:

#include <Eigen/Core>
int main(int argc, char** argv)
{
  Eigen::initParallel();
  ...
}

Примечание

С Eigen 3.3 и полностью совместимый с C ++ 11 компилятор (т. Е. потокобезопасная инициализация статической локальной переменной ), тогда вызывать initParallel() необязательно.

Предупреждение

обратите внимание, что все функции, генерирующие случайные матрицы, не реентерабельны и не поточнобезопасны.К ним относятся DenseBase :: Random () и DenseBase :: setRandom () , несмотря на вызов Eigen :: initParallel ().Это потому, что эти функции основаны на std :: rand, который не является входящим.Для поточно-ориентированного генератора случайных чисел мы рекомендуем использовать случайную функцию boost :: random или c ++ 11.

...