Чтение из одного сокета для нескольких потребителей асинхронно в одном потоке - PullRequest
1 голос
/ 07 июня 2019

Я реализую мультиплексор соединений - класс, который оборачивает одно соединение, чтобы обеспечить возможность создания над ним так называемых Stream.Таких потоков может быть несколько по одному физическому соединению.

Сообщения, отправляемые по этому соединению, определяются протоколом и могут быть служебными (контроль перегрузки и т. Д.), Которые никогда не видны клиентам, и данными - они содержат некоторые данные для потоков, для которыхone - определено в заголовке соответствующего сообщения.

Я столкнулся с проблемой при реализации метода read для Stream.Он должен быть блокирующим, но асинхронным, чтобы он возвращал какое-то значение - чтение данных или произошла ошибка - но сам запрос должен быть какой-то асинхронной очередью.

Для реализации асинхронного сетевого ввода-вывода мы использовали Boost's async_read -s, async_write -s и т. Д. С токеном завершения, взятым из другой библиотеки.Итак, вызов MyConnection::underlying_connection::read(size_t) является асинхронным уже в терминах, которые я описал ранее.

Одним из реализованных мною решений является функция MyConnection::processFrame(), которая считывает из соединения, обрабатывает сообщение и, если этосообщение данных, помещает данные в буфер соответствующего потока.Функция должна вызываться в цикле while потоком read.Но в этом случае может быть более одного одновременного вызова на async_read, который является UB.Кроме того, это будет означать, что даже служебные сообщения должны ждать, пока некоторый поток захочет прочитать данные, что также не подходит.

Другое решение, которое я придумал, - это использование future -s, но, как япроверил, что их методы wait/get будут блокировать весь поток (даже с отклоненной политикой или парным обещанием), чего тоже следует избегать.

Ниже приведен упрощенный пример, содержащий только методы, которые необходимы для понимания вопроса.,Это текущая реализация, которая содержит ошибки.

struct LowLevelConnection {
  /// completion token of 3-rd part library - ufibers
  yield_t yield;

  /// boost::asio socket
  TcpSocket socket_;

  /// completely async (in one thread) method
  std::vector<uint8_t> read(size_t bytes) {
    std::vector<uint8_t> res;
    res.reserve(bytes);

    boost::asio::async_read(socket_, res, yield);

    return res;
  }
}

struct MyConnection {
  /// header is always of that length
  constexpr uint32_t kHeaderSize = 12; 

  /// underlying connection
  LowLevelConnection connection_;

  /// is running all the time the connection is up
  void readLoop() {
    while (connection_.isActive()) {
      auto msg = connection_.read(kHeaderSize);
      if (msg.type == SERVICE) { handleService(msg); return; }

      // this is data message; read another part of it
      auto data = connection_.read(msg.data_size);

      // put the data into the stream's buffer
      streams_.find(data.stream_id).buffer.put(data);
    }
  }
}

struct Stream {
  Buffer buffer;

  // also async blocking method
  std::vector<uint8_t> read(uint32_t bytes) {
    // in perfect scenario, this should look like this
    async_wait([]() { return buffer.size() >= bytes; });

    // return the subbuffer of 'bytes' size and remove them
    return subbufer...
  }
}

Спасибо за будущие ответы!

...