Сохранение количества потоков постоянным с помощью pthread в C - PullRequest
2 голосов
/ 01 марта 2011

Я пытался найти решение, чтобы поддерживать постоянное число рабочих потоков под linux в C с помощью pthreads, но мне кажется, что я не могу полностью понять, что не так со следующим кодом:

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>

#define MAX_JOBS 50
#define MAX_THREADS 5

pthread_mutex_t mutex1 = PTHREAD_MUTEX_INITIALIZER;
int jobs = MAX_JOBS;
int worker = 0;
int counter = 0;

void *functionC() {
  pthread_mutex_lock(&mutex1);
  worker++;
  counter++;
  printf("Counter value: %d\n",counter);
  pthread_mutex_unlock(&mutex1);

  // Do something...
  sleep(4);

  pthread_mutex_lock(&mutex1);
  jobs--;
  worker--;
  printf(" >>> Job done: %d\n",jobs);
  pthread_mutex_unlock(&mutex1);
}

int main(int argc, char *argv[]) {
  int i=0, j=0;
  pthread_t thread[MAX_JOBS];

  // Create threads if the number of working threads doesn't exceed MAX_THREADS
  while (1) {
    if (worker > MAX_THREADS) {
      printf(" +++ In queue: %d\n", worker);
      sleep(1);
    } else {
      //printf(" +++ Creating new thread: %d\n", worker);
      pthread_create(&thread[i], NULL, &functionC, NULL);
      //printf("%d",worker);
      i++;
    }
    if (i == MAX_JOBS) break;
  }

  // Wait all threads to finish
  for (j=0;j<MAX_JOBS;j++) {
    pthread_join(thread[j], NULL);
  }

  return(0);
}

Цикл while (1) продолжает создавать потоки, если количество рабочих потоков находится ниже определенного порога. Предполагается, что мьютекс блокирует критические секции каждый раз, когда глобальный счетчик рабочих потоков увеличивается (создание потоков) и уменьшается (работа выполнена). Я думал, что это может работать нормально и по большей части, но странные вещи случаются ...

Например, если я прокомментирую (как в этом фрагменте) printf // printf ("+++ Создание нового потока:% d \ n", рабочий); в то время как while (1), кажется, генерирует случайное число (18-25 в моем опыте) потоков (функция C выводит "Значение счетчика: от 1 до 18-25" ...) за раз вместо соблюдения условия IF внутри петля. Если я включаю printf, то кажется, что цикл ведет себя «почти» правильным образом ... Это, похоже, намекает на то, что отсутствует условие «мьютекса», которое я должен добавить в цикл в main () для эффективной блокировки потока, когда MAX_THREADS достигнут, но после изменения LOT раз этот код за последние несколько дней, я немного потерян, сейчас. Чего мне не хватает?

Пожалуйста, дайте мне знать, что я должен изменить, чтобы сохранить число потоков постоянным. Не похоже, что я слишком далек от решения ... Надеюсь ...: -)

Заранее спасибо!

1 Ответ

4 голосов
/ 01 марта 2011

Ваша проблема в том, что worker не увеличивается до тех пор, пока новый поток действительно не запустится и не начнет работать - тем временем основной поток зацикливается, проверяет workers, находит, что он не изменился, и запускает другой нить. Это может повторяться много раз, создавая слишком много потоков.

Итак, вам нужно увеличить worker в главном потоке, когда вы решили создать новый поток.

У вас есть другая проблема - вы должны использовать условные переменные, чтобы дать основному потоку спать, пока он не запустит другой поток, не используя цикл ожидания ожидания с sleep(1); в нем. Полный фиксированный код будет выглядеть так:

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>

#define MAX_JOBS 50
#define MAX_THREADS 5

pthread_mutex_t mutex1 = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond1 = PTHREAD_COND_INITIALIZER;
int jobs = MAX_JOBS;
int workers = 0;
int counter = 0;

void *functionC() {
  pthread_mutex_lock(&mutex1);
  counter++;
  printf("Counter value: %d\n",counter);
  pthread_mutex_unlock(&mutex1);

  // Do something...
  sleep(4);

  pthread_mutex_lock(&mutex1);
  jobs--;
  printf(" >>> Job done: %d\n",jobs);

  /* Worker is about to exit, so decrement count and wakeup main thread */
  workers--;
  pthread_cond_signal(&cond1);

  pthread_mutex_unlock(&mutex1);
  return NULL;
}

int main(int argc, char *argv[]) {
  int i=0, j=0;
  pthread_t thread[MAX_JOBS];

  // Create threads if the number of working threads doesn't exceed MAX_THREADS
  while (i < MAX_JOBS) {
    /* Block on condition variable until there are insufficient workers running */
    pthread_mutex_lock(&mutex1);
    while (workers >= MAX_THREADS)
        pthread_cond_wait(&cond1, &mutex1);

    /* Another worker will be running shortly */
    workers++;
    pthread_mutex_unlock(&mutex1);

    pthread_create(&thread[i], NULL, &functionC, NULL);
    i++;
  }

  // Wait all threads to finish
  for (j=0;j<MAX_JOBS;j++) {
    pthread_join(thread[j], NULL);
  }

  return(0);
}

Обратите внимание, что, хотя это работает, это не идеально - лучше всего создать количество потоков, которое вы хотите, заранее, и сделать так, чтобы они зацикливались, ожидая работы. Это связано с тем, что создание и удаление потоков сопряжено со значительными накладными расходами и часто упрощает управление ресурсами. Версия вашего кода, переписанная для такой работы, будет выглядеть так:

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>

#define MAX_JOBS 50
#define MAX_THREADS 5

pthread_mutex_t mutex1 = PTHREAD_MUTEX_INITIALIZER;
int jobs = MAX_JOBS;
int counter = 0;

void *functionC()
{
  int running_job;

  pthread_mutex_lock(&mutex1);
  counter++;
  printf("Counter value: %d\n",counter);

  while (jobs > 0) {
    running_job = jobs--;
    pthread_mutex_unlock(&mutex1);

    printf(" >>> Job starting: %d\n", running_job);

    // Do something...
    sleep(4);

    printf(" >>> Job done: %d\n", running_job);

    pthread_mutex_lock(&mutex1);
  }
  pthread_mutex_unlock(&mutex1);

  return NULL;
}

int main(int argc, char *argv[]) {
  int i;
  pthread_t thread[MAX_THREADS];

  for (i = 0; i < MAX_THREADS; i++)
    pthread_create(&thread[i], NULL, &functionC, NULL);

  // Wait all threads to finish
  for (i = 0; i < MAX_THREADS; i++)
    pthread_join(thread[i], NULL);

  return 0;
}
...