Пара вопросов:
Вы заняты ожиданием.
научиться использовать условные переменные. Таким образом ожидающие потоки не используют ресурсы.
int temp = (long)arg;
не будет работать.
Нет гарантии, когда будет запланирован запуск потока.
Этот указатель arg указывает на переменную, которая могла измениться давным-давно.
Оба потока потребителя / производителя изменяют очередь q
без получения монопольного доступа.
Любой другой поток может изменить очередь между тестом по размеру и точкой, где вы добавляете вещи. Еще хуже то, что другой поток может одновременно попытаться изменить очередь (и я относительно уверен, что STL не безопасен для изменения потока).
Попробуйте что-то вроде этого:
#include <iostream>
#include <vector>
#include <queue>
#include <pthread.h>
#include <stdlib.h>
#include <unistd.h>
#include <time.h>
#define CONSUMER_COUNT 5
#define PRODUCER_COUNT 2
Часть первая: все данные, необходимые для потока
struct ThreadQueue
{
ThreadQueue()
: finished(false)
{
if (pthread_mutex_init(&mutex, NULL) != 0)
{ throw int(1);
}
if (pthread_cond_init(&cond, NULL) != 0)
{
// Technically we should wrap the mutext.
// So if the condition variable fails it is
// auto destroyed. This is left as an exercise.
throw int(1);
}
}
~ThreadQueue()
{
if (pthread_cond_destroy(&cond) != 0)
{ //throw int(1); // Do we really care?
}
if (pthread_mutex_destroy(&mutex) != 0)
{ //throw int(1);
}
}
std::queue<int> data;
pthread_mutex_t mutex;
pthread_cond_t cond;
bool finished;
};
Поток потребителей
extern "C" void* consumerThread(void* arg)
{
ThreadQueue& que = *static_cast<ThreadQueue*>(arg);
while(!que.finished)
{
// Get the lock before proceeding
pthread_mutex_lock(&que.mutex);
while(que.data.size() == 0)
{
// If there is no data in the que the sleep on the condition.
pthread_cond_wait(&que.cond, &que.mutex);
// We may have been released here because of a signal.
// That does not mean we got out before one of the other
// consumer threads already stoll the value from the queue.
// So we must be in a loop and re-check the size() of the
// que before we proceed. If the value was already stolen
// then we go back to sleep waiting on the condition variable.
if (que.finished)
break;
}
// We have a lock with data in the que
int value = que.data.front();
que.data.pop();
// Use the same lock to access std::cout
std::cout << "Consumer Got: " << value << "\n";
pthread_mutex_unlock(&que.mutex);
}
return NULL;
}
Нить производителя
extern "C" void* producerThread(void* arg)
{
ThreadQueue& que = *static_cast<ThreadQueue*>(arg);
while(!que.finished)
{
// Get the lock before proceeding
pthread_mutex_lock(&que.mutex);
// Add a new value to the queue
int value = rand();
que.data.push(value);
// Ise the same lock to access std::cout
std::cout << "Producer Push: " << value << "\n";
// Signal a consumer to be released.
pthread_cond_signal(&que.cond);
// rand maintains internal state.
// calls to rand() should therefore be protected by a mutex.
// Again in this simple example we re-use the same mutex for protection
int sleepTime = rand() % 5;
// Now release the lock
pthread_mutex_unlock(&que.mutex);
sleep(sleepTime);
}
return NULL;
}
Основной цикл
int main()
{
srand(time(NULL));
ThreadQueue queue;
pthread_t consumerThreads[CONSUMER_COUNT];
pthread_t producerThreads[PRODUCER_COUNT];
try
{
for(int loop=0 ;loop < CONSUMER_COUNT; ++loop)
{
if (pthread_create(&consumerThreads[loop], NULL, consumerThread, &queue) != 0)
{ throw int(2);
}
}
for(int loop=0 ;loop < PRODUCER_COUNT; ++loop)
{
if (pthread_create(&producerThreads[loop], NULL, producerThread, &queue) != 0)
{ throw int(3);
}
}
}
catch(...)
{
// Set the finished to true so all threads exit.
queue.finished = true;
// Some consumers may be waiting on the condition.
// So wake them up one signal per consumer should do it.
for(int loop = 0;loop < CONSUMER_COUNT; ++loop)
{ pthread_cond_signal(&queue.cond);
}
}
/* Wait for all threads to finish */
for(int loop=0; loop < CONSUMER_COUNT; ++loop)
{
pthread_join(consumerThreads[loop], NULL);
}
for(int loop=0; loop < PRODUCER_COUNT; ++loop)
{
pthread_join(producerThreads[loop], NULL);
}
};
Хоп, я правильно понял: -)