Netty 4 - пул возвращает канал, который еще не готов отправить фактическое сообщение - PullRequest
0 голосов
/ 24 ноября 2018

Я создал обработчик входящих событий типа SimpleChannelInboundHandler и добавил его в конвейер.Мое намерение - каждый раз, когда устанавливается соединение, я хотел отправить сообщение приложения, называемое сообщением об открытии сеанса, и подготовить соединение для отправки фактического сообщения.Чтобы достичь этого, вышеуказанный обработчик входящих вызовов переходит к channelActive (), на который отправляется сообщение об открытии сеанса. В ответ на это я получаю сообщение с подтверждением открытия сеанса.Только после этого я смогу отправить любое количество реальных деловых сообщений.Я использую FixedChannelPool и инициализируется следующим образом.Это хорошо работает некоторое время при запуске.Но если удаленный хост закрывает соединение, после этого, если сообщение отправляется с использованием нижеуказанной функции sendMessage (), сообщение отправляется даже до сообщения об открытии сеанса через channelActive (), и его ответ получается.Таким образом, сервер игнорирует сообщение, так как сеанс еще не открыт, когда бизнес-сообщение было отправлено.

Я ищу, что пул должен возвращать только тот канал, который вызвал событие channelActive (), которое ужеотправил сообщение об открытии сеанса и получил от сервера подтверждение подтверждения открытия сеанса.Как справиться с этой ситуацией?

public class SessionHandler extends SimpleChannelInboundHandler<byte[]> {
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
        if (ctx.channel().isWritable()) {
            ctx.channel().writeAndFlush("open session message".getBytes()).;
        }
    }
}

// At the time of loading the applicaiton
public void init() {
    final Bootstrap bootStrap = new Bootstrap();
    bootStrap.group(group).channel(NioSocketChannel.class).remoteAddress(hostname, port);
    fixedPool = new FixedChannelPool(bootStrap, getChannelHandler(), 5);

    // This is done to intialise connection and the channelActive() from above handler is invoked to keep the session open on startup
    for (int i = 0; i < config.getMaxConnections(); i++) {
        fixedPool.acquire().addListener(new FutureListener<Channel>() {
            @Override
            public void operationComplete(Future<Channel> future) throws Exception {
                if (future.isSuccess()) {

                } else {
                    LOGGER.error(" Channel initialzation failed...>>", future.cause());
                }

            }
        });
    }
}

// Для фактической отправки сообщения приложением вызывается следующий метод.

public void sendMessage(final String businessMessage) {
    fixedPool.acquire().addListener(new FutureListener<Channel>() {
        @Override
        public void operationComplete(Future<Channel> future) throws Exception {
            if (future.isSuccess()) {
                Channel channel = future.get();
                if (channel.isOpen() && channel.isActive() && channel.isWritable()) {
                    channel.writeAndFlush(businessMessage).addListener(new GenericFutureListener<ChannelFuture>() {
                        @Override
                        public void operationComplete(ChannelFuture future) throws Exception {
                            if (future.isSuccess()) {
                                // success msg
                            } else {
                                // failure msg
                            }
                        }
                    });
                    fixedPool.release(channel);
                }
            } else {
                // Failure
            }
        }
    });
}

1 Ответ

0 голосов
/ 24 ноября 2018

Если нет конкретной причины, по которой вам нужно использовать FixedChannelPool, вы можете использовать другую структуру данных (Список / Карта) для хранения каналов.Вы можете добавить канал в структуру данных после отправки сообщения об открытом сеансе и удалить его методом channelInactive.

Если вам нужно выполнить массовые операции с каналами, вы можете использовать ChannelGroup для этой цели.

Если вы все еще хотите использовать FixedChannelPool, вы можете установить в канале атрибут о том, было ли отправлено открытое сообщение:

ctx.channel().attr(OPEN_MESSAGE_SENT).set(true);

, вы можете получить атрибут какследует в вашей sendMessage функции:

boolean sent = ctx.channel().attr(OPEN_MESSAGE_SENT).get();

, а в channelInactive вы можете установить то же самое в false или удалить его.

Примечание OPEN_MESSAGE_SENT - это AttributeKey:

public static final AttributeKey<Boolean> OPEN_MESSAGE_SENT = AttributeKey.valueOf("OPEN_MESSAGE_SENT");
...