Реализация сообщений поддержки активности в Netty с использованием WriteTimeoutHandler - PullRequest
11 голосов
/ 01 февраля 2012

Я использую Netty 3.2.7.Я пытаюсь записать функциональность в моем клиенте так, чтобы, если по истечении определенного времени (скажем, 30 секунд) не было написано ни одного сообщения, на сервер отправляется сообщение «keep-alive».

После некоторого временикопаясь, я обнаружил, что WriteTimeoutHandler должен позволить мне сделать это.Я нашел это объяснение здесь: https://issues.jboss.org/browse/NETTY-79.

Пример, приведенный в документации Netty:

public ChannelPipeline getPipeline() {
     // An example configuration that implements 30-second write timeout:
     return Channels.pipeline(
         new WriteTimeoutHandler(timer, 30), // timer must be shared.
         new MyHandler());
 }

В моем тестовом клиенте я сделал именно это.В MyHandler я также переопределил метод exceptionCaught ():

public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
    if (e.getCause() instanceof WriteTimeoutException) {
        log.info("Client sending keep alive!");
        ChannelBuffer keepAlive = ChannelBuffers.buffer(KEEP_ALIVE_MSG_STR.length());
        keepAlive.writeBytes(KEEP_ALIVE_MSG_STR.getBytes());
        Channels.write(ctx, Channels.future(e.getChannel()), keepAlive);
    }
}

Независимо от того, на какую длительность клиент ничего не записывает в канал, переопределенный метод exceptionCaught () никогда не вызывается.

Глядя на источник WriteTimeoutHandler, его реализация writeRequested () выглядит следующим образом:

public void writeRequested(ChannelHandlerContext ctx, MessageEvent e)
        throws Exception {

    long timeoutMillis = getTimeoutMillis(e);
    if (timeoutMillis > 0) {
        // Set timeout only when getTimeoutMillis() returns a positive value.
        ChannelFuture future = e.getFuture();
        final Timeout timeout = timer.newTimeout(
                new WriteTimeoutTask(ctx, future),
                timeoutMillis, TimeUnit.MILLISECONDS);

        future.addListener(new TimeoutCanceller(timeout));
    }

    super.writeRequested(ctx, e);
}

Здесь кажется, что эта реализация говорит: «Когда запрашивается запись, создайте новый тайм-аут. Когда записьуспешно, отмените таймаут. "

При использовании отладчика кажется, что это именно то, что происходит.Как только запись завершается, тайм-аут отменяется.Это не то поведение, которое я хочу.Мне нужно следующее поведение: «Если клиент не записывал какую-либо информацию в канал в течение 30 секунд, выдать исключение WriteTimeoutException».

Итак, не для этого ли предназначен WriteTimeoutHandler?Вот как я интерпретировал это из того, что я прочитал в Интернете, но реализация, похоже, не работает таким образом.Я использую это неправильно?Должен ли я использовать что-то еще?В нашей версии Mina того же клиента, который я пытаюсь переписать, я вижу, что метод sessionIdle () переопределяется для достижения желаемого поведения, но этот метод недоступен в Netty.

Ответы [ 2 ]

10 голосов
/ 20 апреля 2015

Для Netty 4.0 и новее вы должны расширить ChannelDuplexHandler , как в примере из документации IdleStateHandler :

 // An example that sends a ping message when there is no outbound traffic
 // for 30 seconds.  The connection is closed when there is no inbound traffic
 // for 60 seconds.

 public class MyChannelInitializer extends ChannelInitializer<Channel> {
     @Override
     public void initChannel(Channel channel) {
         channel.pipeline().addLast("idleStateHandler", new IdleStateHandler(60, 30, 0));
         channel.pipeline().addLast("myHandler", new MyHandler());
     }
 }

 // Handler should handle the IdleStateEvent triggered by IdleStateHandler.
 public class MyHandler extends ChannelDuplexHandler {
     @Override
     public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
         if (evt instanceof IdleStateEvent) {
             IdleStateEvent e = (IdleStateEvent) evt;
             if (e.state() == IdleState.READER_IDLE) {
                 ctx.close();
             } else if (e.state() == IdleState.WRITER_IDLE) {
                 ctx.writeAndFlush(new PingMessage());
             }
         }
     }
 }
6 голосов
/ 01 февраля 2012

Я бы предложил добавить IdleStateHandler , а затем добавить пользовательскую реализацию IdleStateAwareUpstreamHandler , которая может реагировать на состояние ожидания.Для меня это очень хорошо работает во многих различных проектах.

В списке javadocs приведен следующий пример, который вы можете использовать в качестве основы своей реализации:

public class MyPipelineFactory implements ChannelPipelineFactory {

    private final Timer timer;
    private final ChannelHandler idleStateHandler;

    public MyPipelineFactory(Timer timer) {
        this.timer = timer;
        this.idleStateHandler = new IdleStateHandler(timer, 60, 30, 0);
        // timer must be shared.
    }

    public ChannelPipeline getPipeline() {
        return Channels.pipeline(
            idleStateHandler,
            new MyHandler());
    }
}

// Handler should handle the IdleStateEvent triggered by IdleStateHandler.
public class MyHandler extends IdleStateAwareChannelHandler {

    @Override
    public void channelIdle(ChannelHandlerContext ctx, IdleStateEvent e) {
        if (e.getState() == IdleState.READER_IDLE) {
            e.getChannel().close();
        } else if (e.getState() == IdleState.WRITER_IDLE) {
            e.getChannel().write(new PingMessage());
        }
    }
}

ServerBootstrap bootstrap = ...;
Timer timer = new HashedWheelTimer();
...
bootstrap.setPipelineFactory(new MyPipelineFactory(timer));
...
...