Netty ChannelFuture timeout, когда ответ получен - PullRequest
1 голос
/ 04 апреля 2019

Я новичок в Netty У меня есть клиентское приложение TCP, разработанное с Netty.Когда я использую future, получаю асинхронный ответ от сервера, возвращается какой-то ответ, но в будущем тайм-аут не завершается.Класс TCPClient, подобный следующему;

public TcpClient {
    public boolean connect(Host host) {
        try {
            Bootstrap clientBootstrap = new Bootstrap()
                    .group(group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.SO_KEEPALIVE,true)
                    .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 50)
                    .remoteAddress(new InetSocketAddress(host.getIp(), host.getPort()))
                    .handler(new ChannelInitializer<SocketChannel>() {
                        protected void initChannel(SocketChannel socketChannel) {
                            socketChannel.config().setRecvByteBufAllocator(new FixedRecvByteBufAllocator(2146));
                            FalconClientHandler falconClientHandler = new FalconClientHandler(host);
                            host.setFalconClientHandler(falconClientHandler);
                            socketChannel.pipeline().addLast(falconClientHandler);
                        }
                    });

            channelFuture = clientBootstrap.connect().sync(); //BAŞARI İLE BAĞLANDI

            channelFuture.channel().closeFuture().sync();

            return host.isActive();
        } catch (Exception e) {
            log.info("Connection timed out --> " + e);
            host.setActive(false);
            return false;
        } finally {
            host.setActive(false);
        }
    }

public synchronized ResponseFuture send(long transactionId,String message) {
    final Map<Long,ResponseFuture> responseFuture = new ConcurrentHashMap<>();
    responseFuture.put(transactionId,new ResponseFuture());
    if (!hostSelector.getUpHostList().isEmpty()) {
        int hostCount = hostSelector.getUpHostList().size();
        Host host;

        host = hostSelector.getUpHostList().get(index.incrementAndGet() % hostCount);
        if (host.isActive()) {
            int headerLength = Integer.parseInt(message.substring(8, 12));
            log.info("[{}] Host {} Tcp Request",message.substring(52, 52 + headerLength),host.getIp());
            channelFuture.addListener((GenericFutureListener<ChannelFuture>) future -> {
                log.info("[{}] Tcp request added to map",transactionId);
                channelFuture.channel().pipeline().get(FalconClientHandler.class).setResponseFuture(responseFuture);
                byte[] byteBuffer = message.getBytes();
                channelFuture.channel().writeAndFlush(Unpooled.copiedBuffer(byteBuffer));
            });
        }
    } else {
        log.error("AYAKTA HOST YOK");
    }
    return responseFuture.get(transactionId);
}

}

У метода отправки есть транзакция ID и сообщение запроса. Когда я отправляю это сообщение с идентификатором транзакции, ответ возвращается с этим идентификатором транзакции.Я называю эту отправку следующим образом:

    ResponseFuture responseFuture = falconClient.send(Long.valueOf(transactionId), finalMessage);
    try {
        Object obj = responseFuture.get(ddaTimeoutParam, TimeUnit.MILLISECONDS);
        if(obj!=null) {
            response = obj.toString();
            ddaDelta = System.currentTimeMillis()-ddaRequestStartTime;
        }
    } catch (InterruptedException | ExecutionException | TimeoutException e) {
        log.warn("[{}] DDA timeout. Timeout parameter: {}",transactionId,ddaTimeoutParam);
        responseFuture.cancel(true);
        response = "TIMEOUT";
        ddaDelta = System.currentTimeMillis()-ddaRequestStartTime;
    } 

Response future - это базовый класс реализации Future.Положите и получите такие методы;

public class ResponseFuture implements Future<String> {

    private volatile State state = State.WAITING;
    ArrayBlockingQueue<String> blockingResponse = new ArrayBlockingQueue<String>(1);

    private enum State {
        WAITING,
        DONE
    }

    @Override
    public String get(long timeout, TimeUnit unit) throws InterruptedException,
            ExecutionException, TimeoutException {
        final String responseAfterWait = blockingResponse.poll(timeout, unit);
        if (responseAfterWait == null) {
            throw new TimeoutException();
        }
        return responseAfterWait;
    }

    public void set(String msg) {
        if (state == State.DONE) {
            return;
        }

        try {
            blockingResponse.put(msg);
            state = State.DONE;
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}


    My Handler class for receive server response message like following;


public class FalconClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
    private ChannelHandlerContext ctx;

    private Map<Long,ResponseFuture> responseFuture;

    public synchronized void setResponseFuture(Map<Long,ResponseFuture> responseFuture) {
        log.info("{} ResponseFuture setted",responseFuture.keySet());
        this.responseFuture = responseFuture;
    }

    @Override
    public void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf in) {
        String input = in.toString(CharsetUtil.UTF_8);
        String transactionKey = input.substring(52, 66).trim();
        if(responseFuture.get(Long.valueOf(transactionKey))!=null)
            responseFuture.get(Long.valueOf(transactionKey)).set(input);
        else
            log.info("[{}] Tcp Response map is empty",transactionKey);
        }
}

Когда я запускаю этот код при высокой нагрузке, например, 30 транзакций в секунду, tcp-ответ возвращается с netty-сервера, но в будущем метод get получает тайм-аут. Эта ситуация не возникает при каждом запросепример% 20 запрос не выполняется, если запрос 30% в секунду% 50 завершается неудачно в течение 40% в секунду.Что может происходить под нагрузкой?

...