Я использую шаблон проектирования Producer / 2 Consumers и сталкиваюсь со странной проблемой сбоя при запуске. Я пытался с GDB, но он говорит мне о "Неизвестном источнике" / Cygwin.S отсутствует. В любом случае, где бы я ни ставил точки останова, если я «продолжаю» код, он неизбежно завершается с SIGSEV в потоке 5 (не знаю, что такое поток 5, вероятно, t_b ниже).
Вот его частичный код
//similar for B
struct dataA {
int a;
float b;
int c;
//...
}
//similar for B
union typeA {
char byte[SIZE_A];
struct dataA d;
}
union typeA a;
union typeB b; //2megabyte of data
SynchronizedQueue<typeA> a_queue;
SynchronizedQueue<typeB> b_queue;
void a_Thread()
{
for (;;){
typeA a = a_queue.get();
cout << "a: " << a.d.c << endl; //Pseudo code
}
}
void b_Thread()
{
for (;;){
typeB b = b_queue.get();
cout << "b: " << b.d.a << endl; //Pseudo code
}
}
int main(int argc, char **argv)
{
thread t_a(a_Thread);
thread t_b(b_Thread);
//open a file ecc
int c;
while ((c = fgetc(fp)) != EOF){
//logical code
if (case_A){
typeA data = xyz;
//or int data = 7;
a_queue.put(data);
} else if (case_B){
typeB data = qwe;
//or int data = 7;
b_queue.put(data);
}
}
//close file
return 0;
}
Если я закомментирую
thread t_b(b_Thread);
что-нибудь работает, как и ожидалось, но если я делаю то же самое для t_a, проблема все еще существует, следовательно, я подозреваю, что это что-то делать с т_б. Имейте в виду, что код вылетает даже до достижения операторов "put".
В то время как для t_a он работает с желаемым типом данных (конечная цель - использовать union / struct), для t_b он работает, когда используя, например, int, но это не так, когда я использую свои собственные данные (2MB объединения / структуры) Более того, поскольку два объединения / структуры являются глобальными, я не уверен, что я потокобезопасен, насколько я понимаю, я передаю указатель на очередь, поэтому потенциально у меня могут быть условия гонки, если потребитель займет много времени. В этом случае я подозреваю, что решением было бы создать memcpy локального объединения / структуры и использовать его.
РЕДАКТИРОВАТЬ: реализация SynchronizedQueue https://home.deib.polimi.it/fornacia/lib/exe/fetch.php?media=teaching: aos: 2017: aos_l4_multithreading_ cpp .pdf p42
РЕДАКТИРОВАТЬ 2: Вот фрагмент кода
main. cpp
#include "synchronized_queue.h"
#include <iostream>
#include <thread>
#include <cstring>
using namespace std;
using namespace std::chrono;
struct dataA {
int a;
float b;
int c;
};
union typeA {
char byte[2300000];
struct dataA d;
};
struct dataB {
uint8_t a;
float b;
double c;
};
union typeB {
char byte[20500];
struct dataB d;
};
union typeA a;
union typeB b; //2megabyte of data
SynchronizedQueue<typeA> a_queue;
SynchronizedQueue<typeB> b_queue;
void a_Thread()
{
for(;;){
typeA _a = a_queue.get();
cout << "a: " << _a.d.c << endl;
}
}
void b_Thread()
{
for(;;){
typeB _b = b_queue.get();
cout << "b: " << _b.d.c << endl;
}
}
int main(int argc, char **argv)
{
thread t_a(a_Thread);
thread t_b(b_Thread);
srand(time(NULL));
for(int i=0;i<100;i++){
if(rand() % 100 + 1 > 50){
typeA a;
a.d.a = 10;
a.d.b = 10.2;
a.d.c = 10;
a_queue.put(a);
} else {
typeB b;
b.d.a = 70;
b.d.b = 70.5;
b.d.c = 70.3658485226;
b_queue.put(b);
}
}
//a_queue.finish();
//b_queue.finish();
//t_a.join();
//t_b.join();
return 0;
}
synchronized_queue.h
#ifndef SYNC_QUEUE_H_
#define SYNC_QUEUE_H_
#include <list>
#include <mutex>
#include <condition_variable>
#include <atomic>
template <typename T>
class SynchronizedQueue
{
public:
SynchronizedQueue(){};
void put(const T &data);
T get();
size_t size();
private:
SynchronizedQueue(const SynchronizedQueue &) = delete;
SynchronizedQueue &operator=(const SynchronizedQueue &) = delete;
std::list<T> queue;
std::mutex myMutex;
std::condition_variable myCv;
};
template <typename T>
void SynchronizedQueue<T>::put(const T &data)
{
std::unique_lock<std::mutex> lck(myMutex);
queue.push_back(data);
myCv.notify_one();
}
template <typename T>
T SynchronizedQueue<T>::get()
{
std::unique_lock<std::mutex> lck(myMutex);
while (queue.empty())
myCv.wait(lck);
T result = queue.front();
queue.pop_front();
return result;
}
template <typename T>
size_t SynchronizedQueue<T>::size()
{
std::unique_lock<std::mutex> lck(myMutex);
return queue.size();
}
#endif // SYNC_QUEUE_H_
если изменить 2300000 на 23000, код работает!