У меня трудности при работе с 2 хостами в netty клиенте. Я пытаюсь установить соединение между двумя хостами как одним основным и резервным. Если мое основное соединение отключается при отправке сообщения через TCP, отправьте сообщение на резервные хосты. Например;
У меня 2 хозяина. Один из них - 10.50.60.30, а другой - 10.50.60.57. Мой главный хост - 10.50.60.30, а резервная копия - 10.50.60.57. Когда я обнаруживаю проблему с 60.30, я хочу перейти на 60.57. Но он все еще пытается отправить на сломанный основной хост.
Полагаю, проблема возникает в FalconClientHandler.class при инициализации статической переменной ctx в функции channelActive (). Должен ли я использовать список ChannelHandlerContext, в котором хранятся все соединения. Между прочим, никаких исключений не происходит. Это о логике
FalconClient.java
public static boolean connect(Host host) {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap clientBootstrap = new Bootstrap()
.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 50)
.remoteAddress(new InetSocketAddress(host.getIp(), host.getPort()))
.handler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel socketChannel) {
//socketChannel.pipeline().addLast("idleStateHandler", new IdleStateHandler(250, 0, 0, TimeUnit.MILLISECONDS));
socketChannel.config().setRecvByteBufAllocator(new FixedRecvByteBufAllocator(2146));
socketChannel.pipeline().addLast(new FalconClientHandler(host));
}
});
ChannelFuture channelFuture = clientBootstrap.connect().sync(); //BAŞARI İLE BAĞLANDI
channelFuture.channel().closeFuture().sync();
return host.isActive();
} catch (Exception e) {
LOGGER.info("Connection timed out --> " + e);
host.setActive(false);
return false;
} finally {
//group.shutdownGracefully().sync();
host.setActive(false);
}
}
Функция отправителя сообщения
Я могу найти сбитые серверы внутри FalconClientHandler. Так что я могу изменить хост в этой функции.
private void sendMessageToServer() throws Exception {
if (!LB.getUpHostList().isEmpty()) {
TCPRequestMessage message = jobQueue.take();
host = LB.getUpHostList().get(0);
if (host.isActive()) {
String rbtran = message.getRBTRAN();
LOGGER.debug(host.getIp() + " - GÖNDERME KUYRUĞUNDAN ÇIKARTILDI -> " + rbtran);
FalconClientHandler.sendMessage(rbtran);
} else {
jobQueue.put(message);
}
}
}
FalconClientHandler
public class FalconClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
private Host host;
private static ChannelHandlerContext ctx;
public FalconClientHandler(Host host) {
this.host = host;
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println(cause.getLocalizedMessage());
host.setActive(false);
ctx.close();
//do more exception handling
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
System.out.println("Connected!" + ctx.channel().remoteAddress());
ctx.fireChannelActive();
this.ctx = ctx;
}
public synchronized static void sendMessage(String message) {
System.err.println(ctx.channel().isActive());
System.err.println(ctx.channel().remoteAddress());
//if connection is broken, switch to another ctx
byte[] byteBuffer = message.getBytes();
ctx.channel().writeAndFlush(Unpooled.copiedBuffer(byteBuffer));
LOGGER.debug("MESAJ FALCONA GÖNDERİLDİ " + message);
}
@Override
public void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf in) throws InterruptedException {
String input = in.toString(CharsetUtil.UTF_8);
LOGGER.debug(host.getIp() + " MESAJ -> " + input);
if (input.length() > 0 && !input.equalsIgnoreCase(FALCONSTART) && !input.equalsIgnoreCase(FALCONSTOP)) {
if (input.contains(FALCONSTART))
input = input.replace(FALCONSTART, "");
if (input.contains(FALCONREADY))
input = input.replace(FALCONREADY, "");
int headerLength = Integer.parseInt(input.substring(8, 12));
String transactionKey = input.substring(52, 52 + headerLength).trim();
FalconClient.addResponseMessageToMap(transactionKey, input);
FalconClient.requestQueue.take();
}
switch (input) {
case FALCONSTART: //FALCON BAŞLADI 100000051
addStatusHistoryToStack(FALCONSTART, this.host.getStatusHistory()); //Bir önceki durumu kaydet.
this.host.setActive(true);
break;
case FALCONSTOP: //FALCON DURDU 100000050
addStatusHistoryToStack(FALCONSTOP, this.host.getStatusHistory());
this.host.setActive(false);
break;
}
}
}
Спасибо