Позвольте мне подтвердить это,
Следующий код был написан с использованием C #, а не C. Вы можете связать идею с тем, что я пытаюсь сформулировать. И большая часть контента взята из Parallel Pattern (это был проект документа Microsoft о параллельном подходе)
Чтобы получить наилучшее статическое разбиение, вы должны иметь возможность заранее точно предсказать, сколько времени займет все итерации. Это редко выполнимо, что приводит к необходимости более динамичного разделения, когда система может быстро адаптироваться к изменяющимся рабочим нагрузкам. Мы можем решить эту проблему, перейдя на другой конец спектра компромиссов с максимально возможной балансировкой нагрузки.
Чтобы сделать это, вместо того, чтобы проталкивать каждому из потоков заданный набор индексов для обработки, мы можем заставить потоки конкурировать за итерации. Мы используем пул оставшихся итераций для обработки, который изначально заполняется всеми итерациями. Пока все итерации не будут обработаны, каждый поток переходит в пул итераций, удаляет значение итерации, обрабатывает его и затем повторяет. Таким образом, мы можем жадно достичь аппроксимации оптимального уровня возможной балансировки нагрузки (истинный оптимум может быть достигнут только при предварительном знании того, сколько времени займет каждая итерация). Если поток застревает, обрабатывая определенную длинную итерацию, другие потоки тем временем компенсируют его, обрабатывая работу из пула. Конечно, даже с этой схемой вы все еще можете найти далеко не оптимальное разбиение (которое может произойти, если один поток застрянет с несколькими кусками работы, значительно большими, чем остальные), но без знания того, сколько времени занимает обработка. данная часть работы потребует, еще мало что можно сделать.
Вот пример реализации, которая доводит балансировку нагрузки до этого предела. Пул значений итераций поддерживается как одно целое число, представляющее следующую доступную итерацию, и потоки, участвующие в обработке, «удаляют элементы», атомно увеличивая это целое число:
public static void MyParallelFor(
int inclusiveLowerBound, int exclusiveUpperBound, Action<int> body)
{
// Get the number of processors, initialize the number of remaining
// threads, and set the starting point for the iteration.
int numProcs = Environment.ProcessorCount;
int remainingWorkItems = numProcs;
int nextIteration = inclusiveLowerBound;
using (ManualResetEvent mre = new ManualResetEvent(false))
{
// Create each of the work items.
for (int p = 0; p < numProcs; p++)
{
ThreadPool.QueueUserWorkItem(delegate
{
int index;
while ((index = Interlocked.Increment(
ref nextIteration) - 1) < exclusiveUpperBound)
{
body(index);
}
if (Interlocked.Decrement(ref remainingWorkItems) == 0)
mre.Set();
});
}
// Wait for all threads to complete.
mre.WaitOne();
}
}