C ++ 11: Шаблон «Издатель / Потребитель» не завершается, если издатель не спит - PullRequest
0 голосов
/ 06 июня 2018

В C ++ я пытаюсь получить дескриптор для шаблона издатель / потребитель, используя condition_variable.Это в общих чертах шаблон, который я видел в Интернете:

#include <iostream>
#include <thread>
#include <string>
#include <mutex>
#include <queue>
#include <condition_variable>
#include <chrono>

using namespace std;

mutex m;
queue<string> que;
condition_variable cond;

void write(string &&msg) {
    unique_lock<mutex> locker(m);
    que.push(msg);
    locker.unlock();
    cond.notify_one();
    this_thread::sleep_for(chrono::milliseconds(1));
}

void read() {
    while (true) {
        unique_lock<mutex> locker(m);
        cond.wait(locker);
        if (!que.empty()) {
            cout << que.front() << endl;
            que.pop();
        }
        locker.unlock();
    }
}

void publisher(string &msg) {
    for (int i = 0; i < 100; ++i)
        write("Publisher: " + msg + ", " + to_string(i));
}

int main() {

    string msg = "Hello";
    thread pub_thread(publisher, std::ref(msg));

    /* The main thread will print the publisher's messages to cout. */
    read();

    /* Make the main thread wait for the publisher to finish. */
    pub_thread.join(); //
    return 0;
}

Я не понимаю, что такое вызов sleep_for в ветке издателя.Я понимаю, что это просто для имитации сценария "реальной жизни", при котором издатель не будет так быстро выплевывать сообщения.Однако, как ни странно, если я закомментирую эту строку, код не будет завершен.Это почему?

Кроме того, я попытался установить время сна для 0 равным 0 с тем же эффектом.Кажется, что опубликованным принципиально нужно спать, но я не понимаю почему.Чтобы получить более конкретный код должен распечатать 100 сообщений.Если я оставлю код в спящем режиме на 1 мс, то будут напечатаны все 100 сообщений.Если нет, то я вижу только около 10 сообщений, прежде чем код зависнет.Кажется, что возникла тупиковая ситуация.

Бонусные баллы, если существует лучший шаблон, позволяющий избежать необходимости усыплять издателя ...

Я знаю, что на практике вам нужна стратегия остановки основного потока,как таблетка с ядом.Я намеренно пропустил это, чтобы сосредоточиться на настоящем обсуждении.

РЕДАКТИРОВАТЬ

Хммм.Если я вставлю блок для обработки ложного пробуждения, то это решит проблему.Но это все еще не объясняет, почему оригинальный код не удался.

Вот улучшенная функция чтения:

void read() {
    while (true) {
        unique_lock<mutex> locker(m);
        cond.wait(locker, [&](){ return !que.empty(); });
        cout << que.front() << endl;
        que.pop();
        locker.unlock();
    }
}

Ответы [ 2 ]

0 голосов
/ 06 июня 2018

У вас есть две проблемы: во-первых, издатель не обязан уступать читателю - или делать паузу достаточно долго, чтобы читатель мог успешно заблокировать мьютекс - если вы не сделаете это.

во-вторых, ваш читатель в любом случае неверен:

void read() {
    while (true) {
        unique_lock<mutex> locker(m);
        cond.wait(locker);
        if (!que.empty()) {
            cout << que.front() << endl;
            que.pop();
        }
    }
}

Это предполагает, что вы получаете одно «событие пробуждения» на элемент, помещенный в очередь, поскольку оно потребляет ровно один элемент за пробуждение.Но это не , как работают условные переменные.

Вполне возможно, что эта последовательность произойдет:

  1. издатель добавляет элемент 1
    • сигналы издателяcondvar
  2. издатель добавляет элемент 2
    • сигналы издателя condvar
  3. читатель просыпается от condvar wait
    • считывательпотребляет предмет 1

В этом случае издатель добавит 100 предметов и подаст сигнал 100 раз, но читатель проснется только 99 раз и, следовательно, потребит не более 99 предметов.

Правильный код должен выглядеть примерно так:

void read() {
    unique_lock<mutex> locker(m);
    while (true) {
        // don't wait if we don't have to
        while (que.empty()) {
            cond.wait(locker);
        }
        // consume everything we can
        while (!que.empty()) {
            cout << que.front() << endl;
            que.pop();
        }
    }
}

Использование предиката дает примерно то же самое (я просто для ясности выписал всю логику) - второй while isnВ вашем редактировании нет, но циклический обход и пропуск первого пока немного более дорогой способ получить то же поведение.

Кроме того, нет необходимости перебивать мьютекс на каждой итерации - переменная условия already (un) блокирует его по мере необходимости.

0 голосов
/ 06 июня 2018

Вам необходимо справиться с ситуацией, когда издатель публикует быстрее, чем потребитель потребляет.

Когда это произойдет, потребитель пропустит триггеры condition_variable.Помните, что notify вызовы не накапливаются.

Измените потребителя, чтобы он использовал все доступные сообщения после его пробуждения:

if (!que.empty())while (!que.empty())

Вот так:

void read() {
    while (true) {
        unique_lock<mutex> locker(m);
        cond.wait(locker);
        while (!que.empty()) {
            cout << que.front() << endl;
            que.pop();
        }
        locker.unlock();
    }
}
...