Я пытаюсь написать простую потокобезопасную очередь блокировки для своего приложения.По какой-то причине в моем демонстрационном приложении не все потоки просыпаются, когда выполняется вызов close()
.Мой потребительский поток просто закрывается, но два других потока производителя никогда не получают сигнал.Возможно, состояние гонки?
Это мое демонстрационное приложение:
#include <iostream>
#include <string>
#include <sstream>
#include <mutex>
#include "blocking_queue.h"
rl::blocking_queue<std::string> queue(3);
std::string random_string(size_t);
int main()
{
std::thread consumer([]() {
std::string message;
while (queue.pop(message)) {
std::cout << "--> async_write(...) called with:" << message << std::endl;
std::thread popper([]() {
std::this_thread::sleep_for(std::chrono::milliseconds(200));
std::cout << "--> async_write(...) completed, queue size is now " << queue.size() << std::endl;
queue.unblock();
});
popper.detach();
if (!queue.block()) {
break;
}
}
std::cout << "consumer thread closed" << std::endl;
});
std::thread producer([]() {
while (!queue.closed()) {
std::string id = random_string(6);
std::this_thread::sleep_for(std::chrono::milliseconds(50));
std::cout << id << ": Pushing JSON" << std::endl;
queue.push("a message");
std::cout << id << ": Pushed JSON" << std::endl;
}
std::cout << "producer closed" << std::endl;
});
std::thread producer2([]() {
while (!queue.closed()) {
std::string id = random_string(6);
std::this_thread::sleep_for(std::chrono::milliseconds(50));
std::cout << id << ": Pushing heartbeat" << std::endl;
queue.push("a heartbeat");
std::cout << id << ": Pushed heartbeat" << std::endl;
}
std::cout << "producer2 closed" << std::endl;
});
std::thread cancel([]() {
std::this_thread::sleep_for(std::chrono::milliseconds(10000));
std::cout << "%%%% CLOSING QUEUE %%%%" << std::endl;
queue.close();
std::cout << "%%%% QUEUE CLOSED %%%%" << std::endl;
});
cancel.join();
std::this_thread::sleep_for(std::chrono::milliseconds(10000));
std::cout << "Hello World!\n";
return EXIT_SUCCESS;
}
std::string random_string(size_t length)
{
auto randchar = []() -> char
{
const char charset[] =
"0123456789"
"ABCDEFGHIJKLMNOPQRSTUVWXYZ";
const size_t max_index = (sizeof(charset) - 1);
return charset[rand() % max_index];
};
std::string str(length, 0);
std::generate_n(str.begin(), length, randchar);
return str;
}
Это моя очередь блокировки:
#include <queue>
#include <mutex>
#include <condition_variable>
#include <assert.h>
// Based on https://www.justsoftwaresolutions.co.uk/threading/implementing-a-thread-safe-queue-using-condition-variables.html
namespace rl {
template<typename T>
class blocking_queue {
private:
std::queue<T> queue;
const size_t queue_limit;
bool is_closed = false;
mutable std::mutex queue_mutex;
std::condition_variable new_item_or_closed_event;
std::condition_variable item_removed_event;
std::condition_variable queue_blocked_event;
#ifndef NDEBUG
size_t pushes_in_progress = 0;
#endif
public:
blocking_queue(size_t size_limit = 0) : queue_limit(size_limit) {}
void push(const T& data) {
std::unique_lock<std::mutex> lock(queue_mutex);
#ifndef NDEBUG
++pushes_in_progress;
#endif
if (queue_limit > 0) {
while (queue.size() >= queue_limit) {
item_removed_event.wait(lock);
}
}
assert(!is_closed);
queue.push(data);
#ifndef NDEBUG
--pushes_in_progress;
#endif
lock.unlock();
new_item_or_closed_event.notify_one();
}
bool try_push(const T& data) {
std::unique_lock<std::mutex> lock(queue_mutex);
if (queue_limit > 0) {
if (queue.size() >= queue_limit) {
return false;
}
}
assert(!is_closed);
queue.push(data);
lock.unlock();
new_item_or_closed_event.notify_one();
return true;
}
void close() {
std::unique_lock<std::mutex> lock(queue_mutex);
assert(!is_closed);
#ifndef NDEBUG
assert(pushes_in_progress == 0);
#endif
is_closed = true;
lock.unlock();
//item_removed_event.notify_all();
//queue_blocked_event.notify_all();
new_item_or_closed_event.notify_all();
}
void open() {
is_closed = false;
}
bool block() {
std::unique_lock<std::mutex> lock(queue_mutex);
queue_blocked_event.wait(lock);
return !is_closed;
}
void unblock() {
queue_blocked_event.notify_one();
}
bool pop(T & popped_value) {
std::unique_lock<std::mutex> lock(queue_mutex);
while (queue.empty()) {
if (is_closed) {
return false;
}
new_item_or_closed_event.wait(lock);
}
popped_value = queue.front();
queue.pop();
item_removed_event.notify_one();
return true;
}
bool try_pop(T & popped_value) {
std::unique_lock<std::mutex> lock(queue_mutex);
if (queue.empty()) {
return false;
}
popped_value = queue.front();
queue.pop();
item_removed_event.notify_one();
return true;
}
bool empty() const {
std::unique_lock<std::mutex> lock(queue_mutex);
return queue.empty();
}
bool closed() const {
std::unique_lock<std::mutex> lock(queue_mutex);
return is_closed;
}
size_t limit() const {
return queue_limit;
}
size_t size() const {
std::unique_lock<std::mutex> lock(queue_mutex);
return queue.size();
}
};
}