Я новичок в Netty У меня есть клиентское приложение TCP, разработанное с Netty.Когда я использую future, получаю асинхронный ответ от сервера, возвращается какой-то ответ, но в будущем тайм-аут не завершается.Класс TCPClient, подобный следующему;
public TcpClient {
public boolean connect(Host host) {
try {
Bootstrap clientBootstrap = new Bootstrap()
.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE,true)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 50)
.remoteAddress(new InetSocketAddress(host.getIp(), host.getPort()))
.handler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel socketChannel) {
socketChannel.config().setRecvByteBufAllocator(new FixedRecvByteBufAllocator(2146));
FalconClientHandler falconClientHandler = new FalconClientHandler(host);
host.setFalconClientHandler(falconClientHandler);
socketChannel.pipeline().addLast(falconClientHandler);
}
});
channelFuture = clientBootstrap.connect().sync(); //BAŞARI İLE BAĞLANDI
channelFuture.channel().closeFuture().sync();
return host.isActive();
} catch (Exception e) {
log.info("Connection timed out --> " + e);
host.setActive(false);
return false;
} finally {
host.setActive(false);
}
}
public synchronized ResponseFuture send(long transactionId,String message) {
final Map<Long,ResponseFuture> responseFuture = new ConcurrentHashMap<>();
responseFuture.put(transactionId,new ResponseFuture());
if (!hostSelector.getUpHostList().isEmpty()) {
int hostCount = hostSelector.getUpHostList().size();
Host host;
host = hostSelector.getUpHostList().get(index.incrementAndGet() % hostCount);
if (host.isActive()) {
int headerLength = Integer.parseInt(message.substring(8, 12));
log.info("[{}] Host {} Tcp Request",message.substring(52, 52 + headerLength),host.getIp());
channelFuture.addListener((GenericFutureListener<ChannelFuture>) future -> {
log.info("[{}] Tcp request added to map",transactionId);
channelFuture.channel().pipeline().get(FalconClientHandler.class).setResponseFuture(responseFuture);
byte[] byteBuffer = message.getBytes();
channelFuture.channel().writeAndFlush(Unpooled.copiedBuffer(byteBuffer));
});
}
} else {
log.error("AYAKTA HOST YOK");
}
return responseFuture.get(transactionId);
}
}
У метода отправки есть транзакция ID и сообщение запроса. Когда я отправляю это сообщение с идентификатором транзакции, ответ возвращается с этим идентификатором транзакции.Я называю эту отправку следующим образом:
ResponseFuture responseFuture = falconClient.send(Long.valueOf(transactionId), finalMessage);
try {
Object obj = responseFuture.get(ddaTimeoutParam, TimeUnit.MILLISECONDS);
if(obj!=null) {
response = obj.toString();
ddaDelta = System.currentTimeMillis()-ddaRequestStartTime;
}
} catch (InterruptedException | ExecutionException | TimeoutException e) {
log.warn("[{}] DDA timeout. Timeout parameter: {}",transactionId,ddaTimeoutParam);
responseFuture.cancel(true);
response = "TIMEOUT";
ddaDelta = System.currentTimeMillis()-ddaRequestStartTime;
}
Response future - это базовый класс реализации Future.Положите и получите такие методы;
public class ResponseFuture implements Future<String> {
private volatile State state = State.WAITING;
ArrayBlockingQueue<String> blockingResponse = new ArrayBlockingQueue<String>(1);
private enum State {
WAITING,
DONE
}
@Override
public String get(long timeout, TimeUnit unit) throws InterruptedException,
ExecutionException, TimeoutException {
final String responseAfterWait = blockingResponse.poll(timeout, unit);
if (responseAfterWait == null) {
throw new TimeoutException();
}
return responseAfterWait;
}
public void set(String msg) {
if (state == State.DONE) {
return;
}
try {
blockingResponse.put(msg);
state = State.DONE;
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
My Handler class for receive server response message like following;
public class FalconClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
private ChannelHandlerContext ctx;
private Map<Long,ResponseFuture> responseFuture;
public synchronized void setResponseFuture(Map<Long,ResponseFuture> responseFuture) {
log.info("{} ResponseFuture setted",responseFuture.keySet());
this.responseFuture = responseFuture;
}
@Override
public void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf in) {
String input = in.toString(CharsetUtil.UTF_8);
String transactionKey = input.substring(52, 66).trim();
if(responseFuture.get(Long.valueOf(transactionKey))!=null)
responseFuture.get(Long.valueOf(transactionKey)).set(input);
else
log.info("[{}] Tcp Response map is empty",transactionKey);
}
}
Когда я запускаю этот код при высокой нагрузке, например, 30 транзакций в секунду, tcp-ответ возвращается с netty-сервера, но в будущем метод get получает тайм-аут. Эта ситуация не возникает при каждом запросепример% 20 запрос не выполняется, если запрос 30% в секунду% 50 завершается неудачно в течение 40% в секунду.Что может происходить под нагрузкой?