Пробуждение отдельных потоков вместо занятого ожидания в pthreads - PullRequest
4 голосов
/ 28 февраля 2012

Я не уверен, отражает ли заголовок то, что я здесь спрашиваю, но это лучшее, что я могу сделать без очень длинного заголовка.Я пытаюсь реализовать модель worker thread в pthreads.Я хочу порождать набор потоков из функции main, и после этого поток main делегирует задание работнику и ожидает завершения всех потоков, прежде чем назначить им следующее задание (На самом деле, требуется организовать потокв блоке, очень похожем на модель программирования CUDA, но на CPU. Хотя это не относится к текущему вопросу).Массив job используется для указания типа задания для каждого потока.В настоящее время я реализовал это с помощью семафоров, что требует напряженного ожидания.Я ищу способы сделать так, чтобы потоки переходили в спящий режим и просыпались только тогда, когда они необходимы, а не опрашивали непрерывно.

Функция, выполняемая каждым потоком

volatile int jobs[MAX_THREADS]; // global job indicator array
sem_t semaphore;                // semaphore to indicate completion
thread_execute(void *args)
{
  tid = get_id(args);
  while(jobs[tid] != -1)
  {
    if(jobs[tid] == 0) continue; // no job
    if(jobs[tid] == JOBS_1)
    {
      jobs1();
      jobs[tid] = 0; // go back to idle state
      sem_post(&semapahore);
    }
    if(jobs[tid] == JOBS_2)
    {
      jobs2();
      jobs[tid] = 0; // go back to idle state
      sem_post(&semapahore);
    }
  }

  pthread_exit(NULL);
}

Основная функция выглядит следующим образом:

int main()
{
  sem_init(&semaphore, 0, 0);
  jobs[0...MAX_THREADS] = 0;
  spawn_threads();

  // Dispatch first job
  jobs[0...MAX_THREADS] = JOBS_1;
  int semvalue = 0;
  while (semvalue < MAX_THREADS) // Wait till all threads increment the semaphore
    sem_getvalue(&sempaphore, &semvalue);

  sem_init(&semaphore, 0, 0); // Init semaphore back to 0 for the next job
                              // I'm actually using diff. semaphores for diff. jobs
  jobs[0...MAX_THREADS] = JOBS_2;
  while (semvalue < MAX_THREADS)
    sem_getvalue(&sempaphore, &semvalue);

  jobs[0...MAX_THREADS] = -1; // No more jobs
  pthread_join();
}

Проблема этой реализации состоит в том, что поток main занят, ожидая завершения всех рабочих потоков, и рабочие потоки также постоянно опрашивают массив заданий, чтобы проверить наличиеновая работа.Есть ли лучший способ сделать это, когда потоки засыпают и просыпаются, когда это необходимо, по аналогии с обработчиком сингалов и использованием pthread_kill(), но это немного запутанно с отдельным обработчиком сигналов.

1 Ответ

3 голосов
/ 28 февраля 2012

Вы можете использовать условную переменную , чтобы заставить потоки переходить в спящий режим до тех пор, пока они не получат сигнал.

volatile int jobs[MAX_THREADS]; // global job indicator array
pthread_cond_t th_cond;     // threads wait on this
pthread_mutex_t th_mutex;   // mutex to protect the signal
int busyThreads = MAX_THREADS;

pthread_cond_t m_cond;      // main thread waits on this
pthread_mutex_t m_mutex;    // mutex to protect main signal

thread_execute(void *args)
{
  tid = get_id(args);
  while(jobs[tid] != -1)
  {
    if(jobs[tid] == 0) continue; // no job
    if(jobs[tid] == JOBS_1)
    {
      jobs1();
      jobs[tid] = 0; // go back to idle state
      pthread_mutex_lock(&th_mutex);      
          pthread_mutex_lock(&m_mutex);   
          --busyThreads;                       // one less worker
          pthread_cond_signal(&m_cond);        // signal main to check progress
          pthread_mutex_unlock(&m_mutex);
      pthread_cond_wait(&th_cond, &th_mutex);   // wait for next job
      pthread_mutex_unlock(&th_mutex);      
    }
    if(jobs[tid] == JOBS_2)
    {
      jobs2();
      jobs[tid] = 0; // go back to idle state
      pthread_mutex_lock(&th_mutex);
      --busyThreads;
      pthread_cond_wait(&th_cond, &th_mutex);
      pthread_mutex_unlock(&th_mutex);
    }
  }

  pthread_exit(NULL);
}

тогда в основном:

int main()
{
  sem_init(&semaphore, 0, 0);
  jobs[0...MAX_THREADS] = 0;
  spawn_threads();

  // Dispatch first job
  jobs[0...MAX_THREADS] = JOBS_1;
  int semvalue = 0;

  pthread_mutex_lock(&m_mutex);
  while(busyThreads > 0)        // check number of active workers
      pthread_cond_wait(&m_cond, &m_mutex);   
  pthread_mutex_unlock(&m_mutex);

  busyThreads = MAX_THREADS;
  pthread_mutex_lock(&th_mutex);
  pthread_cond_broadcast(&th_cond);   // signal all workers to resume
  pthread_mutex_unlock(&th_mutex);

  // same for JOBS_2;

  jobs[0...MAX_THREADS] = -1; // No more jobs
  pthread_join();
}
...