C ++ UNIX потоки - PullRequest
       26

C ++ UNIX потоки

1 голос
/ 02 декабря 2010

Я делаю проект с многопоточностью в UNIX и C ++. В основном есть ветка производителя и 5 веток потребителя. Поток производителя добавляет увеличивающиеся числа в очередь в произвольные моменты времени, а потоки потребителя опрашивают q, пытаясь удалить его. По какой-то причине мой q.size () продолжает падать, и я не могу понять, почему.

 #include <queue>
 #include <list>

 #include <stdio.h>
 #include <unistd.h>
 #include <stdlib.h>
 #include <string.h>
 #include <pthread.h>

 using namespace std;

 #define NUM_CONSUMER_THREADS 5
 #define NUM_PRODUCER_THREADS 1
 #define BUFFER_SIZE 20

 void *c_thread_function(void *arg);
 void *p_thread_function(void *arg);

 queue<int> q;

 int produce(int cur)
 {
  int temp = cur + 1;
  return temp;
 }

 void append(int num)
 {
  if ( q.size() < BUFFER_SIZE )
  {
   q.push(num);
  }
 }

 int take()
 {
  int removed = q.front();
  q.pop();
  sleep(1);
  return removed;
 }

 void consume(int num, int thread)
 {
  printf("%d consumed %d \n", thread, num);
 }


 int main() 
 {
  int result;

  pthread_t cthreads[NUM_CONSUMER_THREADS];
  pthread_t pthreads[NUM_PRODUCER_THREADS];

  void *thread_result;

  // build an array of consumer threads
  for(int num_of_cthreads = 0; num_of_cthreads < NUM_CONSUMER_THREADS; num_of_cthreads++) 
  {
   result = pthread_create(&(cthreads[num_of_cthreads]), NULL, c_thread_function, (void *)num_of_cthreads);
   if ( result != 0 )
   {
    perror( "Thread Creation Failed");
    exit(EXIT_FAILURE);
   }
   //sleep(1);  
  } 

  // build an array of producer threads
  for(int num_of_pthreads = 0; num_of_pthreads < NUM_PRODUCER_THREADS; num_of_pthreads++) 
  {
   result = pthread_create(&(pthreads[num_of_pthreads]), NULL, p_thread_function, NULL);
   if ( result != 0 )
   {
    perror( "Thread Creation Failed");
    exit(EXIT_FAILURE);
   }
   //sleep(1);  
  }

  printf("All threads created\n");
  while ( true )
  {
   // do nothing
  }
 }

 void *c_thread_function(void *arg)
 {
  int temp = (long)arg;
  printf("Consumer thread %d created \n", temp);

  while ( true )
  {
   while (  q.size() > 0 )
   {
    int w = take();
    consume(w, temp);
    printf(" q size is now %d \n", q.size());
   }
  }
 }

 void *p_thread_function(void *arg) 
 {
  printf("Producer thread created \n");

  int itemsAdded = 0;
  int temp;
  int sleepTime;

  while ( true ) 
  {
   while ( q.size() < BUFFER_SIZE )
   {
    temp = produce(itemsAdded);

    sleepTime = 1+(int)(9.0*rand()/(RAND_MAX+1.0));
    sleep(sleepTime);

    append(temp);

    printf("Producer adds: %d \n", temp);
    printf(" q size is now %d \n", q.size());

    itemsAdded++;
   }
  }
 }

Выход:

Производитель добавляет: 1

размер q теперь -1

0 потреблено 1

q размер теперь -2

1 потребляется 1

q размер теперь -3

3 потреблено 1

q размер теперь -4

4 потреблено 0

q размер теперь -5

0 потреблено 0

Ответы [ 5 ]

5 голосов
/ 02 декабря 2010

Вам необходимо узнать о понятии условия гонки и взаимное исключение .Ваш std::queue объект является общим ресурсом , что означает, что над ним работает более одного потока - потенциально одновременно .Это означает, что вы должны защитить его с помощью блокировок (известных как мьютексы), чтобы каждый доступ был синхронизирован.В противном случае вы получите так называемое условие гонки , где один поток изменяет данные, в то время как другой поток также осуществляет доступ к данным / их изменение, что приводит к несогласованному или поврежденному состоянию программы.Чтобы предотвратить состояние гонки, вам нужно заблокировать объект pthread_mutex перед каждым доступом к очереди.

Сначала вам нужно создать объект мьютекса и инициализировать его.

pthread_mutex mymutex;
pthread_mutex_init(&mymutex, 0);

Код вашего приложения должен выглядеть примерно так:

pthread_mutex_lock(&mymutex);

// Do something with queue

pthread_mutex_unlock(&mymutex);

Когда один потокполучает блокировку, никакой другой поток не может получить блокировку.Поток, который пытается получить блокировку, которая уже была получена другим потоком, просто будет ждать, пока блокировка не будет снята.Этот синхронизирует доступ к очереди, гарантируя, что только один поток изменяет его одновременно.

4 голосов
/ 02 декабря 2010
Контейнеры

STL, такие как queue, не являются поточно-ориентированными. Вам необходимо синхронизировать доступ к объекту очереди, например, с помощью мьютекса.

1 голос
/ 02 декабря 2010

Пара вопросов:

  • Вы заняты ожиданием.
    научиться использовать условные переменные. Таким образом ожидающие потоки не используют ресурсы.

  • int temp = (long)arg; не будет работать.
    Нет гарантии, когда будет запланирован запуск потока.
    Этот указатель arg указывает на переменную, которая могла измениться давным-давно.

  • Оба потока потребителя / производителя изменяют очередь q без получения монопольного доступа.
    Любой другой поток может изменить очередь между тестом по размеру и точкой, где вы добавляете вещи. Еще хуже то, что другой поток может одновременно попытаться изменить очередь (и я относительно уверен, что STL не безопасен для изменения потока).

Попробуйте что-то вроде этого:

#include <iostream>
#include <vector>
#include <queue>
#include <pthread.h>
#include <stdlib.h>
#include <unistd.h>
#include <time.h>

#define CONSUMER_COUNT  5
#define PRODUCER_COUNT  2

Часть первая: все данные, необходимые для потока

struct ThreadQueue
{
    ThreadQueue()
        : finished(false)
    {
         if (pthread_mutex_init(&mutex, NULL) != 0)
         {  throw int(1);
         }
         if (pthread_cond_init(&cond, NULL) != 0)
         {
             // Technically we should wrap the mutext.
             // So if the condition variable fails it is
             // auto destroyed. This is left as an exercise.
             throw int(1);
         }
    }
    ~ThreadQueue()
    {
        if (pthread_cond_destroy(&cond) != 0)
        {   //throw int(1); // Do we really care?
        }
        if (pthread_mutex_destroy(&mutex) != 0)
        {   //throw int(1);
        }
    }
    std::queue<int>     data;
    pthread_mutex_t     mutex;
    pthread_cond_t      cond;
    bool                finished;
};

Поток потребителей

extern "C" void* consumerThread(void* arg)
{
    ThreadQueue&     que = *static_cast<ThreadQueue*>(arg);

    while(!que.finished)
    {
        // Get the lock before proceeding
        pthread_mutex_lock(&que.mutex);
        while(que.data.size() == 0)
        {
            // If there is no data in the que the sleep on the condition.
            pthread_cond_wait(&que.cond, &que.mutex);

            // We may have been released here because of a signal.
            // That does not mean we got out before one of the other
            // consumer threads already stoll the value from the queue.
            // So we must be in a loop and re-check the size() of the
            // que before we proceed. If the value was already stolen
            // then we go back to sleep waiting on the condition variable.

            if (que.finished)
                break;
        }

        // We have a lock with data in the que
        int value   = que.data.front();
        que.data.pop();

        // Use the same lock to access std::cout
        std::cout << "Consumer Got: " << value << "\n";
        pthread_mutex_unlock(&que.mutex);
    }
    return NULL;
}

Нить производителя

extern "C" void* producerThread(void* arg)
{
    ThreadQueue&     que = *static_cast<ThreadQueue*>(arg);

    while(!que.finished)
    {
        // Get the lock before proceeding
        pthread_mutex_lock(&que.mutex);

        // Add a new value to the queue
        int value = rand();
        que.data.push(value);

        // Ise the same lock to access std::cout
        std::cout << "Producer Push: " << value << "\n";

        // Signal a consumer to be released.
        pthread_cond_signal(&que.cond);

        // rand maintains internal state.
        // calls to rand() should therefore be protected by a mutex.
        // Again in this simple example we re-use the same mutex for protection
        int sleepTime = rand() % 5;

        // Now release the lock
        pthread_mutex_unlock(&que.mutex);
        sleep(sleepTime);
    }
    return NULL;
}

Основной цикл

int main()
{
    srand(time(NULL));

    ThreadQueue     queue;
    pthread_t       consumerThreads[CONSUMER_COUNT];
    pthread_t       producerThreads[PRODUCER_COUNT];

    try
    {
      for(int loop=0 ;loop < CONSUMER_COUNT; ++loop)
      {
          if (pthread_create(&consumerThreads[loop], NULL, consumerThread, &queue) != 0)
          {   throw int(2);
          }
      }
      for(int loop=0 ;loop < PRODUCER_COUNT; ++loop)
      {
          if (pthread_create(&producerThreads[loop], NULL, producerThread, &queue) != 0)
          {   throw int(3);
          }
      }
   }
   catch(...)
   {
       // Set the finished to true so all threads exit.
       queue.finished = true;
       // Some consumers may be waiting on the condition.
       // So wake them up one signal per consumer should do it.
       for(int loop = 0;loop < CONSUMER_COUNT; ++loop)
       {    pthread_cond_signal(&queue.cond);
       }
    }

    /* Wait for all threads to finish */
    for(int loop=0; loop < CONSUMER_COUNT; ++loop)
    {
        pthread_join(consumerThreads[loop], NULL);
    }
    for(int loop=0; loop < PRODUCER_COUNT; ++loop)
    {
        pthread_join(producerThreads[loop], NULL);
    }
};

Хоп, я правильно понял: -)

0 голосов
/ 02 декабря 2010

Я нашел проблему ...

цель задания состояла в том, чтобы показать, насколько ненадежной была многопоточность без семафоров. вот код, который мне нужно было исправить ...

int take()
{
 int removed = q.front();
 sleep(1); // switched
 q.pop();  // these two...
 return removed;
}

Я также удалил таймер сна из потока производителя. теперь все работает ...

вывод теперь делает это:

---Producer adds: 1 ---
---Producer adds: 2 ---
---Producer adds: 3 ---
---Producer adds: 4 ---
---Producer adds: 5 ---
---Producer adds: 6 ---
---Producer adds: 7 ---
---Producer adds: 8 ---
---Producer adds: 9 ---
---Producer adds: 10 ---
---Producer adds: 11 ---
---Producer adds: 12 ---
---Producer adds: 13 ---
---Producer adds: 14 ---
---Producer adds: 15 ---
---Producer adds: 16 ---
---Producer adds: 17 ---
---Producer adds: 18 ---
---Producer adds: 19 ---
---Producer adds: 20 ---
Thread 3 consumed 1 
Thread 1 consumed 1 
Thread 2 consumed 1 
Thread 4 consumed 1 
Thread 0 consumed 1 
---Producer adds: 21 ---
---Producer adds: 22 ---
---Producer adds: 23 ---
---Producer adds: 24 ---
---Producer adds: 25 ---
Thread 3 consumed 6 
Thread 4 consumed 6 
Thread 1 consumed 6 
---Producer adds: 26 ---
---Producer adds: 27 ---
---Producer adds: 28 ---
---Producer adds: 29 ---
---Producer adds: 30 ---
Thread 0 consumed 6 
Thread 2 consumed 6 
Thread 3 consumed 11 
Thread 4 consumed 11    
0 голосов
/ 02 декабря 2010
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...