C ++ Pthreads: Алгоритм для одновременного запуска N потоков, когда> N потоков должно выполняться на каждой итерации - PullRequest
1 голос
/ 01 ноября 2011

У меня есть программа, которая должна запускать функцию M раз за итерацию, и эти запуски могут быть распараллелены. Допустим, я ограничен одновременным запуском N потоков (скажем, по количеству доступных ядер). Мне нужен алгоритм, который будет гарантировать, что я всегда запускаю N потоков (при условии, что число оставшихся потоков>> N), и этот алгоритм должен быть инвариантным к порядку завершения этих потоков. Кроме того, алгоритм планирования потоков не должен требовать значительного времени ЦП.

Я имею в виду что-то вроде следующего, но это явно некорректно.

#include <iostream>
#include <pthread.h>
#include <cstdlib>

void *find_num(void* arg)
{
    double num = rand();
    for(double q=0; 1; q++)
        if(num == q)
        {
            std::cout << "\n--";
            return 0;
        }
}


int main ()
{
    srand(0);

    const int N = 2;
    pthread_t threads [N];
    for(int q=0; q<N; q++)
        pthread_create(&threads [q], NULL, find_num, NULL);

    int M = 30;
    int launched=N;
    int finnished=0;
    while(1)
    {
        for(int w=0; w<N; w++)
        {
            //inefficient if `threads [1]` done before `threads [2]`
            pthread_join( threads [w], NULL);
            finnished++;
            std::cout << "\n" << finnished;
            if(finnished == M)
                break;
            if(launched < M)
            {
                pthread_create(&threads [w], NULL, find_num, NULL);
                launched++;
            }
        }

        if(finnished == M)
            break;
    }
}

Очевидная проблема здесь в том, что если threads[1] завершает работу до threads[0], то теряется процессорное время, и я не могу придумать, как это обойти. Кроме того, я предполагаю, что ожидание основной процедуры на pthread_join() не является значительным расходом процессорного времени?

Ответы [ 4 ]

5 голосов
/ 01 ноября 2011

Я бы посоветовал не возродить темы, это довольно серьезные накладные расходы.Вместо этого создайте пул из N потоков и отправьте им работу через рабочую очередь - довольно стандартный подход.Даже если ваша оставшаяся работа меньше N, дополнительные потоки не принесут никакого вреда, они просто останутся заблокированными в рабочей очереди.

Если вы настаиваете на своем текущем подходе, вы можете сделать это следующим образом.:

Не ждите потоков с pthread_join, вам это не нужно, поскольку вы ничего не возвращаете в основной поток.Создайте потоки с атрибутом PTHREAD_CREATE_DETACHED и просто дайте им выйти.

В главном потоке подождите семафор, который сигнализируется каждым выходящим потоком - фактически вы ожидаете any завершение потока.Если по какой-либо причине у вас нет <semaphore.h>, реализовать его с мьютексами и условиями тривиально.

#include <semaphore.h>
#include <iostream>
#include <pthread.h>
#include <cstdlib>

sem_t exit_sem;

void *find_num(void* arg)
{
    double num = rand();
    for(double q=0; 1; q++)
        if(num == q)
        {
            std::cout << "\n--";
            return 0;
        }

    /* Tell the main thread we have exited.  */
    sem_post (&exit_sem);
    return NULL;
}

int main ()
{
    srand(0);

    /* Initialize pocess private semaphore with 0 initial count.  */
    sem_init (&exit_sem, 0, 0);
    const int N = 2;

    pthread_attr_t attr;
    pthread_attr_init (&attr);
    pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
    for(int q=0; q<N; q++)
        pthread_create(NULL, &attr, find_num, NULL);

    int M = 30;
    int launched=N;
    int finnished=0;
    while(1)
    {
        for(int w=0; w<N; w++)
        {
            /* Wait for any thread to exit, don't care which.  */
            sem_wait (&exit_sem);

            finnished++;
            std::cout << "\n" << finnished;
            if(finnished == M)
                break;
            if(launched < M)
            {
                pthread_create(NULL, &attr, find_num, NULL);
                launched++;
            }
        }

        if(finnished == M)
            break;
    }
}

В любом случае, я бы снова рекомендовал подход с использованием пула потоков / очереди работ.

1 голос
/ 01 ноября 2011

Я бы определенно посмотрел на OpenMP или C ++ 11 async.Если честно, на данный момент я думаю, что OpenMP более жизнеспособен.

OpenMP

Вот краткий пример, который иногда находит правильный ответ (42) случайным образом, используя 2 темы.

Обратите внимание, что если вы пропустите включение omp.h и вызов omp_set_num_threads(2);, вы получите собственное количество потоков (т.е. в зависимости от количества ядер, доступных во время выполнения).OpenMP также позволяет вам динамически настраивать этот номер, устанавливая переменную окружения, например, OMP_NUM_THREADS=16.В самом деле, вы можете динамически отключить параллелизм в целом:)

Я даже добавил пример параметра потока и накопление результата - это обычно, когда вещи становятся немного интереснее, чем просто начать работуи забыв об этом.Опять же, это может быть излишним для вашего вопроса:)

Скомпилировано с g++ -fopenmp test.cpp -o test

#include <iostream>
#include <cstdlib>
#include <omp.h>

int find_num(int w)
{
    return rand() % 100;
}

int main ()
{
    srand(time(0));

    omp_set_num_threads(2); // optional! leave it out to get native number of threads

    bool found = false;

#pragma omp parallel for reduction (|:found)
    for(int w=0; w<30; w++)
    {
        if (!found) 
        {
             found = (find_num(w) == 42);
        }
    }

    std::cout << "number was found: " << (found? "yes":"no") << std::endl;
}
1 голос
/ 01 ноября 2011

Простое решение состоит в том, чтобы иметь глобальную переменную, которая устанавливается, когда поток завершен, и основной цикл опрашивает эту переменную, чтобы проверить, когда шаг сделан, и только pthread_join затем.

1 голос
/ 01 ноября 2011

Если main () ожидает pthread_join, то (при условии, что на вашей платформе это не реализовано как просто спин-блокировка), это не вызовет нагрузки на процессор; Если pthread_join ожидает мьютекса, то планировщик не будет предоставлять этот поток в любое время, пока этот мьютекс не будет сигнализирован.

Если N действительно число ядер, возможно, вам следует забыть об управлении расписанием потоков самостоятельно; Планировщик ОС позаботится об этом. Если N меньше, чем число или число ядер, возможно, вы можете установить соответствие потоков для запуска вашего процесса только на N ядрах (или запустить процесс вычисления, который делает это, если вы не хотите устанавливать соответствие потоков для остальной части процесса ); Опять же, смысл этого заключается в том, чтобы позволить планировщику ОС заниматься планированием.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...