Запись в файл из общего буфера отсутствующих данных и сбой программы без cout - PullRequest
0 голосов
/ 21 июня 2019

Я делаю программу, используя потоки и общий буфер. Два потока работают бесконечно в фоновом режиме, один поток заполнит общий буфер данными, а другой поток запишет содержимое общего буфера в файл.

Пользователь может запустить или остановить заполнение данных, в результате чего поток переходит в состояние ожидания, пока пользователь не запустит поток снова. В каждом цикле буфер заполнен на 50 поплавков.

Это код:


#include <iostream>
#include <vector>
#include <iterator>
#include <utility>
#include <fstream>
#include <condition_variable>
#include <mutex>
#include <thread>

using namespace std;

std::mutex m;
std::condition_variable cv;
std::vector<std::vector<float>> datas;
bool keep_running = true, start_running = false;

void writing_thread()
{
    ofstream myfile;

    bool opn = false;

    while(1)
    {

        while(keep_running)
        {
            // Open the file only once
            if(!opn)
            {
                myfile.open("IQ_Datas.txt");
                opn = true;

            }


            // Wait until main() sends data
            std::unique_lock<std::mutex> lk(m);

            cv.wait(lk, [] {return !datas.empty();});


            auto d = std::move(datas);


            lk.unlock();


            for(auto &entry : d)
            {
                for(auto &e : entry)
                    myfile << e << endl;
            }


        }

        if(opn)
        {
            myfile.close();
            opn = false;
        }

    }
}

void sending_thread()
{

    std::vector<float> m_buffer;
    int cpt=0;
    //Fill the buffer with 50 floats
    for(float i=0; i<50; i++)
        m_buffer.push_back(i);

    while(1)
    {
        {
            std::unique_lock<std::mutex> lk(m);
            cv.wait(lk, [] {return keep_running && start_running;});

        }
        while(keep_running)
        {

            //Each loop d is containing 50 floats
            std::vector<float> d = m_buffer;

            cout << "in3" << endl; //Commenting this line makes the program crash

            {
                std::lock_guard<std::mutex> lk(m);
                if (!keep_running)break;
                datas.push_back(std::move(d));
            }
            cv.notify_one();
            cpt++;
        }

        cout << "Total data: " << cpt*50 << endl;
        cpt = 0;
    }
}
void start()
{
    {
        std::unique_lock<std::mutex> lk(m);
        start_running = true;
    }
    cv.notify_all();
}
void stop()
{
    {
        std::unique_lock<std::mutex> lk(m);
        start_running = false;
    }
    cv.notify_all();
}

int main()
{
    int go = 0;
    thread t1(sending_thread);
    thread t2(writing_thread);

    t1.detach();
    t2.detach();

    while(1)
    {

        std::cin >> go;

        if(go == 1)
        {
            start();
            keep_running = true;
        }
        else if(go == 0)
        {
            stop();
            keep_running = false;
        }


    }

    return 0;
}


У меня 2 проблемы с этим кодом:

  • При комментировании строки cout << "in3" << endl; программа вылетает через ~ 20-40 секунд с сообщением об ошибке: прекращение вызова после выброса экземпляра 'std :: bad_alloc' что (): std :: bad_alloc . Если я позволю cout, программа будет работать без проблем.

  • Когда программа работает, после остановки sending_thread я отображаю общий объем данных, которые были скопированы с cout << "Total data: " << cpt*50 << endl;. Для небольшого количества данных все это правильно записывается в файл, но когда оно велико, отсутствуют данные. Отсутствующие / исправленные данные (Общее количество строк в файле не соответствует total data)

Почему с cout программа работает правильно? И что является причиной отсутствия данных? Это потому, что sending_thread заполняет буфер слишком быстро, а writing_thread занимает слишком много времени для записи в файл?

РЕДАКТИРОВАТЬ: Некоторые точности, добавив больше cout в sending_thread, кажется, решить все проблемы. Первый поток произвел 21 миллион операций с плавающей запятой, а второй поток успешно записал в файл 21 миллион операций с плавающей запятой. Похоже, что без cout потоки производителя работают слишком быстро, чтобы потребительский поток продолжал извлекать данные из общего буфера при записи их в файл.

1 Ответ

2 голосов
/ 21 июня 2019

Во избежание:

Moved-from object 'datas' of type 'std::vector' is moved:
        auto d = std::move(datas);
                 ^~~~~~~~~~~~~~~~

Заменить это:

        // Wait until main() sends data
        std::unique_lock<std::mutex> lk(m);
        cv.wait(lk, [] {return !datas.empty();});
        auto d = std::move(datas);
        lk.unlock();

С этим:

        // Wait until main() sends data            
        std::vector<std::vector<float>> d;
        {
            std::unique_lock<std::mutex> lk(m);
            cv.wait(lk, [] { return !datas.empty(); });
            datas.swap(d);
        }

Также замените переменные bool, к которым обращаются из нескольких потоков, на std::atomic_bool или std::atomic_flag.

bad_alloc происходит от sending_thread намного быстрее, чем writing_thread, поэтому ему не хватит памяти. При достаточном замедлении sending_thread (с печатью) проблема становится менее заметной, но для правильной работы у вас должна быть некоторая синхронизация. Вы можете создать класс-оболочку вокруг него и предоставить методы вставки и извлечения, чтобы убедиться, что весь доступ синхронизирован должным образом, а также дать ему максимальное количество элементов. Пример:

template<typename T>
class atomic2dvector {
public:
    atomic2dvector(size_t max_elements) : m_max_elements(max_elements) {}

    atomic2dvector(const atomic2dvector&) = delete;
    atomic2dvector(atomic2dvector&&) = delete;
    atomic2dvector& operator=(const atomic2dvector&) = delete;
    atomic2dvector& operator=(atomic2dvector&&) = delete;

    ~atomic2dvector() { shutdown(); }

    bool insert_one(std::vector<T>&& other) {
        std::unique_lock<std::mutex> lock(m_mtx);
        while(m_current_elements + m_data.size() > m_max_elements && m_shutdown == false)
            m_cv.wait(lock);
        if(m_shutdown) return false;

        m_current_elements += other.size();
        m_data.emplace_back(std::forward<std::vector<T>>(other));

        m_cv.notify_one();
        return true;
    }
    std::vector<std::vector<T>> extract_all() {
        std::vector<std::vector<T>> return_value;

        std::unique_lock<std::mutex> lock(m_mtx);
        while(m_data.empty() && m_shutdown == false) m_cv.wait(lock);

        if(m_shutdown == false) {
            m_current_elements = 0;
            return_value.swap(m_data);
        } else {
            // return an empty vector if we should shutdown
        }
        m_cv.notify_one();

        return return_value;
    }

    bool is_active() const { return m_shutdown == false; }

    void shutdown() {
        m_shutdown = true;
        m_cv.notify_all();
    }

private:
    size_t m_max_elements;
    size_t m_current_elements = 0;
    std::atomic<bool> m_shutdown = false;
    std::condition_variable m_cv{};
    std::mutex m_mtx{};
    std::vector<std::vector<T>> m_data{};
};

Если вы хотите продолжать извлекать данные даже после выключения, вы можете изменить extract_all() на следующее:

   std::vector<std::vector<T>> extract_all() {
        std::vector<std::vector<T>> return_value;

        std::unique_lock<std::mutex> lock(m_mtx);
        while(m_data.empty() && m_shutdown == false) m_cv.wait(lock);

        m_current_elements = 0;
        return_value.swap(m_data);
        m_cv.notify_one();

        return return_value;
    }

Полный пример может выглядеть так:

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <fstream>
#include <iostream>
#include <iterator>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

using namespace std;

template<typename T>
class atomic2dvector {
public:
    atomic2dvector(size_t max_elements) : m_max_elements(max_elements) {}
    atomic2dvector(const atomic2dvector&) = delete;
    atomic2dvector(atomic2dvector&&) = delete;
    atomic2dvector& operator=(const atomic2dvector&) = delete;
    atomic2dvector& operator=(atomic2dvector&&) = delete;

    ~atomic2dvector() { shutdown(); }

    bool insert_one(std::vector<T>&& other) {
        std::unique_lock<std::mutex> lock(m_mtx);
        while(m_current_elements + m_data.size() > m_max_elements &&
              m_shutdown == false)
            m_cv.wait(lock);
        if(m_shutdown) return false;

        m_current_elements += other.size();
        m_data.emplace_back(std::forward<std::vector<T>>(other));

        m_cv.notify_one();
        return true;
    }
    std::vector<std::vector<T>> extract_all() {
        std::vector<std::vector<T>> return_value;

        std::unique_lock<std::mutex> lock(m_mtx);
        while(m_data.empty() && m_shutdown == false) m_cv.wait(lock);

        m_current_elements = 0;
        return_value.swap(m_data);
        m_cv.notify_one();

        return return_value;
    }

    bool is_active() const { return m_shutdown == false; }

    void shutdown() {
        m_shutdown = true;
        m_cv.notify_all();
    }

private:
    size_t m_max_elements;
    size_t m_current_elements = 0;
    std::atomic<bool> m_shutdown = false;
    std::condition_variable m_cv{};
    std::mutex m_mtx{};
    std::vector<std::vector<T>> m_data{};
};

std::mutex m;
std::condition_variable cv;
atomic2dvector<float> datas(256 * 1024 * 1024 / sizeof(float)); // 0.25 GiB limit
std::atomic_bool start_running = false;

void writing_thread() {
    std::ofstream myfile("IQ_Datas.txt");
    if(myfile) {
        std::cout << "writing_thread waiting\n";

        std::vector<std::vector<float>> d;
        while((d = datas.extract_all()).empty() == false) {
            std::cout << "got " << d.size() << "\n";

            for(auto& entry : d) {
                for(auto& e : entry) myfile << e << "\n";
            }
            std::cout << "wrote " << d.size() << "\n\n";
        }
    }
    std::cout << "writing_thread shutting down\n";
}

void sending_thread() {
    std::vector<float> m_buffer;
    std::uintmax_t cpt = 0;
    // Fill the buffer with 50 floats
    for(float i = 0; i < 50; i++) m_buffer.push_back(i);

    while(true) {
        {
            std::unique_lock<std::mutex> lk(m);
            cv.wait(lk, [] {
                return start_running == true || datas.is_active() == false;
            });
        }
        if(datas.is_active() == false) break;
        std::cout << "sending...\n";
        while(start_running == true) {
            // Each loop d is containing 50 floats
            std::vector<float> d = m_buffer;
            if(datas.insert_one(std::move(d)) == false) break;
            cpt++;
        }
        cout << "Total data: " << cpt * 50 << endl;
        cpt = 0;
    }
    std::cout << "sending_thread shutting down\n";
}

void start() {
    std::unique_lock<std::mutex> lk(m);
    start_running = true;
    cv.notify_all();
}
void stop() {
    std::unique_lock<std::mutex> lk(m);
    start_running = false;
    cv.notify_all();
}
void quit() {
    datas.shutdown();
    cv.notify_all();
}

int main() {
    int go = 0;
    thread t1(sending_thread);
    thread t2(writing_thread);

    std::this_thread::sleep_for(std::chrono::milliseconds(100));
    std::cout << "Enter 1 to make the sending thread send and 0 to make it stop "
                 "sending. Enter a non-integer to shutdown.\n";

    while(std::cin >> go) {
        if(go == 1) {
            start();
        } else if(go == 0) {
            stop();
        }
    }
    std::cout << "--- shutting down ---\n";
    quit();

    std::cout << "joining threads\n";
    t1.join();
    std::cout << "t1 joined\n";
    t2.join();
    std::cout << "t2 joined\n";
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...