c ++ synchronized_queue / потоковое приложение аварийно завершает работу при запуске без указания причины c - PullRequest
0 голосов
/ 24 марта 2020

Я использую шаблон проектирования Producer / 2 Consumers и сталкиваюсь со странной проблемой сбоя при запуске. Я пытался с GDB, но он говорит мне о "Неизвестном источнике" / Cygwin.S отсутствует. В любом случае, где бы я ни ставил точки останова, если я «продолжаю» код, он неизбежно завершается с SIGSEV в потоке 5 (не знаю, что такое поток 5, вероятно, t_b ниже).

Вот его частичный код

//similar for B
struct dataA {
    int a;
    float b;
    int c;
    //...
}

//similar for B
union typeA {
   char byte[SIZE_A];
   struct dataA d;
}

union typeA a;
union typeB b; //2megabyte of data

SynchronizedQueue<typeA> a_queue;
SynchronizedQueue<typeB> b_queue;

void a_Thread()
{
    for (;;){
        typeA a = a_queue.get();
        cout << "a: " << a.d.c << endl; //Pseudo code
    }

}

void b_Thread()
{
    for (;;){
        typeB b = b_queue.get();
        cout << "b: " << b.d.a << endl; //Pseudo code
    }
}

int main(int argc, char **argv)
{

    thread t_a(a_Thread);
    thread t_b(b_Thread); 

    //open a file ecc

    int c;
    while ((c = fgetc(fp)) != EOF){
        //logical code

        if (case_A){
            typeA data = xyz;
            //or int data = 7;
            a_queue.put(data);
        } else if (case_B){
            typeB data = qwe;
            //or int data = 7;
            b_queue.put(data);
        }
    }

    //close file
    return 0;
}

Если я закомментирую

thread t_b(b_Thread); 

что-нибудь работает, как и ожидалось, но если я делаю то же самое для t_a, проблема все еще существует, следовательно, я подозреваю, что это что-то делать с т_б. Имейте в виду, что код вылетает даже до достижения операторов "put".

В то время как для t_a он работает с желаемым типом данных (конечная цель - использовать union / struct), для t_b он работает, когда используя, например, int, но это не так, когда я использую свои собственные данные (2MB объединения / структуры) Более того, поскольку два объединения / структуры являются глобальными, я не уверен, что я потокобезопасен, насколько я понимаю, я передаю указатель на очередь, поэтому потенциально у меня могут быть условия гонки, если потребитель займет много времени. В этом случае я подозреваю, что решением было бы создать memcpy локального объединения / структуры и использовать его.

РЕДАКТИРОВАТЬ: реализация SynchronizedQueue https://home.deib.polimi.it/fornacia/lib/exe/fetch.php?media=teaching: aos: 2017: aos_l4_multithreading_ cpp .pdf p42

РЕДАКТИРОВАТЬ 2: Вот фрагмент кода

main. cpp

#include "synchronized_queue.h"
#include <iostream>
#include <thread>
#include <cstring>

using namespace std;
using namespace std::chrono;

struct dataA {
    int a;
    float b;
    int c;
};

union typeA {
   char byte[2300000];
   struct dataA d;
};

struct dataB {
    uint8_t a;
    float b;
    double c;
};

union typeB {
   char byte[20500];
   struct dataB d;
};

union typeA a;
union typeB b; //2megabyte of data

SynchronizedQueue<typeA> a_queue;
SynchronizedQueue<typeB> b_queue;

void a_Thread()
{
    for(;;){
        typeA _a = a_queue.get();
        cout << "a: " << _a.d.c << endl;
    } 
}

void b_Thread()
{
    for(;;){
        typeB _b = b_queue.get();
        cout << "b: " << _b.d.c << endl;
    }
}

int main(int argc, char **argv)
{  
    thread t_a(a_Thread);
    thread t_b(b_Thread);

    srand(time(NULL));

    for(int i=0;i<100;i++){
        if(rand() % 100 + 1 > 50){
            typeA a;
            a.d.a = 10;
            a.d.b = 10.2;
            a.d.c = 10;
            a_queue.put(a);
        } else {
            typeB b;
            b.d.a = 70;
            b.d.b = 70.5;
            b.d.c = 70.3658485226;
            b_queue.put(b);
        }
    }

    //a_queue.finish();
    //b_queue.finish();

    //t_a.join();
    //t_b.join();

    return 0;
}

synchronized_queue.h

#ifndef SYNC_QUEUE_H_
#define SYNC_QUEUE_H_
#include <list>
#include <mutex>
#include <condition_variable>
#include <atomic>

template <typename T>
class SynchronizedQueue
{
public:
    SynchronizedQueue(){};
    void put(const T &data);
    T get();
    size_t size();

private:
    SynchronizedQueue(const SynchronizedQueue &) = delete;
    SynchronizedQueue &operator=(const SynchronizedQueue &) = delete;
    std::list<T> queue;
    std::mutex myMutex;
    std::condition_variable myCv;
};

template <typename T>
void SynchronizedQueue<T>::put(const T &data)
{
    std::unique_lock<std::mutex> lck(myMutex);
    queue.push_back(data);
    myCv.notify_one();
}

template <typename T>
T SynchronizedQueue<T>::get()
{
    std::unique_lock<std::mutex> lck(myMutex);
    while (queue.empty())
        myCv.wait(lck);
    T result = queue.front();
    queue.pop_front();
    return result;
}

template <typename T>
size_t SynchronizedQueue<T>::size()
{
    std::unique_lock<std::mutex> lck(myMutex);
    return queue.size();
}

#endif // SYNC_QUEUE_H_

если изменить 2300000 на 23000, код работает!

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...