Я создал обработчик входящих событий типа SimpleChannelInboundHandler и добавил его в конвейер.Мое намерение - каждый раз, когда устанавливается соединение, я хотел отправить сообщение приложения, называемое сообщением об открытии сеанса, и подготовить соединение для отправки фактического сообщения.Чтобы достичь этого, вышеуказанный обработчик входящих вызовов переходит к channelActive (), на который отправляется сообщение об открытии сеанса. В ответ на это я получаю сообщение с подтверждением открытия сеанса.Только после этого я смогу отправить любое количество реальных деловых сообщений.Я использую FixedChannelPool и инициализируется следующим образом.Это хорошо работает некоторое время при запуске.Но если удаленный хост закрывает соединение, после этого, если сообщение отправляется с использованием нижеуказанной функции sendMessage (), сообщение отправляется даже до сообщения об открытии сеанса через channelActive (), и его ответ получается.Таким образом, сервер игнорирует сообщение, так как сеанс еще не открыт, когда бизнес-сообщение было отправлено.
Я ищу, что пул должен возвращать только тот канал, который вызвал событие channelActive (), которое ужеотправил сообщение об открытии сеанса и получил от сервера подтверждение подтверждения открытия сеанса.Как справиться с этой ситуацией?
public class SessionHandler extends SimpleChannelInboundHandler<byte[]> {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
if (ctx.channel().isWritable()) {
ctx.channel().writeAndFlush("open session message".getBytes()).;
}
}
}
// At the time of loading the applicaiton
public void init() {
final Bootstrap bootStrap = new Bootstrap();
bootStrap.group(group).channel(NioSocketChannel.class).remoteAddress(hostname, port);
fixedPool = new FixedChannelPool(bootStrap, getChannelHandler(), 5);
// This is done to intialise connection and the channelActive() from above handler is invoked to keep the session open on startup
for (int i = 0; i < config.getMaxConnections(); i++) {
fixedPool.acquire().addListener(new FutureListener<Channel>() {
@Override
public void operationComplete(Future<Channel> future) throws Exception {
if (future.isSuccess()) {
} else {
LOGGER.error(" Channel initialzation failed...>>", future.cause());
}
}
});
}
}
// Для фактической отправки сообщения приложением вызывается следующий метод.
public void sendMessage(final String businessMessage) {
fixedPool.acquire().addListener(new FutureListener<Channel>() {
@Override
public void operationComplete(Future<Channel> future) throws Exception {
if (future.isSuccess()) {
Channel channel = future.get();
if (channel.isOpen() && channel.isActive() && channel.isWritable()) {
channel.writeAndFlush(businessMessage).addListener(new GenericFutureListener<ChannelFuture>() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
// success msg
} else {
// failure msg
}
}
});
fixedPool.release(channel);
}
} else {
// Failure
}
}
});
}