Netty WriteAndFlush периодически зависает - PullRequest
0 голосов
/ 19 июня 2019

Начальная загрузка Netty Server:

EventLoopGroup bg = new NioEventLoopGroup(1, new CamelThreadFactory("Thread#", "NettyServerTCPBoss", false));
EventLoopGroup wg = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors() * 2, new CamelThreadFactory("Thread#", "NettyServerTCPWorker", false));
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE,true);
serverBootstrap.childOption(ChannelOption.TCP_NODELAY, true);

Конфигурация Netty Pipeline:

    ch.pipeline().addLast("encoder", new StringEncoder());
    ch.pipeline().addLast("decoder", new RequestPayloadDecoder())); //This extends ByteToMessageDecoder
    ch.pipeline().addLast("idlehandler", new IdleStateHandler(..));
    ch.pipeline().addLast(new DefaultEventExecutorGroup(100),"handler", new 
org.apache.camel.component.netty4.handlers.ServerChannelHandler()); 

Установка AutoRead на false

public void channelActive(ChannelHandlerContext ctx) throws Exception {
InetSocketAddress clientAddress = ((InetSocketAddress) ctx.channel().remoteAddress());
ctx.channel().config().setAutoRead(false);
ctx.channel().read();
}

Логика обработчика на channelRead0 ()

 public void process(ChannelHandlerContext nettyChannelContext) throws Exception {
    if(validate() && channel.isWritable()){
    outboundMessages = //get from database
    for(List<Outbound> outbound : outboundMessages){
    if(channel().isWritable()){
    ChannelFuture writeFuture = nettyChannelContext.channel().writeAndFlush(outbound.getString()); //Send the message
    writeFuture.sync(); // wait for the IO to complete
    logger.info("Updating message status for event row id {}",outbound.getRowId);
    updateMessageStatusInDb(outbound);
    } } }
    ctx.channel.read();//Call read() after writes are done to process next inbound message 
    }

Вопрос Кажется, что все работает нормально, но периодически оно застревает вwriteFuture.sync () более 10 минут.Я попытался использовать слушателя вместо синхронизации ().Но для получения обратного вызова operationComplete () требуется аналогичное время.
Примеры журналов:
2019-06-18 14: 16: 39.263 [Camel Thread # 20 - NettyEventExecutorGroup] INFO Обновление статуса сообщения для идентификатора строки события ::: 40257
2019-06-18 14: 16: 39.265 [Camel Thread # 20 - NettyEventExecutorGroup] INFO Обновление статуса сообщения для идентификатора строки события ::: 40263
2019-06-18 14: 16: 39.266 [Camel Thread# 20 - NettyEventExecutorGroup] INFO Обновление статуса сообщения для идентификатора строки события ::: 40269
2019-06-18 14: 30: 09.982 [Camel Thread # 20 - NettyEventExecutorGroup] INFO Обновление статуса сообщения для событияid строки ::: 40275
2019-06-18 14: 30: 09.987 [Camel Thread # 20 - NettyEventExecutorGroup] INFO Обновление статуса сообщения для идентификатора строки события ::: 42691

Почему происходит ввод-выводдо тех пор, пока на каждые 30 тыс. отправленных сообщений приходится 40-50 сообщений.Можете ли вы предоставить несколько указателей?

...