Хорошо, я сделал некоторые исследования сейчас. Может быть, мои результаты будут кому-то интересны.
Имейте в виду, что некоторые части кода ниже относятся к конкретным окнам (если быть точным, сходство потоков и сон).
Похоже, вы можете параллельно выполнять действительно небольшие рабочие нагрузки. Но для этого требуется, в основном, остановить некоторые (или все) ядра, используя занятое ожидание. В противном случае я не смог бы добиться какого-либо ускорения. Также очень важно установить сходство потоков, чтобы потоки выполнялись на определенных ядрах. В моих тестах, если этого не сделать, снова получится медленнее, чем последовательный аналог.
Я придумал следующее:
- Установить привязку основного потока к конкретному ядру
- Создать N рабочих потоков и установить эти сродства к различным ядрам
- Ожидание в рабочих потоках до тех пор, пока не поступит задание на вычисления
- Используйте основной поток для назначения заданий рабочим потокам, а также выполняйте своего рода цикл занятости, чтобы проверить, завершил ли выполнение какой-либо рабочий поток
Это приводит к тому, что все участвующие ядра (включая ядро основного потока) будут максимально увеличены до 100% на время вычислений. Рабочие потоки не заканчиваются между последовательными параллельными вызовами, чтобы сохранить накладные расходы на запуск потока.
Следует отметить, что по крайней мере в Windows вам нужно подождать некоторое время, чтобы окна правильно переместили потоки на запрошенные ядра. Я сделал это с помощью сна. На следующем рисунке показано использование моего процессора во время теста (60 с).
Я отметил несколько позиций на графике первого ядра:
- Windows переместила все рабочие потоки на свои конкретные ядра. Ядро № 0 больше не занято.
- Планировщик задач запущен на Core # 0, и начались параллельные вычисления.
- Планировщик задач завершен, и все остальные ядра также возвращаются в нормальное состояние.
- Началось последовательное вычисление
- Последовательные вычисления сделаны
Я проверил время с двумя разными степенями параллелизма:
32 задачи (по существу, до 7 параллельно, изображение выше захватывается во время этого теста):
Параллельно: 2,7 с
последовательный: 7,1 с
2 задачи (2 параллельно, другие ядра заняты ожиданием (впустую)):
Параллельно: 0,365 с
последовательный: 0,464 с
Как видите, некоторое ускорение даже на одном из двух заданий. Это не половина времени, но, учитывая небольшую нагрузку, я думаю, это неплохо. Я на самом деле очень счастлив, что высокий параллелизм показал себя очень хорошо. Помните, что рабочая нагрузка все еще очень мала, и после выполнения всех задач все синхронизируется перед началом следующей итерации.
Компромисс состоит в том, что все участвующие ядра полностью блокируются, пока могут выполняться параллельные вычисления.
Для всех, кто заинтересован, вот мой код тестирования:
#include <iostream>
#include <thread>
#include "windows.h"
//Object that can compute something to simulate workload
class ComputeObject
{
public:
float A;
float B;
float C;
void Compute()
{
//Do some calculations that approximately match the small workload
C = float(sin(A)) + float(cos(B));
C = C * A + atan2(A, B);
C /= A + B;
}
};
//Stores some information for the worker thread that is responsible for this task
struct Task
{
ComputeObject* ComputeObject = nullptr; //the current compute object
bool AssignedFlag = false; //flag that specifies if the compute object has a valid object
std::thread WorkerThread; //the thread
};
//Pointer to an array of Task
Task* Tasks;
//Number of Cpus (logical cores) and number of worker tasks
int NumCpus;
int NumTask;
//Flag, that is used to stop the workers when computation is done
bool WorkersRunning;
//Main function for each worker
void TaskWorker(const int workerIndex)
{
//Assign the worker to a specific logical core.
//Skip the first one, because the scheduler is going to block that one.
SetThreadAffinityMask(GetCurrentThread(), 1 << (workerIndex + 1));
//Get pointer to task struct for current worker
const auto task = Tasks + workerIndex;
while (WorkersRunning)
{
while (!task->AssignedFlag && WorkersRunning); //Wait as long as no valid ComputeObject is set or the workers are stopped.
if (!WorkersRunning) break; //Get out of the loop when workers are stopped.
task->ComputeObject->Compute(); //Do computation
task->AssignedFlag = false; //Invalidate current ComputeObject, so that a new one can be assigned from the scheduler
}
}
//The scheduler runs on the main thread and constantly checks whether workers are finished with their ComputeObject and assigns new ones
void TaskScheduler(ComputeObject* computeObjects, const int numComputeObjects)
{
const auto computeObjectsStart = computeObjects;
const auto computeObjectsEnd = computeObjects + numComputeObjects;
const auto tasksStart = Tasks;
const auto tasksEnd = Tasks + NumTask;
auto currentComputeObject = computeObjectsStart;
auto currentTask = tasksStart;
//as long as there are still ComputeObjects to be processed
while (currentComputeObject != computeObjectsEnd)
{
if (!currentTask->AssignedFlag) //if current task has no valid ComputeObject yet
{
currentTask->ComputeObject = currentComputeObject++; //assign new computeObject and advance
currentTask->AssignedFlag = true; //set flag to signal that a ComputeObject has been assigned
}
currentTask++; //advance to the next task
if (currentTask == tasksEnd) currentTask = tasksStart; //go back to the first task if the last task was reached
}
}
int main()
{
//get number of logical cores
NumCpus = int(std::thread::hardware_concurrency());
NumTask = NumCpus - 1; //first one is this thread and is going to be blocked by the scheduler
Tasks = new Task[NumTask];
const auto numParallelWork = 32; //number of computations that can be done in parallel
const int numInvocations = 1e6; //number of invocations for time measurement
//create ComputeObjects array and compute start/end pointers
const auto computeObjects = new ComputeObject[numParallelWork];
const auto computeObjectsStart = computeObjects;
const auto computeObjectsEnd = computeObjects + numParallelWork;
//fill ComputeObjects with random data
for (auto computeObject = computeObjectsStart; computeObject < computeObjectsEnd; computeObject++)
{
computeObject->A = float(rand()) / RAND_MAX;
computeObject->B = float(rand()) / RAND_MAX;
}
//set workers running
WorkersRunning = true;
//spawn workers
for (auto i = 0; i < NumTask; i++)
Tasks[i].WorkerThread = std::thread(TaskWorker, i);
//put this thread to first logical core
SetThreadAffinityMask(GetCurrentThread(), 1 << 0);
//wait 20s to allow windows to actually move the threads to the specified cores
//monitor task manager to ensure windows actually did that
Sleep(20000);
std::chrono::steady_clock::time_point start, end;
std::chrono::duration<double> elapsed;
start = std::chrono::steady_clock::now(); //start time measurement
//invoke task scheduler a few times
for (auto i = 0; i < numInvocations; i++)
TaskScheduler(computeObjects, numParallelWork);
end = std::chrono::steady_clock::now(); //end time measurement
elapsed = end - start;
std::cout << "parallel: " << elapsed.count() << "s" << std::endl;
//stop workers and wait for all threads
WorkersRunning = false;
for (auto i = 0; i < NumTask; i++) Tasks[i].WorkerThread.join();
//wait 10 seconds just for good measures
Sleep(10000);
start = std::chrono::steady_clock::now(); //start time measurement
//invoke sequential loop a few times
for (auto i = 0; i < numInvocations; i++)
for (auto computeObject = computeObjectsStart; computeObject < computeObjectsEnd; computeObject++)
computeObject->Compute();
end = std::chrono::steady_clock::now(); //end time measurement
elapsed = end - start;
std::cout << "sequential: " << elapsed.count() << "s" << std::endl;
}