Boost :: beast: множественные вызовы async_write вызывают ошибку подтверждения - PullRequest
0 голосов
/ 06 июня 2018

Я пишу тесты для своего полнодуплексного сервера, и когда я делаю несколько (последовательных) вызовов async_write (хотя и покрыт пряди), я получаю следующую ошибку подтверждения от boost::beast в файле boost/beast/websocket/detail/stream_base.hpp:

// If this assert goes off it means you are attempting to
// simultaneously initiate more than one of same asynchronous
// operation, which is not allowed. For example, you must wait
// for an async_read to complete before performing another
// async_read.
//
BOOST_ASSERT(id_ != T::id);

Чтобы воспроизвести проблему на вашем компьютере: Полный код клиента, который воспроизводит эту проблему (MCVE), можно найти здесь .Он не работает по ссылке, потому что вам нужен сервер (на вашем собственном компьютере, извините, так как это невозможно сделать удобно в сети, и лучше объективно показать, что проблема в клиенте, а не на сервере, еслиЯ включаю это здесь).Я использовал websocketd , чтобы создать сервер с командой ./websocketd --ssl --sslkey /path/to/server.key --sslcert /path/to/server.crt --port=8085 ./prog.py, где ./prog.py - это простая программа на Python, которая печатает и сбрасывает (я получил ее с домашней страницы websocketd ).

Вызов, который выполняет запись в клиенте, выглядит следующим образом:

  std::vector<std::vector<std::future<void>>> clients_write_futures(
      clients_count);
  for (int i = 0; i < clients_count; i++) {
    clients_write_futures[i] = std::vector<std::future<void>>(num_of_messages);
    for (int j = 0; j < num_of_messages; j++) {
      clients_write_futures[i][j] =
          clients[i]->write_data_async_future("Hello"); // writing here
    }
  }

Обратите внимание, что в этом примере я использую только 1 клиента.Массив клиентов - это просто обобщение для большей нагрузки на сервер при тестировании.

Мои комментарии по проблеме:

  1. Цикл последовательный;это не так, как я делаю это в нескольких потоках
  2. Должна быть возможность осуществлять связь в полнодуплексной форме, когда на сервер отправляются сообщения с неопределенным числом сообщений.Как еще можно сделать полнодуплексную связь?
  3. Я использую пряди, чтобы обернуть свои асинхронные вызовы, чтобы предотвратить любое столкновение в сокете через io_service / io_context
  4. Исследование этого с помощью отладчика показываетчто вторая итерация цикла терпит неудачу последовательно, что означает, что я делаю что-то в корне неправильно, но я не знаю, что это такое.Другими словами: это, очевидно, детерминированная проблема.

Что я здесь не так делаю?Как я могу написать неопределенное количество сообщений на мой сервер веб-сокетов?


РЕДАКТИРОВАТЬ:

Sehe, я хочу начать с извинения за код беспорядок (не понимал, что это плохо), и благодарю вас за ваши усилия в этом.Я хотел бы, чтобы вы спросили меня, почему он структурирован таким (вероятно) организованным, а также хаотичным способом одновременно, и ответ прост: основной - это gtest-код, чтобы увидеть, работает ли мой универсальный универсальный клиент веб-сокетов, который я использую для подчеркивания.протестировать мой сервер (который использует тонны многопоточных объектов io_service, которые я считаю чувствительными и нуждающимися в широком тестировании).Я планирую бомбардировать мой сервер множеством клиентов одновременно во время реальных производственных испытаний.Я разместил этот вопрос, потому что поведение клиента я не понимаю.В этом файле я создал MCVE (который люди постоянно запрашивают на SO).Мне потребовалось два часа, чтобы лишить меня кода для его создания, и в конце концов я скопировал свой код фикстуры gtest test (который является фикстурой на сервере) и вставил его в основной файл и проверил, что проблема все еще существует на другом сервере, и удалил его.немного (что явно оказалось недостаточно).

Так почему я не ловлю исключения?Потому что gtest поймает их и посчитает, что тест не пройден.Основным является не производственный код, а клиент.Я многому научился из того, что вы упомянули, и я должен сказать, что глупо бросать и ловить, но я не знал о std :: make_exception_ptr (), поэтому я нашел свой (dumm) способ добиться того же результата: -).Почему слишком много бесполезных функций: они бесполезны здесь, в этом тесте / примере, но в целом я мог бы использовать их позже для других целей, поскольку этот клиент не только для этого случая.

Теперь вернемся к проблеме: что-то, чего я не понимаю, это то, почему мы должны покрывать async_write strand_, когда он используется последовательно в цикле в основном потоке (я неправильно выразил, что покрыл только обработчик).Я бы понял, почему обработчик покрыт, потому что сокет не является поточно-ориентированным, и многопоточная io_service создаст там гонку.Мы также знаем, что io_service::post сам по себе является потокобезопасным (поэтому я подумал, что перенос async_write не нужен).Не могли бы вы объяснить, что при выполнении этого нам не нужно потокобезопасно, что нам нужно обернуть async_write?Я знаю, ты уже знаешь это, но тот же самый аргумент все еще стреляет.Мы упорядочили обработчик и асинхронную очередь, и клиент все еще не рад выполнению нескольких вызовов записи.Что еще может отсутствовать?

(Кстати, если вы напишите, затем получите будущее, затем прочитаете, затем снова напишите, это работает. Вот почему я использую фьючерсы, чтобы точно определить тестовые случаи и определитьпорядок моих тестов. Я здесь параноик.)

1 Ответ

0 голосов
/ 07 июня 2018

Ты сказал ты прикрываешь async_write пряди.Но вы не делаете такой вещи .Все, что вы можете увидеть, это завернуть обработчики завершения в эту цепочку .Но вы отправляете асинхронные операции напрямую .

Что еще хуже, вы делаете это из основного потока, в то время как в любом из потоков, связанных с вашими экземплярами WSClient, выполняются асинхронные операции, что означает, что вы одновременно обращаетесь к экземплярам объектов, которые непоточно-ориентированный.

Это гонка данных, поэтому вы получаете Неопределенное поведение .

Наивное исправление может быть:

std::future<void> write_data_async_future(const std::string &data) {
    // shared_ptr is used to ensure data's survival
    std::shared_ptr<std::string> data_ptr = std::make_shared<std::string>(data);
    std::shared_ptr<std::promise<void> > write_promise = std::make_shared<std::promise<void> >();

    post(strand_, [=,self=shared_from_this()] {
        websock.async_write(
            boost::asio::buffer(*data_ptr),
            boost::asio::bind_executor(strand_, std::bind(&WSClientSession::on_write_future, self,
                                                          std::placeholders::_1, std::placeholders::_2, data_ptr,
                                                          write_promise)));
    });

    return write_promise->get_future();
}

Но это недостаточно .Теперь вы можете быть уверены, что ни одна из ваших асинхронных операций или их завершений не будет выполняться одновременно, но вы все равно можете опубликовать следующую асинхронную операцию до того, как будет вызван обработчик завершения для первой.

Исправитьчто вам просто нужно стоять в очереди.

Если честно, я не уверен, почему вы так много внимания уделяете синхронизации с использованием фьючерсов.Это только затрудняет достижение этого.Если вы можете описать, чего вы / функционально / хотите достичь, я могу предложить решение, которое, вероятно, будет намного короче.

Примечания к обзору кода

До того, как меня осенило, чтоКод был весь, я потратил довольно много времени на чтение вашего кода.Я не хотел бы отнимать у вас заметки, которые я сделал по пути.

Предупреждение: это было довольно длительное погружение в код.Я предоставляю это, потому что некоторые из идей могут помочь вам понять, как вам нужно реструктурировать ваш код.

Я начал читать цепочки асинхронного кода вплоть до on_handshake (что устанавливает значение started_promise.

Затем я направился к мальстрому, который является вашей main функцией.Ваша основная функция - 50 строк кода ?!Наличие нескольких параллельных контейнеров и повторяющихся вложенных циклов вручную через них?

Вот что я получил после некоторого рефакторинга:

int main() {
    std::vector<actor> actors(1);

    for (auto& a : actors) {
        a.client = std::make_shared<WSClient>();
        a.session_start_future = a.client->start("127.0.0.1", "8085");
        a.messages.resize(50);
    }

    for (auto& a : actors) { a.session_start_future.get(); }

    for (auto& a : actors) { for (auto& m : a.messages) {
        m.write_future = a.client->write_data_async_future("Hello");
    } }

    for (auto& a : actors) { for (auto& m : a.messages) {
        m.read_future = a.client->read_data_async_future();
    } }

    for (auto& a : actors) { for (auto& m : a.messages) {
        m.write_future.get();
        std::string result = m.read_future.get();
    } }
}

Все структуры данных были свернуты в небольшой помощник actor:

struct actor {
    std::shared_ptr<WSClient> client;
    std::future<void> session_start_future;

    struct message {
        std::string message = GenerateRandomString(20);
        std::future<void> write_future;
        std::future<std::string> read_future;
    };

    std::vector<message> messages;
};

Сейчас у нас примерно час пересмотра кода, без выгоды, за исключением того, что теперь мы можем СКАЗАТЬ, что делает main, и иметь некоторую уверенность в том, что нетЭто не простая ошибка с переменной цикла или что-то в этом роде.

Сбор данных

В начале записи: write_data_async_future.Подождите.Там также write_data_async и write_data_sync.Зачем?Вы захотите прочитать

Что еще хуже, WSClient только ретранслирует их на предполагаемый одиночный сеанс.Почему на данный момент существует различие между WSClient и WSClientSession?Я говорю, что его нет.

Испарение 30 строк не очень полезного кода, в дальнейшем у нас все тот же сбой, так что это хорошо.

Где мы были.write_data_async_future.Ах да, нам нужны не будущие версии?Нет. Итак, осталось еще 40 строк кода.

Теперь по-настоящему: write_data_async_future:

std::future<void> write_data_async_future(const std::string &data) {
    // shared_ptr is used to ensure data's survival
    std::shared_ptr<std::string> data_ptr = std::make_shared<std::string>(data);
    std::shared_ptr<std::promise<void> > write_promise = std::make_shared<std::promise<void> >();
    websock.async_write(
        boost::asio::buffer(*data_ptr),
        boost::asio::bind_executor(strand_, std::bind(&WSClientSession::on_write_future, shared_from_this(),
                                                      std::placeholders::_1, std::placeholders::_2, data_ptr,
                                                      write_promise)));
    return write_promise->get_future();
}

Выглядит ... ладно.Подождите, есть on_write_future?Это, вероятно, означает, что нам нужно испарить больше строк неиспользуемого кода.Смотря ... Да.Пуф, ушел.

Теперь diffstat выглядит следующим образом:

  test.cpp | 683 +++++++++++++++++++++++----------------------------------------
  1 file changed, 249 insertions(+), 434 deletions(-)

Вернемся к этой функции, поэтому давайте посмотрим на on_write_future:

void on_write_future(boost::system::error_code ec, std::size_t bytes_transferred,
                     std::shared_ptr<std::string> data_posted,
                     std::shared_ptr<std::promise<void> > write_promise) {
    boost::ignore_unused(bytes_transferred);
    boost::ignore_unused(data_posted);

    if (ec) {
        try {
            throw std::runtime_error("Error thrown while performing async write: " + ec.message());
        } catch (...) {
            write_promise->set_exception(std::current_exception());
        }
        return;
    }
    write_promise->set_value();
}

Несколько вопросов.Все пройденное игнорируется.Я знаю, для чего вы передаете shared_ptrs, но, возможно, вам следует передавать их как часть объекта операции, чтобы избежать наличия такого количества отдельных shared-ptrs.

Бросать исключение только для его перехвата?Ммм.Я не уверен в этом.Возможно, просто установите новое исключение:

if (ec) {
    write_promise->set_exception(
            std::make_exception_ptr(std::system_error(ec, "async write failed")));
} else {
    write_promise->set_value();
}

Несмотря на это, сейчас есть концептуальная проблема.То, как вы свободно используете get() без захвата main, означает, что любая ошибка в любом соединении просто прервет все операции.Было бы весьма полезно иметь ошибку, просто прервав одно соединение / сеанс / клиент.Которые в вашем коде довольно синонимичны (а также с io_context и thread).

Sidenote: Вы сохраняете поток как член, но всегда отсоединяете его.Это означает, что член с тех пор бесполезен.

В этот момент я взял перерыв в рассмотрении, и, как это случилось, я получил мозговую волну, которая показала мне проблему.Полупеченные результаты моих упражнений здесь .Обратите внимание, что вы не можете использовать его, потому что это на самом деле не решает проблему.Но это может помочь другими способами?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...