Я пытаюсь создать простой MQTT-сервер.Пока он не должен быть полностью совместимым со стандартом, так что терпите меня.Проблема в том, что независимо от того, что я делаю, сервер не принимает запросы TCP.Фактически, он открывает порт и отвечает на TCP SYN, но немедленно отправляет RST после рукопожатия, тем самым закрывая соединение.
То, что я пробовал, вызывает serverBootstrap.bind(...).sync().channel().closeFuture().sync()
, как предлагается здесь: Netty Connection notработает
Я ожидал бы, что будет вызван channelRead()
, но это не так.
Я довольно новичок в Нетти и, похоже, мне не удалось понять некоторые из его основных концепций,Мы будем очень благодарны за толчок в правильном направлении!
serverBootstrap.bind(config.getHost(), config.getPort())
.addListener((ChannelFuture future) -> {
if (future.isSuccess()) {
LOG.info("Bridge bound to {}", future.channel().localAddress());
}
});
private void initializeServerBootstrap() {
serverBootstrap
.group(new NioEventLoopGroup(), new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childOption(ChannelOption.TCP_NODELAY, true)
.childHandler(new ChannelInitializer<ServerSocketChannel>() {
@Override
protected void initChannel(ServerSocketChannel channel) {
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast("decoder", new MqttDecoder());
pipeline.addLast("encoder", MqttEncoder.INSTANCE);
BridgeConnection connection = createConnection(channel);
serverConnections.put(channel.remoteAddress(), connection);
pipeline.addLast("bridgeInbound", new BridgeInboundHandler(connection));
}
});
}
public class BridgeInboundHandler extends ChannelInboundHandlerAdapter {
private static final Logger LOG = LoggerFactory.getLogger(BridgeInboundHandler.class);
private BridgeConnection connection;
BridgeInboundHandler(BridgeConnection connection) {
this.connection = connection;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
try {
MqttMessage message = NettyUtils.validateMessage(msg);
LOG.info("Channel to {} has read message: {}", ctx.channel().remoteAddress(), message.toString());
connection.handleMessage(message);
} catch (IOException ex) {
LOG.error("Caught exception while trying to parse message.", ex);
ctx.channel().close().addListener((ChannelFuture future) -> {
LOG.error("Closed channel to {} due to exception.", future.channel().remoteAddress());
});
}
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
LOG.info("Channel with remote address {} has become active.", ctx.channel().remoteAddress());
connection.connect();
}
@Override
public void channelInactive(ChannelHandlerContext ctx) {
LOG.info("Channel to {} has become inactive.", ctx.channel().remoteAddress());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
LOG.error("Unexpected channel exception.", cause);
ctx.close().addListener(future -> {
LOG.error("Channel was closed due to exception.");
});
}
}
connection.connect()
, по сути, вызывает channel.writeAndFlush()
, передавая сообщение MQTT CONNECT.