Ты сказал ты прикрываешь async_write
пряди.Но вы не делаете такой вещи .Все, что вы можете увидеть, это завернуть обработчики завершения в эту цепочку .Но вы отправляете асинхронные операции напрямую .
Что еще хуже, вы делаете это из основного потока, в то время как в любом из потоков, связанных с вашими экземплярами WSClient
, выполняются асинхронные операции, что означает, что вы одновременно обращаетесь к экземплярам объектов, которые непоточно-ориентированный.
Это гонка данных, поэтому вы получаете Неопределенное поведение .
Наивное исправление может быть:
std::future<void> write_data_async_future(const std::string &data) {
// shared_ptr is used to ensure data's survival
std::shared_ptr<std::string> data_ptr = std::make_shared<std::string>(data);
std::shared_ptr<std::promise<void> > write_promise = std::make_shared<std::promise<void> >();
post(strand_, [=,self=shared_from_this()] {
websock.async_write(
boost::asio::buffer(*data_ptr),
boost::asio::bind_executor(strand_, std::bind(&WSClientSession::on_write_future, self,
std::placeholders::_1, std::placeholders::_2, data_ptr,
write_promise)));
});
return write_promise->get_future();
}
Но это недостаточно .Теперь вы можете быть уверены, что ни одна из ваших асинхронных операций или их завершений не будет выполняться одновременно, но вы все равно можете опубликовать следующую асинхронную операцию до того, как будет вызван обработчик завершения для первой.
Исправитьчто вам просто нужно стоять в очереди.
Если честно, я не уверен, почему вы так много внимания уделяете синхронизации с использованием фьючерсов.Это только затрудняет достижение этого.Если вы можете описать, чего вы / функционально / хотите достичь, я могу предложить решение, которое, вероятно, будет намного короче.
Примечания к обзору кода
До того, как меня осенило, чтоКод был весь, я потратил довольно много времени на чтение вашего кода.Я не хотел бы отнимать у вас заметки, которые я сделал по пути.
Предупреждение: это было довольно длительное погружение в код.Я предоставляю это, потому что некоторые из идей могут помочь вам понять, как вам нужно реструктурировать ваш код.
Я начал читать цепочки асинхронного кода вплоть до on_handshake
(что устанавливает значение started_promise
.
Затем я направился к мальстрому, который является вашей main
функцией.Ваша основная функция - 50 строк кода ?!Наличие нескольких параллельных контейнеров и повторяющихся вложенных циклов вручную через них?
Вот что я получил после некоторого рефакторинга:
int main() {
std::vector<actor> actors(1);
for (auto& a : actors) {
a.client = std::make_shared<WSClient>();
a.session_start_future = a.client->start("127.0.0.1", "8085");
a.messages.resize(50);
}
for (auto& a : actors) { a.session_start_future.get(); }
for (auto& a : actors) { for (auto& m : a.messages) {
m.write_future = a.client->write_data_async_future("Hello");
} }
for (auto& a : actors) { for (auto& m : a.messages) {
m.read_future = a.client->read_data_async_future();
} }
for (auto& a : actors) { for (auto& m : a.messages) {
m.write_future.get();
std::string result = m.read_future.get();
} }
}
Все структуры данных были свернуты в небольшой помощник actor
:
struct actor {
std::shared_ptr<WSClient> client;
std::future<void> session_start_future;
struct message {
std::string message = GenerateRandomString(20);
std::future<void> write_future;
std::future<std::string> read_future;
};
std::vector<message> messages;
};
Сейчас у нас примерно час пересмотра кода, без выгоды, за исключением того, что теперь мы можем СКАЗАТЬ, что делает main
, и иметь некоторую уверенность в том, что нетЭто не простая ошибка с переменной цикла или что-то в этом роде.
Сбор данных
В начале записи: write_data_async_future
.Подождите.Там также write_data_async
и write_data_sync
.Зачем?Вы захотите прочитать
Что еще хуже, WSClient
только ретранслирует их на предполагаемый одиночный сеанс.Почему на данный момент существует различие между WSClient
и WSClientSession
?Я говорю, что его нет.
Испарение 30 строк не очень полезного кода, в дальнейшем у нас все тот же сбой, так что это хорошо.
Где мы были.write_data_async_future
.Ах да, нам нужны не будущие версии?Нет. Итак, осталось еще 40 строк кода.
Теперь по-настоящему: write_data_async_future
:
std::future<void> write_data_async_future(const std::string &data) {
// shared_ptr is used to ensure data's survival
std::shared_ptr<std::string> data_ptr = std::make_shared<std::string>(data);
std::shared_ptr<std::promise<void> > write_promise = std::make_shared<std::promise<void> >();
websock.async_write(
boost::asio::buffer(*data_ptr),
boost::asio::bind_executor(strand_, std::bind(&WSClientSession::on_write_future, shared_from_this(),
std::placeholders::_1, std::placeholders::_2, data_ptr,
write_promise)));
return write_promise->get_future();
}
Выглядит ... ладно.Подождите, есть on_write_future
?Это, вероятно, означает, что нам нужно испарить больше строк неиспользуемого кода.Смотря ... Да.Пуф, ушел.
Теперь diffstat выглядит следующим образом:
test.cpp | 683 +++++++++++++++++++++++----------------------------------------
1 file changed, 249 insertions(+), 434 deletions(-)
Вернемся к этой функции, поэтому давайте посмотрим на on_write_future
:
void on_write_future(boost::system::error_code ec, std::size_t bytes_transferred,
std::shared_ptr<std::string> data_posted,
std::shared_ptr<std::promise<void> > write_promise) {
boost::ignore_unused(bytes_transferred);
boost::ignore_unused(data_posted);
if (ec) {
try {
throw std::runtime_error("Error thrown while performing async write: " + ec.message());
} catch (...) {
write_promise->set_exception(std::current_exception());
}
return;
}
write_promise->set_value();
}
Несколько вопросов.Все пройденное игнорируется.Я знаю, для чего вы передаете shared_ptrs, но, возможно, вам следует передавать их как часть объекта операции, чтобы избежать наличия такого количества отдельных shared-ptrs.
Бросать исключение только для его перехвата?Ммм.Я не уверен в этом.Возможно, просто установите новое исключение:
if (ec) {
write_promise->set_exception(
std::make_exception_ptr(std::system_error(ec, "async write failed")));
} else {
write_promise->set_value();
}
Несмотря на это, сейчас есть концептуальная проблема.То, как вы свободно используете get()
без захвата main
, означает, что любая ошибка в любом соединении просто прервет все операции.Было бы весьма полезно иметь ошибку, просто прервав одно соединение / сеанс / клиент.Которые в вашем коде довольно синонимичны (а также с io_context
и thread
).
Sidenote: Вы сохраняете поток как член, но всегда отсоединяете его.Это означает, что член с тех пор бесполезен.
В этот момент я взял перерыв в рассмотрении, и, как это случилось, я получил мозговую волну, которая показала мне проблему.Полупеченные результаты моих упражнений здесь .Обратите внимание, что вы не можете использовать его, потому что это на самом деле не решает проблему.Но это может помочь другими способами?