C ++ Несколько пользовательских потоков застряли в условной переменной - PullRequest
0 голосов
/ 30 сентября 2019

Я делаю программу для одного производителя с несколькими потребителями на C ++. Я начинаю с вызова потребительских потоков, а затем добавляю элементы в массив. Все работает нормально, но в конце потребительские потоки не присоединяются, потому что они застряли в ожидании переменной условия и программа зависает.

Я думаю, что проблема в том, что потоки постоянно вызываются в цикле, потому что currentSizeне защищены, и они просто не могут выйти из условной переменной, но я не знаю, как это исправить.

struct Item {
public:
    string name;
    int time;
    double height;
};

struct Monitor {
private:
    Item items[12];
    int currentSize;
    bool finished;
    mutex lock;
    condition_variable cv;
public:
    Monitor() {
        finished = false;
        currentSize = 0;
    }
    void put(Item item) {
        unique_lock<mutex> guard(lock);
        cv.wait(guard, [&] { return (currentSize < 12); });
        items[currentSize] = item;
        currentSize++;
        cv.notify_all();
    }

    Item get() {
        unique_lock<mutex> guard(lock);
        cv.wait(guard, [&] { return (currentSize > 0); });
        Item item = items[currentSize - 1];
        currentSize--;
        return item;
    }
    bool get_finished() {
        return finished;
    }
    void set_finished() {
        finished = true;
    }
    int get_size() {
        return currentSize;
    }
};

int main() {
    vector<Item> items = read_file(file);

    Monitor monitor;
    vector<thread> threads;
    vector<Item> results;

    for (int i = 0; i < 4; i++) {
        threads.emplace_back([&] {
            while (!monitor.get_finished()) {
                if (monitor.get_size() > 0) {
                    Item item = monitor.get();
                    results.push_back(item);
                }
            }
        });
    }

    for (int i = 0; i < items.size(); i++) {
        monitor.put(items[i]);
    }
    monitor.set_finished();

    for_each(threads.begin(), threads.end(), mem_fn(&thread::join));

    return 0;
}

Ответы [ 2 ]

1 голос
/ 30 сентября 2019

Почему блокируются потоки потребителя?

Я проверил ваш код, и оказалось, что блокировка потока производителя выполняется методом put(). Почему?

Представьте себе следующий сценарий: в векторе 13 элементов items.

Основной поток (производитель) успешно загружает первые 12 элементов и ожидает cvcurrentSize становится ниже 12.

Пользовательские потоки уведомляются и с радостью потребляют первые 12 элементов, а затем ждут на cv, пока currentSize не станет больше 0.

Но подождите! Теперь все чего-то ждут, никто не уведомляет. Таким образом, все потоки будут блокироваться. Вам необходимо уведомить производителя, когда currentSize станет ниже, чем 12.

0 голосов
/ 30 сентября 2019

Я заметил несколько проблем. сделал переменные-члены атомарными, notify_all в get api. Однако была и логическая ошибка. Представьте, что у вас 4 запущенных потока и 5 элементов в очереди. На данный момент предположим, что каждый поток может вывести один из очереди, и теперь в очереди 4 потока и только один элемент. Один из потоков выводит последний, и теперь там 0 элементов, однако остальные три потока все еще ожидают переменную условия. Таким образом, решение заключается в том, что если последний элемент отсутствует, каждый должен быть уведомлен об этом, и если нет другого элемента, возвращающегося из API.

#include <iostream>
#include <vector>
#include <condition_variable>
#include <thread>
#include <algorithm>
#include <atomic>

using namespace std;

using Item = int;

struct Monitor {
private:
    Item items[12];
    std::atomic<int> currentSize;
    std::atomic<bool> finished;
    mutex lock;
    condition_variable cv;
public:
    Monitor() {
        finished = false;
        currentSize = 0;
    }
    void put(Item item) {
        unique_lock<mutex> guard(lock);
        cv.wait(guard, [&] { return (currentSize < 12); });
        items[currentSize] = item;
        currentSize++;
        cv.notify_all();

        std::cerr << "+ " << currentSize << std::endl ;
    }

    Item get() {
        unique_lock<mutex> guard(lock);
        cv.wait(guard, [&] { return (currentSize >= 0 ); });

        Item item;
        if (currentSize > 0 ){      
            currentSize--;
            item = items[currentSize];
            cv.notify_all();
            std::cerr << "- " << currentSize << std::endl ;
        }
        return item;
    }
    bool get_finished() {
        return finished;
    }
    void set_finished() {
        finished = true;
    }
    int get_size() {
        return currentSize;
    }
};

int main() {
    vector<Item> items(200);
    std::fill ( items.begin() , items.end(), 100);

    Monitor monitor;
    vector<thread> threads;
    vector<Item> results;

    for (int i = 0; i < 10; i++) {
        threads.emplace_back([&] {
            while ( !monitor.get_finished() ) {
                if (monitor.get_size() > 0) {
                    Item item = monitor.get();
                    results.push_back(item);
                }
            }
        });
    }

    for (int i = 0; i < items.size(); i++) {
        monitor.put(items[i]);
    }
    monitor.set_finished();

    for_each(threads.begin(), threads.end(), mem_fn(&thread::join));

    return 0;
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...