Последовательные параллельные вычисления с очень низкой нагрузкой на вызов - PullRequest
2 голосов
/ 17 июня 2019

Предположим, я хочу вычислить методы A и B в следующем примере параллельно:

while (true)
{
   int state = NextState();

   int a = A(state);
   int b = B(state);

   ImportantMethod(a, b);
}

NextState не может быть вычислено заранее в этом примере. Как видите, аргументы методов A и B зависят от состояния. Результирующие значения a и b используются для вызова ImportantMethod, который должен вызываться на каждой итерации.

Для этого требуется, чтобы многопоточный код вызывал A и B параллельно и ожидал их результата в течение одной итерации. Невозможно объединить последовательные итерации для создания большей рабочей нагрузки.

ImportantMethod используется в приложении реального времени и в настоящее время вызывается слишком редко. Методы A и B имеют довольно небольшую нагрузку. Примерно около 10 умножений и 2 тригонометрических функции (sin, cos), просто чтобы представить это в перспективе. A и B являются узким местом, ImportantMethod просто требует, чтобы цикл был чрезвычайно быстрым.

Я рассмотрел использование двух потоков для A и B и пробуждаю их каждую итерацию, используя условную переменную. Но, учитывая небольшую нагрузку, я боюсь, что затраты на пробуждение потоков и ожидание их результатов больше, чем сами вычисления. Я также мог бы попытаться использовать занятое ожидание с логическим флагом без какой-либо синхронизации. Это очень хорошо подействует на процессор, но это будет приемлемо.

Я значительно упростил задачу и не думаю, что смогу создать тестовое приложение, которое дало бы мне точные метрики производительности, которые были бы сопоставимы с тем, что было бы у реального приложения. Однако реализовать это в реальном приложении будет очень сложно, поэтому я хотел бы получить больше знаний, прежде чем пытаться это сделать.

Поэтому я хотел бы спросить, есть ли у кого-то опыт с этими типами проблем. Особенно с очень частыми пробуждениями потоков и синхронизацией.

Может ли ожидание занятости быть более выгодным, чем уведомление с помощью условной переменной? Существуют ли другие способы синхронизации нескольких потоков, которые я еще не рассмотрел, которые могут быть более подходящими?

Ответы [ 2 ]

2 голосов
/ 18 июня 2019

В качестве базовой линии для накладных расходов на синхронизацию вы можете начать первое вычисление A с std::async (так как это всего лишь небольшое изменение кода) и измерить общее влияние на скорость (по сравнению с последовательный код).

До:

int a = A(state);
int b = B(state);
ImportantMethod(a, b);

После того, как:

#include <future>

auto a = std::async(std::launch::async, A, state);
int b = B(state);
ImportantMethod(a.get(), b);

Однако, с операциями, которые вы описываете (10 умножений, плюс две операции sin / cos), я сомневаюсь, что это будет улучшение. Я предполагаю, что вы уже используете некоторую оптимизированную реализацию sin / cos (предварительно вычисленные таблицы также стоит попробовать).

Обратите внимание, что современные процессоры уже выполняют много параллельных вычислений, даже для однопоточного кода. Если A и B - простые функции, компилятор должен иметь большой потенциал для оптимизации (например, векторизация).

Затраты на операции синхронизации (которых нельзя избежать, начав использовать потоки), могут быть значительными. Оба потока должны были синхронизироваться в памяти объекта state. Для передачи результата A (или B) из одного потока в другой, вы можете использовать атомарные операции в этом случае. Если вы напрямую запросите его, вам нужно будет инициализировать его с недопустимым значением, чтобы вы могли определить, когда оно было написано. В противном случае вам необходимо дополнительно установить флаг.

Я сомневаюсь, что накладные расходы на синхронизацию могут быть меньше затрат на непосредственное вычисление значения.

1 голос
/ 21 июня 2019

Хорошо, я сделал некоторые исследования сейчас. Может быть, мои результаты будут кому-то интересны. Имейте в виду, что некоторые части кода ниже относятся к конкретным окнам (если быть точным, сходство потоков и сон).

Похоже, вы можете параллельно выполнять действительно небольшие рабочие нагрузки. Но для этого требуется, в основном, остановить некоторые (или все) ядра, используя занятое ожидание. В противном случае я не смог бы добиться какого-либо ускорения. Также очень важно установить сходство потоков, чтобы потоки выполнялись на определенных ядрах. В моих тестах, если этого не сделать, снова получится медленнее, чем последовательный аналог.

Я придумал следующее:

  1. Установить привязку основного потока к конкретному ядру
  2. Создать N рабочих потоков и установить эти сродства к различным ядрам
  3. Ожидание в рабочих потоках до тех пор, пока не поступит задание на вычисления
  4. Используйте основной поток для назначения заданий рабочим потокам, а также выполняйте своего рода цикл занятости, чтобы проверить, завершил ли выполнение какой-либо рабочий поток

Это приводит к тому, что все участвующие ядра (включая ядро ​​основного потока) будут максимально увеличены до 100% на время вычислений. Рабочие потоки не заканчиваются между последовательными параллельными вызовами, чтобы сохранить накладные расходы на запуск потока.

Следует отметить, что по крайней мере в Windows вам нужно подождать некоторое время, чтобы окна правильно переместили потоки на запрошенные ядра. Я сделал это с помощью сна. На следующем рисунке показано использование моего процессора во время теста (60 с).

CPU usage during 60s in percent (8 logical cores)

Я отметил несколько позиций на графике первого ядра:

  1. Windows переместила все рабочие потоки на свои конкретные ядра. Ядро № 0 больше не занято.
  2. Планировщик задач запущен на Core # 0, и начались параллельные вычисления.
  3. Планировщик задач завершен, и все остальные ядра также возвращаются в нормальное состояние.
  4. Началось последовательное вычисление
  5. Последовательные вычисления сделаны

Я проверил время с двумя разными степенями параллелизма:


32 задачи (по существу, до 7 параллельно, изображение выше захватывается во время этого теста):

Параллельно: 2,7 с

последовательный: 7,1 с


2 задачи (2 параллельно, другие ядра заняты ожиданием (впустую)):

Параллельно: 0,365 с

последовательный: 0,464 с


Как видите, некоторое ускорение даже на одном из двух заданий. Это не половина времени, но, учитывая небольшую нагрузку, я думаю, это неплохо. Я на самом деле очень счастлив, что высокий параллелизм показал себя очень хорошо. Помните, что рабочая нагрузка все еще очень мала, и после выполнения всех задач все синхронизируется перед началом следующей итерации. Компромисс состоит в том, что все участвующие ядра полностью блокируются, пока могут выполняться параллельные вычисления.

Для всех, кто заинтересован, вот мой код тестирования:

#include <iostream>
#include <thread>
#include "windows.h"

//Object that can compute something to simulate workload
class ComputeObject
{
public:
  float A;
  float B;
  float C;

  void Compute()
  {
    //Do some calculations that approximately match the small workload
    C = float(sin(A)) + float(cos(B));
    C = C * A + atan2(A, B);
    C /= A + B;
  }
};

//Stores some information for the worker thread that is responsible for this task
struct Task
{
  ComputeObject* ComputeObject = nullptr; //the current compute object
  bool AssignedFlag = false; //flag that specifies if the compute object has a valid object
  std::thread WorkerThread; //the thread
};

//Pointer to an array of Task
Task* Tasks;

//Number of Cpus (logical cores) and number of worker tasks
int NumCpus;
int NumTask;

//Flag, that is used to stop the workers when computation is done
bool WorkersRunning;

//Main function for each worker
void TaskWorker(const int workerIndex)
{
  //Assign the worker to a specific logical core.
  //Skip the first one, because the scheduler is going to block that one.
  SetThreadAffinityMask(GetCurrentThread(), 1 << (workerIndex + 1));

  //Get pointer to task struct for current worker
  const auto task = Tasks + workerIndex;
  while (WorkersRunning)
  {
    while (!task->AssignedFlag && WorkersRunning); //Wait as long as no valid ComputeObject is set or the workers are stopped.
    if (!WorkersRunning) break; //Get out of the loop when workers are stopped.
    task->ComputeObject->Compute(); //Do computation
    task->AssignedFlag = false; //Invalidate current ComputeObject, so that a new one can be assigned from the scheduler
  }
}

//The scheduler runs on the main thread and constantly checks whether workers are finished with their ComputeObject and assigns new ones
void TaskScheduler(ComputeObject* computeObjects, const int numComputeObjects)
{
  const auto computeObjectsStart = computeObjects;
  const auto computeObjectsEnd = computeObjects + numComputeObjects;
  const auto tasksStart = Tasks;
  const auto tasksEnd = Tasks + NumTask;

  auto currentComputeObject = computeObjectsStart;
  auto currentTask = tasksStart;

  //as long as there are still ComputeObjects to be processed
  while (currentComputeObject != computeObjectsEnd)
  {
    if (!currentTask->AssignedFlag) //if current task has no valid ComputeObject yet
    {
      currentTask->ComputeObject = currentComputeObject++; //assign new computeObject and advance
      currentTask->AssignedFlag = true; //set flag to signal that a ComputeObject has been assigned
    }

    currentTask++; //advance to the next task
    if (currentTask == tasksEnd) currentTask = tasksStart; //go back to the first task if the last task was reached
  }
}

int main()
{
  //get number of logical cores
  NumCpus = int(std::thread::hardware_concurrency());
  NumTask = NumCpus - 1; //first one is this thread and is going to be blocked by the scheduler
  Tasks = new Task[NumTask];


  const auto numParallelWork = 32; //number of computations that can be done in parallel
  const int numInvocations = 1e6; //number of invocations for time measurement

  //create ComputeObjects array and compute start/end pointers
  const auto computeObjects = new ComputeObject[numParallelWork];
  const auto computeObjectsStart = computeObjects;
  const auto computeObjectsEnd = computeObjects + numParallelWork;

  //fill ComputeObjects with random data
  for (auto computeObject = computeObjectsStart; computeObject < computeObjectsEnd; computeObject++)
  {
    computeObject->A = float(rand()) / RAND_MAX;
    computeObject->B = float(rand()) / RAND_MAX;
  }

  //set workers running
  WorkersRunning = true;

  //spawn workers
  for (auto i = 0; i < NumTask; i++)
    Tasks[i].WorkerThread = std::thread(TaskWorker, i);

  //put this thread to first logical core
  SetThreadAffinityMask(GetCurrentThread(), 1 << 0);

  //wait 20s to allow windows to actually move the threads to the specified cores
  //monitor task manager to ensure windows actually did that
  Sleep(20000);

  std::chrono::steady_clock::time_point start, end;
  std::chrono::duration<double> elapsed;



  start = std::chrono::steady_clock::now(); //start time measurement

  //invoke task scheduler a few times
  for (auto i = 0; i < numInvocations; i++)
    TaskScheduler(computeObjects, numParallelWork);

  end = std::chrono::steady_clock::now(); //end time measurement
  elapsed = end - start;
  std::cout << "parallel: " << elapsed.count() << "s" << std::endl;


  //stop workers and wait for all threads
  WorkersRunning = false;
  for (auto i = 0; i < NumTask; i++) Tasks[i].WorkerThread.join();


  //wait 10 seconds just for good measures
  Sleep(10000);


  start = std::chrono::steady_clock::now(); //start time measurement

  //invoke sequential loop a few times
  for (auto i = 0; i < numInvocations; i++)
    for (auto computeObject = computeObjectsStart; computeObject < computeObjectsEnd; computeObject++)
      computeObject->Compute();

  end = std::chrono::steady_clock::now(); //end time measurement
  elapsed = end - start;
  std::cout << "sequential: " << elapsed.count() << "s" << std::endl;
}
...