Базовый сервер Netty MQTT сбрасывает соединение сразу после подключения (обработчик не вызывается) - PullRequest
0 голосов
/ 10 апреля 2019

Я пытаюсь создать простой MQTT-сервер.Пока он не должен быть полностью совместимым со стандартом, так что терпите меня.Проблема в том, что независимо от того, что я делаю, сервер не принимает запросы TCP.Фактически, он открывает порт и отвечает на TCP SYN, но немедленно отправляет RST после рукопожатия, тем самым закрывая соединение.

То, что я пробовал, вызывает serverBootstrap.bind(...).sync().channel().closeFuture().sync(), как предлагается здесь: Netty Connection notработает
Я ожидал бы, что будет вызван channelRead(), но это не так.

Я довольно новичок в Нетти и, похоже, мне не удалось понять некоторые из его основных концепций,Мы будем очень благодарны за толчок в правильном направлении!

serverBootstrap.bind(config.getHost(), config.getPort())
    .addListener((ChannelFuture future) -> {
        if (future.isSuccess()) {
            LOG.info("Bridge bound to {}", future.channel().localAddress());
        }
    });

private void initializeServerBootstrap() {
    serverBootstrap
        .group(new NioEventLoopGroup(), new NioEventLoopGroup())
        .channel(NioServerSocketChannel.class)
        .option(ChannelOption.SO_KEEPALIVE, true)
        .option(ChannelOption.SO_REUSEADDR, true)
        .option(ChannelOption.SO_BACKLOG, 128)
        .childOption(ChannelOption.SO_KEEPALIVE, true)
        .childOption(ChannelOption.TCP_NODELAY, true)
        .childHandler(new ChannelInitializer<ServerSocketChannel>() {
            @Override
            protected void initChannel(ServerSocketChannel channel) {
                ChannelPipeline pipeline = channel.pipeline();
                pipeline.addLast("decoder", new MqttDecoder());
                pipeline.addLast("encoder", MqttEncoder.INSTANCE);
                BridgeConnection connection = createConnection(channel);
                serverConnections.put(channel.remoteAddress(), connection);
                pipeline.addLast("bridgeInbound", new BridgeInboundHandler(connection));
            }
        });
}

public class BridgeInboundHandler extends ChannelInboundHandlerAdapter {

    private static final Logger LOG = LoggerFactory.getLogger(BridgeInboundHandler.class);

    private BridgeConnection connection;

    BridgeInboundHandler(BridgeConnection connection) {
        this.connection = connection;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        try {
            MqttMessage message = NettyUtils.validateMessage(msg);
            LOG.info("Channel to {} has read message: {}", ctx.channel().remoteAddress(), message.toString());
            connection.handleMessage(message);
        } catch (IOException ex) {
            LOG.error("Caught exception while trying to parse message.", ex);
            ctx.channel().close().addListener((ChannelFuture future) -> {
                LOG.error("Closed channel to {} due to exception.", future.channel().remoteAddress());
            });
        }
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        LOG.info("Channel with remote address {} has become active.", ctx.channel().remoteAddress());
        connection.connect();
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        LOG.info("Channel to {} has become inactive.", ctx.channel().remoteAddress());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        LOG.error("Unexpected channel exception.", cause);
        ctx.close().addListener(future -> {
            LOG.error("Channel was closed due to exception.");
        });
    }

}

connection.connect(), по сути, вызывает channel.writeAndFlush(), передавая сообщение MQTT CONNECT.

...