Я реализую мультиплексор соединений - класс, который оборачивает одно соединение, чтобы обеспечить возможность создания над ним так называемых Stream
.Таких потоков может быть несколько по одному физическому соединению.
Сообщения, отправляемые по этому соединению, определяются протоколом и могут быть служебными (контроль перегрузки и т. Д.), Которые никогда не видны клиентам, и данными - они содержат некоторые данные для потоков, для которыхone - определено в заголовке соответствующего сообщения.
Я столкнулся с проблемой при реализации метода read
для Stream
.Он должен быть блокирующим, но асинхронным, чтобы он возвращал какое-то значение - чтение данных или произошла ошибка - но сам запрос должен быть какой-то асинхронной очередью.
Для реализации асинхронного сетевого ввода-вывода мы использовали Boost's async_read
-s, async_write
-s и т. Д. С токеном завершения, взятым из другой библиотеки.Итак, вызов MyConnection::underlying_connection::read(size_t)
является асинхронным уже в терминах, которые я описал ранее.
Одним из реализованных мною решений является функция MyConnection::processFrame()
, которая считывает из соединения, обрабатывает сообщение и, если этосообщение данных, помещает данные в буфер соответствующего потока.Функция должна вызываться в цикле while
потоком read
.Но в этом случае может быть более одного одновременного вызова на async_read
, который является UB.Кроме того, это будет означать, что даже служебные сообщения должны ждать, пока некоторый поток захочет прочитать данные, что также не подходит.
Другое решение, которое я придумал, - это использование future
-s, но, как япроверил, что их методы wait/get
будут блокировать весь поток (даже с отклоненной политикой или парным обещанием), чего тоже следует избегать.
Ниже приведен упрощенный пример, содержащий только методы, которые необходимы для понимания вопроса.,Это текущая реализация, которая содержит ошибки.
struct LowLevelConnection {
/// completion token of 3-rd part library - ufibers
yield_t yield;
/// boost::asio socket
TcpSocket socket_;
/// completely async (in one thread) method
std::vector<uint8_t> read(size_t bytes) {
std::vector<uint8_t> res;
res.reserve(bytes);
boost::asio::async_read(socket_, res, yield);
return res;
}
}
struct MyConnection {
/// header is always of that length
constexpr uint32_t kHeaderSize = 12;
/// underlying connection
LowLevelConnection connection_;
/// is running all the time the connection is up
void readLoop() {
while (connection_.isActive()) {
auto msg = connection_.read(kHeaderSize);
if (msg.type == SERVICE) { handleService(msg); return; }
// this is data message; read another part of it
auto data = connection_.read(msg.data_size);
// put the data into the stream's buffer
streams_.find(data.stream_id).buffer.put(data);
}
}
}
struct Stream {
Buffer buffer;
// also async blocking method
std::vector<uint8_t> read(uint32_t bytes) {
// in perfect scenario, this should look like this
async_wait([]() { return buffer.size() >= bytes; });
// return the subbuffer of 'bytes' size and remove them
return subbufer...
}
}
Спасибо за будущие ответы!