Завершение работы многопоточного приложения путем установки обработчика сигнала - PullRequest
0 голосов
/ 09 октября 2018

В следующем коде я создаю игрушечный класс, у которого есть поток, который записывает в очередь, в то время как другой поток читает из этой очереди и печатает его в stdout.Теперь, чтобы аккуратно завершить работу системы, я установил обработчик для SIGINT.Я ожидаю, что обработчик сигнала установит переменную std::atomic<bool> stopFlag, которая приведет threadB к отправке ядовитой пилюли (стража) в очередь, с которой threadA остановится.

class TestClass
{
public:

    TestClass();
    ~TestClass();
    void shutDown();

    TestClass(const TestClass&) = delete;
    TestClass& operator=(const TestClass&) = delete;


private:
    void init();
    void postResults();
    std::string getResult();
    void processResults();

    std::atomic<bool> stopFlag;

    std::mutex outQueueMutex;
    std::condition_variable outQueueConditionVariable;
    std::queue<std::string> outQueue;

    std::unique_ptr<std::thread> threadA;
    std::unique_ptr<std::thread> threadB;
};

void TestClass::init()
{
    threadA = std::make_unique<std::thread>(&TestClass::processResults, std::ref(*this));
    threadB = std::make_unique<std::thread>(&TestClass::postResults, std::ref(*this));
}

TestClass::TestClass():
    stopFlag(false)
{
    init();
}

TestClass::~TestClass()
{
    threadB->join();
}

void TestClass::postResults()
{
    while(true)
    {
        std::this_thread::sleep_for(std::chrono::milliseconds(2000));
        std::string name = "ABCDEF";
        {
            std::unique_lock<std::mutex> lock(outQueueMutex);
            outQueue.push(name);
            outQueueConditionVariable.notify_one();
        }
        if(stopFlag)
        {
            /*For shutting down output thread*/
            auto poisonPill = std::string();
            {
                std::unique_lock<std::mutex> lock(outQueueMutex);
                outQueue.push(poisonPill);
                outQueueConditionVariable.notify_one();
            }
            threadA->join();
            break;
        }
    }
}

void TestClass::shutDown()
{
    stopFlag = true;
}

std::string TestClass::getResult()
{
    std::string result;
    {
        std::unique_lock<std::mutex> lock(outQueueMutex);
        while(outQueue.empty())
        {
            outQueueConditionVariable.wait(lock);
        }
        result= outQueue.front();
        outQueue.pop();
    }
    return result;
}

void TestClass::processResults()
{
    while(true)
    {
        const auto result = getResult();

        if(result.empty())
        {
            break;
        }

        std::cout << result << std::endl;

    }
}

static void sigIntHandler(std::shared_ptr<TestClass> t, int)
{
    t->shutDown();
}
static std::function<void(int)> handler;

int main()
{
    auto testClass = std::make_shared<TestClass>();
    handler = std::bind(sigIntHandler, testClass, std::placeholders::_1);
    std::signal(SIGINT, [](int n){ handler(n);});
    return 0;
}

Я скомпилировал это с помощью gcc 5.2, используя флаг -std = c ++ 14.При нажатии Ctrl-C на моем компьютере с CentOS 7 я получаю следующую ошибку:

terminate called after throwing an instance of 'std::system_error'
  what():  Invalid argument
Aborted (core dumped)

Пожалуйста, помогите мне понять, что происходит.

Ответы [ 2 ]

0 голосов
/ 09 октября 2018

В результате ваша main функция немедленно завершает работу, уничтожая глобальный handler объект, а затем testClass.Тогда основной поток блокируется в TestClass::~TestClass.Обработчик сигнала завершает доступ к уже уничтоженным объектам, что приводит к неопределенному поведению.

Основной причиной является неопределенное владение объектом из-за общих указателей - вы не знаете, что и когда заканчивается уничтожением ваших объектов.


Более общий подход заключается в использовании другого потока дляобрабатывать все сигналы и блокировать сигналы во всех других потоках.Этот поток обработки сигналов может затем вызывать любые функции при получении сигнала.

Вам также здесь вообще не нужны умные указатели и функциональные оболочки.

Пример:

class TestClass
{
public:
    TestClass();
    ~TestClass();
    void shutDown();

    TestClass(const TestClass&) = delete;
    TestClass& operator=(const TestClass&) = delete;

private:
    void postResults();
    std::string getResult();
    void processResults();


    std::mutex outQueueMutex;
    std::condition_variable outQueueConditionVariable;
    std::queue<std::string> outQueue;
    bool stop = false;

    std::thread threadA;
    std::thread threadB;
};

TestClass::TestClass()
    : threadA(std::thread(&TestClass::processResults, this))
    , threadB(std::thread(&TestClass::postResults, this))
{}

TestClass::~TestClass() {
    threadA.join();
    threadB.join();
}

void TestClass::postResults() {
    while(true) {
        std::this_thread::sleep_for(std::chrono::milliseconds(2000));
        std::string name = "ABCDEF";
        {
            std::unique_lock<std::mutex> lock(outQueueMutex);
            if(stop)
                return;
            outQueue.push(name);
            outQueueConditionVariable.notify_one();
        }
    }
}

void TestClass::shutDown() {
    std::unique_lock<std::mutex> lock(outQueueMutex);
    stop = true;
    outQueueConditionVariable.notify_one();
}

std::string TestClass::getResult() {
    std::string result;
    {
        std::unique_lock<std::mutex> lock(outQueueMutex);
        while(!stop && outQueue.empty())
            outQueueConditionVariable.wait(lock);
        if(stop)
            return result;
        result= outQueue.front();
        outQueue.pop();
    }
    return result;
}

void TestClass::processResults()
{
    while(true) {
        const auto result = getResult();
        if(result.empty())
            break;
        std::cout << result << std::endl;
    }
}

int main() {
    // Block signals in all threads.
    sigset_t sigset;
    sigfillset(&sigset);
    ::pthread_sigmask(SIG_BLOCK, &sigset, nullptr);

    TestClass testClass;

    std::thread signal_thread([&testClass]() {
        // Unblock signals in this thread only.
        sigset_t sigset;
        sigfillset(&sigset);
        int signo = ::sigwaitinfo(&sigset, nullptr);
        if(-1 == signo)
            std::abort();

        std::cout << "Received signal " << signo << '\n';
        testClass.shutDown();
    });

    signal_thread.join();
}
0 голосов
/ 09 октября 2018

На вашей платформе этот обработчик сигнала вызывается при поступлении реального сигнала SIGINT. Список функций, которые можно вызывать внутри этого обработчика сигнала , довольно ограничен, и вызов чего-либо еще приводит к неопределенному поведению.

...