Заполнение и сохранение общего буфера между потоками - PullRequest
0 голосов
/ 18 июня 2019

Я работаю с API, который извлекает данные I / Q.Вызов функции bbGetIQ(m_handle, &pkt); заполняет буфер.Это зацикливание потока, пока пользователь не ввел «стоп».Pkt - это структура, а используемый буфер - pkt.iqData = &m_buffer[0];, который является вектором с плавающей точкой.Размер вектора равен 5000, и каждый раз, когда мы зацикливаемся, буфер заполняется 5000 значениями.

Я хочу сохранить данные из буфера в файл, и я делал это сразу после вызовадо bbgetIQ, но выполнение этого является трудоемкой задачей, данные не были получены достаточно быстро, что привело к удалению данных API, поэтому он мог продолжать заполнять свой буфер.

Вот как выглядел мой код:


void Acquisition::recordIQ(){

    int cpt = 0;
    ofstream myfile;


    while(1){

        while (keep_running)
        {   

            cpt++;

            if(cpt < 2)
                myfile.open ("/media/ssd/IQ_Data.txt");


            bbGetIQ(m_handle, &pkt); //Retrieve I/Q data


            //Writing content of buffer into the file.
            for(int i=0; i<m_buffer.size(); i++)
                myfile << m_buffer[i] << endl;


        }
        cpt = 0;
        myfile.close();
    }
}

Затем я попытался записать в файл только после выхода из цикла:



void Acquisition::recordIQ(){

    int cpt = 0;
    ofstream myfile;
    int next=0;
    vector<float> data;


    while(1){

        while ( keep_running)
        {   
            if(keep_running == false){

                myfile.open ("/media/ssd/IQ_Data.txt");

                for(int i=0; i<data.size(); i++)
                    myfile << data[i] << endl;

                myfile.close();
                break;
            }

            cpt++;

            data.resize(next + m_buffer.size());

            bbGetIQ(m_handle, &pkt); //retrieve data

            std::copy(m_buffer.begin(), m_buffer.end(), data.begin() + next); //copy content of the buffer into final vector

            next += m_buffer.size(); //next index

        }

        cpt = 0;

    }
}

Я больше не получаю потерю данных из API, но проблема в том, что яограничено размером data вектора.Например, я не могу позволить ему извлекать данные всю ночь.

Моя идея - создать 2 темы.Один будет извлекать данные, а другой запишет данные в файл.2 потока будут использовать кольцевой буфер, где первый поток будет заполнять буфер, а второй поток будет читать буфер и записывать содержимое в файл.Поскольку это общий буфер, я думаю, я должен использовать мьютексы.

Я новичок в многопоточности и мьютексах, так что это будет хорошей идеей?Я действительно не знаю, с чего начать и как поток потребителя может прочитать буфер, в то время как производитель заполнит его.Будет ли блокировка буфера во время чтения причиной сброса данных API?(потому что он не сможет записать его в кольцевой буфер).

EDIT : поскольку я хочу, чтобы поток моей записи работал в фоновом режиме, чтобы я мог делать другие вещи во время записиЯ отключил его, и пользователь может запустить запись, установив для условия keep_running значение true.


thread t1(&Acquisition::recordIQ, &acq);
t1.detach();

1 Ответ

0 голосов
/ 18 июня 2019

Вам нужно использовать что-то вроде этого (https://en.cppreference.com/w/cpp/thread/condition_variable):

Глобал:

std::mutex m;
std::condition_variable cv;
std::vector<std::vector<float>> datas;
bool keep_running = true, start_running = false;

написание темы:

void writing_thread()
{
    myfile.open ("/media/ssd/IQ_Data.txt");

    while(1) {
        // Wait until main() sends data
        std::unique_lock<std::mutex> lk(m);
        cv.wait(lk, []{return keep_running && !datas.empty();});
        if (!keep_running) break;

        auto d = std::move(datas); 
        lk.unlock();

        for(auto &entry : d) {
            for(auto &e : entry)
                myfile << e << endl;             
        }
    }
}

отправка потока:

void sending_thread() {
    while(1) {
        {
            std::unique_lock<std::mutex> lk(m);
            cv.wait(lk, []{return keep_running && start_running;});
            if (!keep_running) break;
        }

        bbGetIQ(m_handle, &pkt); //retrieve data

        std::vector<float> d = m_buffer;

        {
            std::lock_guard<std::mutex> lk(m);
            if (!keep_running) break;
            datas.push_back(std::move(d));
        }
        cv.notify_one();
    }
}
void start() {
    {
        std::unique_lock<std::mutex> lk(m);
        start_running = true;
    }
    cv.notify_all();
}
void stop() {
    {
        std::unique_lock<std::mutex> lk(m);
        start_running = false;
    }
    cv.notify_all();
}
void terminate() {
    {
        std::unique_lock<std::mutex> lk(m);
        keep_running = false;
    }
    cv.notify_all();

    thread1.join();
    thread2.join();
}

Вкратце: Отправляющий поток получает данные от чего бы то ни было, блокирует мьютекс mt и перемещает данные в datas хранилище. Затем он использует условную переменную cv для уведомления ожидающих потоков о том, что нужно что-то делать. Поток записи ожидает сигнала условной переменной, затем блокирует мьютекс mt, перемещает данные из глобальной переменной datas в локальную, затем освобождает мьютекс и приступает к записи только что полученных данных в файл. Ключ должен держать мьютекс заблокированным как можно меньше времени.

EDIT: чтобы завершить все это, вам нужно установить keep_running в false. Затем Позвоните cv.notify_all(). Затем присоединиться к темам. Порядок важен. Вам необходимо объединить потоки, потому что запись потока может все еще происходить в процессе записи данных.

EDIT2: добавлен отложенный запуск. Теперь создайте два потока, в одном прогоне sending_thread, в другом writing_thread. Звоните start(), чтобы включить обработку, и stop(), чтобы остановить ее.

...