- Доступ к общим переменным должен выполняться только при удерживании
мьютекс, который защищает его
condition_variable::wait
должен проверить состояние.
- Условие должно быть общей переменной, защищенной мьютексом, который вы передаете
condition_variable::wait
.
- Чтобы проверить условие, нужно обернуть вызов к
wait
в цикле while или использовать перегрузку с двумя аргументами wait
(что
эквивалентно версии цикла while)
Примечание. Эти правила не являются строго необходимыми, если вы действительно понимаете, что делает оборудование. Однако эти проблемы быстро усложняются при использовании простых структур данных, и будет легче доказать, что ваш алгоритм работает правильно, если вы будете следовать им.
Ваши Q
и Q_buf
являются общими переменными. Из-за правила 1 я бы предпочел, чтобы они были объявлены в качестве локальных переменных в функции, которая их использует (consume()
и produce()
соответственно). Будет 1 общий буфер, который будет защищен мьютексом. Производитель добавит в свой локальный буфер. Когда этот буфер заполнен, он получает мьютекс и помещает локальный буфер в общий буфер. Затем он ждет, пока потребитель не примет этот буфер, прежде чем выдать больше данных.
Потребитель ожидает, пока этот общий буфер «прибудет», затем он получает мьютекс и заменяет свой пустой локальный буфер общим буфером. Затем он сообщает производителю, что буфер был принят, поэтому он знает, что начать производство снова.
Семантически, я не вижу смысла использовать swap
вместо move
, так как в любом случае один из контейнеров в любом случае пуст. Может быть, вы хотите использовать swap
, потому что вы знаете что-то о базовой памяти. Вы можете использовать все, что захотите, и это будет быстро и будет работать одинаково (по крайней мере, алгоритмически).
Эту проблему можно решить с помощью 1 условной переменной, но вам будет немного легче подумать, если вы используете 2.
Вот что я придумала. Протестировано на Visual Studio 2017 (15.6.7) и GCC 5.4.0. Мне не нужно зачислять или что-то еще (это такая простая штука), но по закону я должен сказать, что я не даю никаких гарантий.
#include <thread>
#include <vector>
#include <mutex>
#include <condition_variable>
#include <chrono>
std::vector<int> g_deliveryBuffer;
bool g_quit = false;
std::mutex g_mutex; // protects g_deliveryBuffer and g_quit
std::condition_variable g_producerDeliver;
std::condition_variable g_consumerAccepted;
// consumer
void consume()
{
// local buffer
std::vector<int> consumerBuffer;
while (true)
{
if (consumerBuffer.empty())
{
std::unique_lock<std::mutex> lock(g_mutex);
while (g_deliveryBuffer.empty() && !g_quit) // if we beat the producer, wait for them to push to the deliverybuffer
g_producerDeliver.wait(lock);
if (g_quit)
break;
consumerBuffer = std::move(g_deliveryBuffer); // get the buffer
}
g_consumerAccepted.notify_one(); // notify the producer that the buffer has been accepted
// for-loop to process the elems in Q
// ...
consumerBuffer.clear();
// ...
}
}
// producer
void produce()
{
std::vector<int> producerBuffer;
while (true)
{
// for-loop to fill up Q_buf
// ...
producerBuffer = { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 };
// ...
// once Q_buf is fully filled, wait until consumer asks to give it a full Q
{ // scope is for lock
std::unique_lock<std::mutex> lock(g_mutex);
g_deliveryBuffer = std::move(producerBuffer); // ok to push to deliverybuffer. it is guaranteed to be empty
g_producerDeliver.notify_one();
while (!g_deliveryBuffer.empty() && !g_quit)
g_consumerAccepted.wait(lock); // wait for consumer to signal for more data
if (g_quit)
break;
// We will never reach this point if the buffer is not empty.
}
}
}
int main()
{
// spawn threads
std::thread consumerThread(consume);
std::thread producerThread(produce);
// for for 5 seconds
std::this_thread::sleep_for(std::chrono::seconds(5));
// signal that it's time to quit
{
std::lock_guard<std::mutex> lock(g_mutex);
g_quit = true;
}
// one of the threads may be sleeping
g_consumerAccepted.notify_one();
g_producerDeliver.notify_one();
consumerThread.join();
producerThread.join();
return 0;
}