Я новичок в нетти, и мой английский не является родным, в первую очередь извините за эту ситуацию, я постараюсь объяснить как можно больше.
У меня есть приложение для подключения к серверу netty и отправки запроса tcp. Мы должны использовать этот сервер для блокирующих и неблокирующих операций, например;
1- Некоторые запросы будут выполняться асинхронно. Я отправлю запрос на сервер и не буду ждать ответа. В этом случае нет проблем
2- Второй случай синхронизации. Когда я отправляю запрос с идентификатором транзакции на сервер, мне приходится ждать ответа с тем же идентификатором транзакции. Я не нашел четкого примера блокировки запроса netty. Все они об обещании-будущем, и это реализовано как следующие строки, но это не потокобезопасно. Когда я использую с ~ 40tps, некоторые запросы не будут собраны с ответными сообщениями, некоторые сообщения экранируются. Например, запрошено 2400 сообщений за 1 мин. 2000 сообщений в порядке, но более или менее 400 сообщений сбежали. Я видел это 400 сообщение, возвращенное с сервера, но метод get будущего не получил его и получил время ожидания.
Конфигурация Netty Client;
Bootstrap clientBootstrap = new Bootstrap()
.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE,true)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 50)
.remoteAddress(new InetSocketAddress(host.getIp(), host.getPort()))
.handler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel socketChannel) {
socketChannel.config().setRecvByteBufAllocator(new FixedRecvByteBufAllocator(2146));
ClientHandler clientHandler = new clientHandler(host);
socketChannel.pipeline().addLast(clientHandler);
}
});
channelFuture = clientBootstrap.connect().sync();
channelFuture.channel().closeFuture().sync();
Канал будущего слушателя;
channelFuture.addListener((GenericFutureListener<ChannelFuture>) future -> {
channelFuture.channel().pipeline().get(ClientHandler.class).setResponseFuture(responseFuture);
byte[] byteBuffer = message.getBytes();
channelFuture.channel().writeAndFlush(Unpooled.copiedBuffer(byteBuffer));
});
ResponseFuture - это пользовательское будущее для получения ответа сервера. InboundHandler, как это;
@Override
public void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf in) {
String input = in.toString(CharsetUtil.UTF_8);
String transactionKey = input.substring(52, 66).trim();
if(responseFuture.get(Long.valueOf(transactionKey))!=null)
responseFuture.get(Long.valueOf(transactionKey)).set(input);
else
log.info("[{}] Tcp Response map is empty",transactionKey);
}
И, наконец, я получаю такое обещание на будущее;
Object obj = responseFuture.get(ddaTimeoutParam, TimeUnit.MILLISECONDS);
if(obj!=null) {
response = obj.toString();
ddaDelta = System.currentTimeMillis()-ddaRequestStartTime;
}
Иногда некоторые параллельные потоки работают, когда эта строка responseFuture.get(Long.valueOf(transactionKey)).set(input)
работает до responseFuture.get
и входы смешивают каждый идентификатор транзакции. Как я могу решить эту проблему? Могу ли я использовать другие хорошие методы Netty для отправки события, когда ответ получен.