Я использую следующую реализацию очереди блокировки, но я захожу в тупик при попытке доступа к push()
из двух разных потоков.Я имею опыт работы с Java, поэтому мой опыт работы с потоками C ++ ограничен.
У меня нет буста, и я бы хотел использовать STL, поэтому я изменил ссылки на буст с помощью их аналога STL.
template<typename D>
class blocking_queue {
private:
std::queue<D> queue;
mutable std::mutex queue_mutex;
const size_t queue_limit;
bool is_closed = false;
std::condition_variable new_item_or_closed_event;
std::condition_variable item_removed_event;
#ifndef NDEBUG
size_t pushes_in_progress = 0;
#endif
public:
blocking_queue(size_t size_limit = 0) : queue_limit(size_limit) {}
void push(const D& data) {
std::lock_guard lock(queue_mutex);
#ifndef NDEBUG
++pushes_in_progress;
#endif
if (queue_limit > 0) {
while (queue.size() >= queue_limit) {
item_removed_event.wait(lock);
}
}
assert(!is_closed);
queue.push(data);
#ifndef NDEBUG
--pushes_in_progress;
#endif
lock.unlock();
new_item_or_closed_event.notify_one();
}
bool try_push(const D& data) {
std::unique_lock lock(queue_mutex);
if (queue_limit > 0) {
if (queue.size() >= queue_limit) {
return false;
}
}
assert(!is_closed);
queue.push(data);
lock.unlock();
new_item_or_closed_event.notify_one();
return true;
}
void close() {
std::unique_lock lock(queue_mutex);
assert(!is_closed);
#ifndef NDEBUG
assert(pushes_in_progress == 0);
#endif
is_closed = true;
lock.unlock();
new_item_or_closed_event.notify_all();
}
D front() {
if (queue.empty()) {
return {};
}
return queue.front();
}
bool pop(D & popped_value) {
std::unique_lock lock(queue_mutex);
while (queue.empty()) {
if (is_closed) {
return false;
}
new_item_or_closed_event.wait(lock);
}
popped_value = queue.front();
queue.pop();
item_removed_event.notify_one();
return true;
}
bool try_pop(D & popped_value) {
std::unique_lock lock(queue_mutex);
if (queue.empty()) {
return false;
}
popped_value = queue.front();
queue.pop();
item_removed_event.notify_one();
return true;
}
bool empty() const {
std::unique_lock lock(queue_mutex);
return queue.empty();
}
bool closed() const {
std::unique_lock lock(queue_mutex);
return is_closed;
}
size_t limit() const {
return queue_limit;
}
size_t size() const
{
std::unique_lock lock(queue_mutex);
return queue.size();
}
};
Повторное использование одного и того же мьютекса для двух разных потоков - моя проблема?Я немного запутался в том, как процессор знает, какой из потоков должен быть сигнализирован первым.