Во-первых, спасибо за отличную инфраструктуру nio - Netty.
Наше приложение включает в себя клиентскую и серверную части, они взаимодействуют через сокетные соединения и отправляют сообщения protobuf.И клиент, и сервер написаны с использованием OIO.Я перемещаю настоящий код в nio на основе Netty Framework.В настоящее время у меня проблема с реализацией сервера: я не нашел, как обработать обработчик, используя рабочий поток netty.
Существует упрощенный код:
ServerBootstrap bootstrap = new ServerBootstrap(
new NioServerSocketChannelFactory(
Executors.newCachedThreadPool(),
Executors.newCachedThreadPool(),
Runtime.getRuntime().availableProcessors()
)
);
// Set up the pipeline factory.
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() throws Exception {
final ChannelPipeline pipeline = pipeline();
// decoder
pipeline.addLast("frameDecoder", new ProtobufVarint32FrameDecoder());
pipeline.addLast("protobufDecoder", new ProtobufDecoder(LogServerProto.Request.getDefaultInstance()));
// Encoder
pipeline.addLast("protobufVarint32LengthFieldPrepender", new ProtobufVarint32LengthFieldPrepender());
pipeline.addLast("protobufEncoder", new ProtobufEncoder());
// handler
pipeline.addLast("handler", new SimpleChannelUpstreamHandler() {
@Override
public void messageReceived(final ChannelHandlerContext ctx, final MessageEvent event) throws Exception {
logger.info("handleClientSocket in thread " + Thread.currentThread().getName());
final MessageLite request = (MessageLite) event.getMessage();
final MessageLite batch = storage.readBatch(request);
if (batch != null) {
event.getChannel().write(batch);
} else {
storage.registerListener(new Storage.StorageListener() {
@Override
public void onSaveBatch() {
storage.removeListener(this);
logger.info("Send upstream in thread " + Thread.currentThread().getName());
event.getChannel().getPipeline().sendUpstream(event);
}
});
}
}
});
return pipeline;
}
});
bootstrap.bind(new InetSocketAddress("localhost", 7000));
Сервер принимает запросы (Протобуф сообщения) от клиентов.Затем сервер запрашивает «хранилище» для «пакета».Хранилище возвращает объект или ноль, если в данный момент пакет недоступен.Если пакет не нулевой - все работает хорошо, но если пакет недоступен, сервер должен вернуть его клиенту, когда пакет будет добавлен в хранилище.Поэтому я регистрирую StorageListener, который вызывается при добавлении пакета в хранилище (примечание: пакеты добавляются в хранилище из отдельного потока 'storage-writing-thread').StorageListener отправляет апстрим, используя старый объект 'request', конвейер обрабатывает его хорошо, и мой "обработчик" записывает ответ (пакет) в канал.Похоже, все в порядке, но все эти задачи обрабатываются в потоке storage-writing-thread.Как я могу обновить слушатель для обработки моего «обработчика» в рабочем потоке netty в этом случае?