Несколько серверов в Netty - PullRequest
       11

Несколько серверов в Netty

0 голосов
/ 12 января 2019

У меня трудности при работе с 2 хостами в netty клиенте. Я пытаюсь установить соединение между двумя хостами как одним основным и резервным. Если мое основное соединение отключается при отправке сообщения через TCP, отправьте сообщение на резервные хосты. Например;

У меня 2 хозяина. Один из них - 10.50.60.30, а другой - 10.50.60.57. Мой главный хост - 10.50.60.30, а резервная копия - 10.50.60.57. Когда я обнаруживаю проблему с 60.30, я хочу перейти на 60.57. Но он все еще пытается отправить на сломанный основной хост.

Полагаю, проблема возникает в FalconClientHandler.class при инициализации статической переменной ctx в функции channelActive (). Должен ли я использовать список ChannelHandlerContext, в котором хранятся все соединения. Между прочим, никаких исключений не происходит. Это о логике

FalconClient.java

public static boolean connect(Host host) {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap clientBootstrap = new Bootstrap()
                    .group(group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 50)
                    .remoteAddress(new InetSocketAddress(host.getIp(), host.getPort()))
                    .handler(new ChannelInitializer<SocketChannel>() {
                        protected void initChannel(SocketChannel socketChannel) {
                            //socketChannel.pipeline().addLast("idleStateHandler", new IdleStateHandler(250, 0, 0, TimeUnit.MILLISECONDS));
                            socketChannel.config().setRecvByteBufAllocator(new FixedRecvByteBufAllocator(2146));
                            socketChannel.pipeline().addLast(new FalconClientHandler(host));
                        }
                    });

            ChannelFuture channelFuture = clientBootstrap.connect().sync(); //BAŞARI İLE BAĞLANDI

            channelFuture.channel().closeFuture().sync();

            return host.isActive();
        } catch (Exception e) {
            LOGGER.info("Connection timed out --> " + e);
            host.setActive(false);
            return false;
        } finally {
            //group.shutdownGracefully().sync();
            host.setActive(false);
        }
    }

Функция отправителя сообщения

Я могу найти сбитые серверы внутри FalconClientHandler. Так что я могу изменить хост в этой функции.

private void sendMessageToServer() throws Exception {
        if (!LB.getUpHostList().isEmpty()) {
            TCPRequestMessage message = jobQueue.take();
            host = LB.getUpHostList().get(0);
            if (host.isActive()) {
                String rbtran = message.getRBTRAN();

                LOGGER.debug(host.getIp() + " - GÖNDERME KUYRUĞUNDAN ÇIKARTILDI -> " + rbtran);

                FalconClientHandler.sendMessage(rbtran);
            } else {
                jobQueue.put(message);
            }
        }
    }

FalconClientHandler

public class FalconClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
private Host host;
    private static ChannelHandlerContext ctx;

    public FalconClientHandler(Host host) {
        this.host = host;
    }

@Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println(cause.getLocalizedMessage());
        host.setActive(false);
        ctx.close();
        //do more exception handling
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
        System.out.println("Connected!" + ctx.channel().remoteAddress());
        ctx.fireChannelActive();
        this.ctx = ctx;
    }

    public synchronized static void sendMessage(String message) {
        System.err.println(ctx.channel().isActive());
        System.err.println(ctx.channel().remoteAddress());
        //if connection is broken, switch to another ctx
        byte[] byteBuffer = message.getBytes();
        ctx.channel().writeAndFlush(Unpooled.copiedBuffer(byteBuffer));
        LOGGER.debug("MESAJ FALCONA GÖNDERİLDİ " + message);
    }

@Override
    public void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf in) throws InterruptedException {
        String input = in.toString(CharsetUtil.UTF_8);
        LOGGER.debug(host.getIp() + " MESAJ -> " + input);
        if (input.length() > 0 && !input.equalsIgnoreCase(FALCONSTART) && !input.equalsIgnoreCase(FALCONSTOP)) {
            if (input.contains(FALCONSTART))
                input = input.replace(FALCONSTART, "");
            if (input.contains(FALCONREADY))
                input = input.replace(FALCONREADY, "");

            int headerLength = Integer.parseInt(input.substring(8, 12));
            String transactionKey = input.substring(52, 52 + headerLength).trim();
            FalconClient.addResponseMessageToMap(transactionKey, input);
            FalconClient.requestQueue.take();
        }

        switch (input) {
            case FALCONSTART: //FALCON BAŞLADI 100000051
                addStatusHistoryToStack(FALCONSTART, this.host.getStatusHistory()); //Bir önceki durumu kaydet.
                this.host.setActive(true);

                break;
            case FALCONSTOP: //FALCON DURDU 100000050
                addStatusHistoryToStack(FALCONSTOP, this.host.getStatusHistory());
                this.host.setActive(false);

                break;
        }
    }
}

Спасибо

...