синхронизация потока с использованием мьютекса и условной переменной - PullRequest
0 голосов
/ 07 мая 2018

Я пытаюсь реализовать многопоточное задание, производителя и потребителя, и в основном я хочу, чтобы, когда потребитель заканчивает данные, он уведомляет производителя, чтобы производитель предоставил новые данные.

Сложная часть, в моем текущем значении, производитель и потребитель уведомляют друг друга и ждут друг друга, я не знаю, как правильно реализовать эту часть.

Например, см. Код ниже,

mutex m;
condition_variable cv;

vector<int> Q;  // this is the queue the consumer will consume
vector<int> Q_buf;  // this is a buffer Q into which producer will fill new data directly

// consumer
void consume() {
  while (1) {
    if (Q.size() == 0) {  // when consumer finishes data
      unique_lock<mutex> lk(m);
      // how to notify producer to fill up the Q?
      ...
      cv.wait(lk);
    }

    // for-loop to process the elems in Q
    ...
  }
}

// producer
void produce() {
  while (1) {
    // for-loop to fill up Q_buf
    ...

    // once Q_buf is fully filled, wait until consumer asks to give it a full Q
    unique_lock<mutex> lk(m);
    cv.wait(lk);
    Q.swap(Q_buf);  // replace the empty Q with the full Q_buf
    cv.notify_one();
  }
}

Я не уверен, что приведенный выше код с использованием mutex и condition_variable является правильным способом реализации моей идеи, пожалуйста, дайте мне совет!

Ответы [ 5 ]

0 голосов
/ 19 мая 2018
  1. Доступ к общим переменным должен выполняться только при удерживании мьютекс, который защищает его
  2. condition_variable::wait должен проверить состояние.
    1. Условие должно быть общей переменной, защищенной мьютексом, который вы передаете condition_variable::wait.
    2. Чтобы проверить условие, нужно обернуть вызов к wait в цикле while или использовать перегрузку с двумя аргументами wait (что эквивалентно версии цикла while)

Примечание. Эти правила не являются строго необходимыми, если вы действительно понимаете, что делает оборудование. Однако эти проблемы быстро усложняются при использовании простых структур данных, и будет легче доказать, что ваш алгоритм работает правильно, если вы будете следовать им.

Ваши Q и Q_buf являются общими переменными. Из-за правила 1 я бы предпочел, чтобы они были объявлены в качестве локальных переменных в функции, которая их использует (consume() и produce() соответственно). Будет 1 общий буфер, который будет защищен мьютексом. Производитель добавит в свой локальный буфер. Когда этот буфер заполнен, он получает мьютекс и помещает локальный буфер в общий буфер. Затем он ждет, пока потребитель не примет этот буфер, прежде чем выдать больше данных.

Потребитель ожидает, пока этот общий буфер «прибудет», затем он получает мьютекс и заменяет свой пустой локальный буфер общим буфером. Затем он сообщает производителю, что буфер был принят, поэтому он знает, что начать производство снова.

Семантически, я не вижу смысла использовать swap вместо move, так как в любом случае один из контейнеров в любом случае пуст. Может быть, вы хотите использовать swap, потому что вы знаете что-то о базовой памяти. Вы можете использовать все, что захотите, и это будет быстро и будет работать одинаково (по крайней мере, алгоритмически).

Эту проблему можно решить с помощью 1 условной переменной, но вам будет немного легче подумать, если вы используете 2.

Вот что я придумала. Протестировано на Visual Studio 2017 (15.6.7) и GCC 5.4.0. Мне не нужно зачислять или что-то еще (это такая простая штука), но по закону я должен сказать, что я не даю никаких гарантий.

#include <thread>
#include <vector>
#include <mutex>
#include <condition_variable>
#include <chrono>


std::vector<int> g_deliveryBuffer;
bool g_quit = false;
std::mutex g_mutex;  // protects g_deliveryBuffer and g_quit
std::condition_variable g_producerDeliver;
std::condition_variable g_consumerAccepted;


// consumer
void consume() 
{
    // local buffer
    std::vector<int> consumerBuffer;

    while (true)
    {
        if (consumerBuffer.empty())
        {  
            std::unique_lock<std::mutex> lock(g_mutex);
            while (g_deliveryBuffer.empty() && !g_quit)  // if we beat the producer, wait for them to push to the deliverybuffer
                g_producerDeliver.wait(lock);
            if (g_quit)
                break;
            consumerBuffer = std::move(g_deliveryBuffer);  // get the buffer
        }
        g_consumerAccepted.notify_one();  // notify the producer that the buffer has been accepted

        // for-loop to process the elems in Q
        // ...
        consumerBuffer.clear();
        // ...
    }
}


// producer
void produce() 
{
    std::vector<int> producerBuffer;
    while (true) 
    {
        // for-loop to fill up Q_buf
        // ...
        producerBuffer = { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 };
        // ...

        // once Q_buf is fully filled, wait until consumer asks to give it a full Q
        {   // scope is for lock
            std::unique_lock<std::mutex> lock(g_mutex);
            g_deliveryBuffer = std::move(producerBuffer);  // ok to push to deliverybuffer. it is guaranteed to be empty
            g_producerDeliver.notify_one();
            while (!g_deliveryBuffer.empty() && !g_quit)
                g_consumerAccepted.wait(lock);  // wait for consumer to signal for more data
            if (g_quit)
                break;
            // We will never reach this point if the buffer is not empty.
        }
    }
}



int main()
{
    // spawn threads
    std::thread consumerThread(consume);
    std::thread producerThread(produce);

    // for for 5 seconds
    std::this_thread::sleep_for(std::chrono::seconds(5));

    // signal that it's time to quit
    {
        std::lock_guard<std::mutex> lock(g_mutex);
        g_quit = true;
    }
    // one of the threads may be sleeping
    g_consumerAccepted.notify_one();
    g_producerDeliver.notify_one();

    consumerThread.join();
    producerThread.join();

    return 0;
}
0 голосов
/ 17 мая 2018

Может быть, это близко к тому, чего вы хотите достичь. Я использовал 2 условные переменные, чтобы уведомлять производителей и потребителей друг о друге, и ввел переменную, обозначающую, какой поворот сейчас:

#include <ctime>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>

template<typename T>
class ReaderWriter {
    private:
        std::vector<std::thread> readers;
        std::vector<std::thread> writers;
        std::condition_variable readerCv, writerCv;
        std::queue<T> data;
        std::mutex readerMutex, writerMutex;
        size_t noReaders, noWriters;
        enum class Turn { WRITER_TURN, READER_TURN };
        Turn turn;
        void reader() {
            while (1) {
                {
                    std::unique_lock<std::mutex> lk(readerMutex);    
                    while (turn != Turn::READER_TURN) {
                        readerCv.wait(lk);
                    }
                    std::cout << "Thread : " << std::this_thread::get_id() << " consumed " << data.front() << std::endl;
                    data.pop();
                    if (data.empty()) {
                        turn = Turn::WRITER_TURN;
                        writerCv.notify_one();
                    }
                }
            }
        }

        void writer() {
            while (1) {
                {
                    std::unique_lock<std::mutex> lk(writerMutex);
                    while (turn != Turn::WRITER_TURN) {
                        writerCv.wait(lk);
                    }
                    srand(time(NULL));
                    int random_number = std::rand();
                    data.push(random_number);
                    std::cout << "Thread : " << std::this_thread::get_id() << " produced " << random_number << std::endl;
                    turn = Turn::READER_TURN;
                }
                readerCv.notify_one();
            }
        }

    public:
        ReaderWriter(size_t noReadersArg, size_t noWritersArg) : noReaders(noReadersArg), noWriters(noWritersArg), turn(ReaderWriter::Turn::WRITER_TURN) {
        }

        void run() {
            int noReadersArg = noReaders, noWritersArg = noWriters;
            while (noReadersArg--) {
                readers.emplace_back(&ReaderWriter::reader, this);
            }

            while (noWritersArg--) {
                writers.emplace_back(&ReaderWriter::writer, this);
            }
        }

        ~ReaderWriter() {
            for (auto& r : readers) {
                r.join();
            }
            for (auto& w : writers) {
                w.join();
            }
        }
};

int main() {
    ReaderWriter<int> rw(5, 5);
    rw.run();
}
0 голосов
/ 13 мая 2018

Вот фрагмент кода. Поскольку рабочие шаги уже синхронизированы, требование двух буферов исключается. Поэтому для моделирования сценария используется простая очередь:

#include "conio.h"
#include <iostream>
#include <thread>
#include <mutex>
#include <queue>
#include <atomic>
#include <condition_variable>

using namespace std;

enum state_t{ READ = 0, WRITE = 1 };

mutex mu;
condition_variable cv;
atomic<bool> running;
queue<int> buffer;
atomic<state_t> state;

void generate_test_data()
{
    const int times = 5;
    static int data = 0;

    for (int i = 0; i < times; i++) {
        data = (data++) % 100;
        buffer.push(data);
    }
}

void ProducerThread() {

    while (running) {
        unique_lock<mutex> lock(mu);
        cv.wait(lock, []() { return !running || state == WRITE; });
        if (!running) return;
        generate_test_data(); //producing here
        lock.unlock();

        //notify consumer to start consuming
        state = READ;
        cv.notify_one();
    }
}

void ConsumerThread() {

    while (running) {

        unique_lock<mutex> lock(mu);
        cv.wait(lock, []() { return !running || state == READ; });
        if (!running) return;
        while (!buffer.empty()) {
            auto data = buffer.front();  //consuming here
            buffer.pop();                  
            cout << data << " \n";
        }

        //notify producer to start producing
        if (buffer.empty()) {
            state = WRITE;
            cv.notify_one();
        }
    }
}

int main(){
    running = true;
    thread producer = thread([]() { ProducerThread(); });
    thread consumer = thread([]() { ConsumerThread(); });

    //simulating gui thread
    while (!getch()){
    }

    running = false;
    producer.join();
    consumer.join();
}
0 голосов
/ 15 мая 2018

Не полный ответ, хотя я думаю, что могут быть полезны две переменные условия: одна с именем buffer_empty, которую будет ожидать поток производителя, и другая с именем buffer_filled, которая будет ждать поток потребителя. Количество мьютексов, как зацикливаться, и так далее, я не могу комментировать, так как сам не уверен в деталях.

0 голосов
/ 07 мая 2018

Код неправильно предполагает, что vector<int>::size() и vector<int>::swap() являются атомарными. Это не так.

Кроме того, ложные пробуждения должны обрабатываться циклом while (или другой перегрузкой cv::wait).

Исправления:

mutex m;
condition_variable cv;
vector<int> Q;

// consumer
void consume() {
    while(1) {
        // Get the new elements.
        vector<int> new_elements;
        {
            unique_lock<mutex> lk(m);
            while(Q.empty())
                cv.wait(lk);
            new_elements.swap(Q);
        }
        // for-loop to process the elems in new_elements
    }
}

// producer
void produce() {
    while(1) {
        vector<int> new_elements;
        // for-loop to fill up new_elements

        // publish new_elements
        {
            unique_lock<mutex> lk(m);
            Q.insert(Q.end(), new_elements.begin(), new_elements.end());
            cv.notify_one();
        }
    }
}
...