Нетти: как асинхронно ждать, когда на сервере нет данных для возврата? - PullRequest
0 голосов
/ 09 января 2012

Во-первых, спасибо за отличную инфраструктуру nio - Netty.

Наше приложение включает в себя клиентскую и серверную части, они взаимодействуют через сокетные соединения и отправляют сообщения protobuf.И клиент, и сервер написаны с использованием OIO.Я перемещаю настоящий код в nio на основе Netty Framework.В настоящее время у меня проблема с реализацией сервера: я не нашел, как обработать обработчик, используя рабочий поток netty.

Существует упрощенный код:

ServerBootstrap bootstrap = new ServerBootstrap(
  new NioServerSocketChannelFactory(
    Executors.newCachedThreadPool(),
    Executors.newCachedThreadPool(),
    Runtime.getRuntime().availableProcessors()
  )
);

// Set up the pipeline factory.
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
  public ChannelPipeline getPipeline() throws Exception {
    final ChannelPipeline pipeline = pipeline();
    // decoder
    pipeline.addLast("frameDecoder", new ProtobufVarint32FrameDecoder());
    pipeline.addLast("protobufDecoder", new ProtobufDecoder(LogServerProto.Request.getDefaultInstance()));
    // Encoder
    pipeline.addLast("protobufVarint32LengthFieldPrepender", new ProtobufVarint32LengthFieldPrepender());
    pipeline.addLast("protobufEncoder", new ProtobufEncoder());
    // handler
    pipeline.addLast("handler", new SimpleChannelUpstreamHandler() {
      @Override
      public void messageReceived(final ChannelHandlerContext ctx, final MessageEvent event) throws Exception {
        logger.info("handleClientSocket in thread " + Thread.currentThread().getName());
        final MessageLite request = (MessageLite) event.getMessage();
        final MessageLite batch = storage.readBatch(request);
        if (batch != null) {
          event.getChannel().write(batch);
        } else {
          storage.registerListener(new Storage.StorageListener() {
            @Override
            public void onSaveBatch() {
              storage.removeListener(this);
              logger.info("Send upstream in thread " + Thread.currentThread().getName());
              event.getChannel().getPipeline().sendUpstream(event);
            }
          });
        }
      }
    });
    return pipeline;
  }
});

bootstrap.bind(new InetSocketAddress("localhost", 7000));

Сервер принимает запросы (Протобуф сообщения) от клиентов.Затем сервер запрашивает «хранилище» для «пакета».Хранилище возвращает объект или ноль, если в данный момент пакет недоступен.Если пакет не нулевой - все работает хорошо, но если пакет недоступен, сервер должен вернуть его клиенту, когда пакет будет добавлен в хранилище.Поэтому я регистрирую StorageListener, который вызывается при добавлении пакета в хранилище (примечание: пакеты добавляются в хранилище из отдельного потока 'storage-writing-thread').StorageListener отправляет апстрим, используя старый объект 'request', конвейер обрабатывает его хорошо, и мой "обработчик" записывает ответ (пакет) в канал.Похоже, все в порядке, но все эти задачи обрабатываются в потоке storage-writing-thread.Как я могу обновить слушатель для обработки моего «обработчика» в рабочем потоке netty в этом случае?

1 Ответ

0 голосов
/ 23 июля 2014

Ваш вопрос не устранен, если я правильно понял, вы ищете способ вернуть результат, когда он будет доступен, для этого вам понадобится поток, который работает до тех пор, пока данные не станут доступны и выдаютвернуть результаты.В вашем случае класс хранения может реализовать поток.

Вам не следует ждать в рабочем потоке и не нужно делать ответ в этом потоке.Для записи чего-либо с сервера на клиент вам понадобится доступ к каналу клиента.Поэтому все, что вам нужно, это передать канал в ваш поток в качестве параметра и отправить результат из этого потока.

...