asio :: async_write невероятно сложно синхронизировать в большом потоке - PullRequest
5 голосов
/ 21 мая 2019

В настоящее время я использую библиотеку Asio C ++ и написал оболочку для клиента.Мой оригинальный подход был очень базовым и требовал только потока в одном направлении.Требования изменились, и я переключился на использование всех асинхронных вызовов.Большая часть миграции была легкой, за исключением asio::async_write(...).Я использовал несколько разных подходов и неизбежно зашел в тупик с каждым из них.

Приложение непрерывно передает данные в большом объеме.Я держался подальше от нитей, потому что они не блокируют и могут привести к проблемам с памятью, особенно когда сервер находится под большой нагрузкой.Задания будут резервироваться, а куча приложений бесконечно возрастает.

Поэтому я создал блокирующую очередь только для того, чтобы выяснить, каким сложным образом использование блокировок для обратных вызовов и / или блокирование событий приводит к неизвестному поведению.

Оболочка - это очень большой класс, поэтому я постараюсь объяснить свой ландшафт в его текущем состоянии и, надеюсь, получить несколько хороших советов:

  • У меня есть asio::steady_timer, который работает по фиксированному расписанию до отправка сообщения пульса непосредственно в очередь блокировки.
  • поток, предназначенный для чтения событий, и отправка их в очередь блокировки
  • поток, выделенный для потребление очереди блокировки

Например, в моей очереди у меня есть queue::block() и queue::unblock(), которые являются просто оболочками для условной переменной / мьютекса.

std::thread consumer([this]() {
    std::string message_buffer;

    while (queue.pop(message_buffer)) {
        queue.stage_block();
        asio::async_write(*socket, asio::buffer(message_buffer), std::bind(&networking::handle_write, this, std::placeholders::_1, std::placeholders::_2));
        queue.block();

    }
});

void networking::handle_write(const std::error_code& error, size_t bytes_transferred) {
    queue.unblock();
}

Когда сокет выполняет резервное копирование и сервер больше не может принимать данные из-за текущей загрузки, очередь заполняется и приводит к тупику, при котором handle_write(...) никогда не вызываетсяed.

Другой подход полностью исключает потребительский поток и опирается на handle_write(...) для выталкивания очереди.Вот так:

void networking::write(const std::string& data) {
    if (!queue.closed()) {
        std::stringstream stream_buffer;
        stream_buffer << data << std::endl;

        spdlog::get("console")->debug("pushing to queue {}", queue.size());

        queue.push(stream_buffer.str());

        if (queue.size() == 1) {
            spdlog::get("console")->debug("handle_write: {}", stream_buffer.str());
            asio::async_write(*socket, asio::buffer(stream_buffer.str()), std::bind(&networking::handle_write, this, std::placeholders::_1, std::placeholders::_2));
        }
    }
}

void networking::handle_write(const std::error_code& error, size_t bytes_transferred) {
    std::string message;
    queue.pop(message);

    if (!queue.closed() && !queue.empty()) {
        std::string front = queue.front();

        asio::async_write(*socket, asio::buffer(queue.front()), std::bind(&networking::handle_write, this, std::placeholders::_1, std::placeholders::_2));
    }
}

Это также привело к тупику и, очевидно, к другим гоночным проблемам.Когда я отключил свой обратный вызов сердцебиения, у меня не было абсолютно никаких проблем.Тем не менее, сердцебиение является обязательным требованием.

Что я делаю не так?Какой подход лучше?

1 Ответ

0 голосов
/ 21 мая 2019

Кажется, вся моя боль полностью связана с сердцебиением.Отключение сердцебиения в каждой вариации моих операций асинхронной записи, кажется, излечивает мои проблемы, поэтому это заставляет меня поверить, что это может быть результатом использования встроенных asio::async_wait(...) и asio::steady_timer.

Asio.внутренне синхронизирует свою работу и ожидает выполнения заданий перед выполнением следующего задания.Использование asio::async_wait(...) для создания моей функции сердцебиения было моей конструкторской ошибкой, поскольку она работала в том же потоке, который ожидал ожидающих заданий.Это создало тупик с Асио, когда сердцебиение ожидало на queue::push(...).Это объясняет, почему обработчик завершения asio::async_write(...) никогда не выполнялся в моем первом примере.

Решение состояло в том, чтобы поместить биение в его собственный поток и позволить ему работать независимо от Asio.Я все еще использую свою очередь блокировки для синхронизации вызовов на asio::async_write(...), но изменил свой потребительский поток для использования std::future и std::promise.Это синхронизирует обратный вызов с моим потребительским потоком.

std::thread networking::heartbeat_worker() {
    return std::thread([&]() {
        while (socket_opened) {

            spdlog::get("console")->trace("heartbeat pending");
            write(heartbeat_message);
            spdlog::get("console")->trace("heartbeat sent");

            std::unique_lock<std::mutex> lock(mutex);
            socket_closed_event.wait_for(lock, std::chrono::milliseconds(heartbeat_interval), [&]() {
                return !socket_opened;
            });
        }

        spdlog::get("console")->trace("heartbeat thread exited gracefully");
    });
}
...