Во избежание:
Moved-from object 'datas' of type 'std::vector' is moved:
auto d = std::move(datas);
^~~~~~~~~~~~~~~~
Заменить это:
// Wait until main() sends data
std::unique_lock<std::mutex> lk(m);
cv.wait(lk, [] {return !datas.empty();});
auto d = std::move(datas);
lk.unlock();
С этим:
// Wait until main() sends data
std::vector<std::vector<float>> d;
{
std::unique_lock<std::mutex> lk(m);
cv.wait(lk, [] { return !datas.empty(); });
datas.swap(d);
}
Также замените переменные bool
, к которым обращаются из нескольких потоков, на std::atomic_bool
или std::atomic_flag
.
bad_alloc
происходит от sending_thread
намного быстрее, чем writing_thread
, поэтому ему не хватит памяти. При достаточном замедлении sending_thread
(с печатью) проблема становится менее заметной, но для правильной работы у вас должна быть некоторая синхронизация. Вы можете создать класс-оболочку вокруг него и предоставить методы вставки и извлечения, чтобы убедиться, что весь доступ синхронизирован должным образом, а также дать ему максимальное количество элементов. Пример:
template<typename T>
class atomic2dvector {
public:
atomic2dvector(size_t max_elements) : m_max_elements(max_elements) {}
atomic2dvector(const atomic2dvector&) = delete;
atomic2dvector(atomic2dvector&&) = delete;
atomic2dvector& operator=(const atomic2dvector&) = delete;
atomic2dvector& operator=(atomic2dvector&&) = delete;
~atomic2dvector() { shutdown(); }
bool insert_one(std::vector<T>&& other) {
std::unique_lock<std::mutex> lock(m_mtx);
while(m_current_elements + m_data.size() > m_max_elements && m_shutdown == false)
m_cv.wait(lock);
if(m_shutdown) return false;
m_current_elements += other.size();
m_data.emplace_back(std::forward<std::vector<T>>(other));
m_cv.notify_one();
return true;
}
std::vector<std::vector<T>> extract_all() {
std::vector<std::vector<T>> return_value;
std::unique_lock<std::mutex> lock(m_mtx);
while(m_data.empty() && m_shutdown == false) m_cv.wait(lock);
if(m_shutdown == false) {
m_current_elements = 0;
return_value.swap(m_data);
} else {
// return an empty vector if we should shutdown
}
m_cv.notify_one();
return return_value;
}
bool is_active() const { return m_shutdown == false; }
void shutdown() {
m_shutdown = true;
m_cv.notify_all();
}
private:
size_t m_max_elements;
size_t m_current_elements = 0;
std::atomic<bool> m_shutdown = false;
std::condition_variable m_cv{};
std::mutex m_mtx{};
std::vector<std::vector<T>> m_data{};
};
Если вы хотите продолжать извлекать данные даже после выключения, вы можете изменить extract_all()
на следующее:
std::vector<std::vector<T>> extract_all() {
std::vector<std::vector<T>> return_value;
std::unique_lock<std::mutex> lock(m_mtx);
while(m_data.empty() && m_shutdown == false) m_cv.wait(lock);
m_current_elements = 0;
return_value.swap(m_data);
m_cv.notify_one();
return return_value;
}
Полный пример может выглядеть так:
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <fstream>
#include <iostream>
#include <iterator>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>
using namespace std;
template<typename T>
class atomic2dvector {
public:
atomic2dvector(size_t max_elements) : m_max_elements(max_elements) {}
atomic2dvector(const atomic2dvector&) = delete;
atomic2dvector(atomic2dvector&&) = delete;
atomic2dvector& operator=(const atomic2dvector&) = delete;
atomic2dvector& operator=(atomic2dvector&&) = delete;
~atomic2dvector() { shutdown(); }
bool insert_one(std::vector<T>&& other) {
std::unique_lock<std::mutex> lock(m_mtx);
while(m_current_elements + m_data.size() > m_max_elements &&
m_shutdown == false)
m_cv.wait(lock);
if(m_shutdown) return false;
m_current_elements += other.size();
m_data.emplace_back(std::forward<std::vector<T>>(other));
m_cv.notify_one();
return true;
}
std::vector<std::vector<T>> extract_all() {
std::vector<std::vector<T>> return_value;
std::unique_lock<std::mutex> lock(m_mtx);
while(m_data.empty() && m_shutdown == false) m_cv.wait(lock);
m_current_elements = 0;
return_value.swap(m_data);
m_cv.notify_one();
return return_value;
}
bool is_active() const { return m_shutdown == false; }
void shutdown() {
m_shutdown = true;
m_cv.notify_all();
}
private:
size_t m_max_elements;
size_t m_current_elements = 0;
std::atomic<bool> m_shutdown = false;
std::condition_variable m_cv{};
std::mutex m_mtx{};
std::vector<std::vector<T>> m_data{};
};
std::mutex m;
std::condition_variable cv;
atomic2dvector<float> datas(256 * 1024 * 1024 / sizeof(float)); // 0.25 GiB limit
std::atomic_bool start_running = false;
void writing_thread() {
std::ofstream myfile("IQ_Datas.txt");
if(myfile) {
std::cout << "writing_thread waiting\n";
std::vector<std::vector<float>> d;
while((d = datas.extract_all()).empty() == false) {
std::cout << "got " << d.size() << "\n";
for(auto& entry : d) {
for(auto& e : entry) myfile << e << "\n";
}
std::cout << "wrote " << d.size() << "\n\n";
}
}
std::cout << "writing_thread shutting down\n";
}
void sending_thread() {
std::vector<float> m_buffer;
std::uintmax_t cpt = 0;
// Fill the buffer with 50 floats
for(float i = 0; i < 50; i++) m_buffer.push_back(i);
while(true) {
{
std::unique_lock<std::mutex> lk(m);
cv.wait(lk, [] {
return start_running == true || datas.is_active() == false;
});
}
if(datas.is_active() == false) break;
std::cout << "sending...\n";
while(start_running == true) {
// Each loop d is containing 50 floats
std::vector<float> d = m_buffer;
if(datas.insert_one(std::move(d)) == false) break;
cpt++;
}
cout << "Total data: " << cpt * 50 << endl;
cpt = 0;
}
std::cout << "sending_thread shutting down\n";
}
void start() {
std::unique_lock<std::mutex> lk(m);
start_running = true;
cv.notify_all();
}
void stop() {
std::unique_lock<std::mutex> lk(m);
start_running = false;
cv.notify_all();
}
void quit() {
datas.shutdown();
cv.notify_all();
}
int main() {
int go = 0;
thread t1(sending_thread);
thread t2(writing_thread);
std::this_thread::sleep_for(std::chrono::milliseconds(100));
std::cout << "Enter 1 to make the sending thread send and 0 to make it stop "
"sending. Enter a non-integer to shutdown.\n";
while(std::cin >> go) {
if(go == 1) {
start();
} else if(go == 0) {
stop();
}
}
std::cout << "--- shutting down ---\n";
quit();
std::cout << "joining threads\n";
t1.join();
std::cout << "t1 joined\n";
t2.join();
std::cout << "t2 joined\n";
}