Проблемы связи между двумя приложениями сокетов Netty (частичные сообщения) от связи на основе сокетов - PullRequest
1 голос
/ 07 июня 2019

У меня возникают проблемы со связью при тестировании между двумя приложениями сокетов Netty (основным приложением и приложением тестирования интеграции), когда я получаю необычное количество частичных сообщений.

Замечен один паттерн: сообщение first , отправляемое из тестового приложения (приложение выполняет отправку сообщения извне конвейера с использованием обработчика с совместным доступом), имеет тенденцию всегда быть частичным Это также замечается во время задержки, когда возникает другая проблема.

Другая проблема заключается в том, что, когда частичное сообщение время от времени принимается, декодер кажется замкнутым в цикле, где он продолжает пытаться прочитать частичное сообщение бесконечно. У меня есть модульный тест для имитации частичного сообщения с использованием EmbeddedChannel, но модульный тест не повторяет то, что я вижу во время интеграционного теста.

Основное приложение использует следующий конвейер:

ch.pipeline().addLast(<HeaderTrailerFrameDecoder>, <NettyMessageDecoder>, <HeaderTrailerFrameEncoder>, <NettyMessageEncoder>);
ch.pipeline().addLast(<IdleStateHandler>,<EventHandler>, <ChannelHandler>);

где:

  • HeaderTrailerFrameDecoder - Удаляет однобайтовый кадр из начала / конца пакета
  • NettyMessageDecoder - Преобразует сообщение из ByteBuf в объект домена
  • HeaderTrailerFrameEncoder - добавляет кадр к пакетам сообщений
  • NettyMessageEncoder - Преобразует сообщение из объекта домена в ByteBuf
  • IdleStateHandler - класс Netty, обнаруживает устаревшие / незанятые соединения
  • EventHandler - Sharable-обработчик для отправки сообщений извне конвейера
  • ChannelHandler - главный обработчик всей бизнес-логики

Я думаю, что проблема может быть со всеми кодировщиками / декодерами, и, возможно, не освобождение моих объектов ByteBuf? Я вижу проблему только с первым декодером, HeaderTrailerFrameDecoder, поэтому ниже приведу фрагмент кода. Для каждого соединения первое сообщение, поступающее из серии сообщений, отправляемых тестовым приложением, сначала создает журнал msg="could not find trailer".

@Slf4j // Lombok logging
public class HeaderTrailerFrameDecoder extends ByteToMessageDecoder {

    private final byte header;
    private final byte trailer;

    HeaderTrailerFrameDecoder(byte header, byte trailer) {
        this.header = header;
        this.trailer = trailer;
    }

    @Override
    protected void decode(
            final ChannelHandlerContext ctx,
            final ByteBuf buf,
            final List<Object> out) {
        log.trace("msg=\"decoding message with header and trailer\", buf={}", buf);

        // Find header
        int headerIndex = buf.forEachByte(value -> value != header);
        if (headerIndex < 0) {
            log.error("msg=\"could not find header\", payload=\"{}\"", payload);
            buf.skipBytes(buf.readableBytes());
            return;
        }
        int beforeHeaderLen = headerIndex - buf.readerIndex();
        buf.skipBytes(beforeHeaderLen + 1);


        // Find trailer
        int trailerIndex = buf.forEachByte(value -> value != trailer);
        if (trailerIndex < 0) {
            String payload = debug(buf);
            log.error("msg=\"could not find trailer\"");
            buf.resetReaderIndex();
            return;
        }
        int insideFrameLen = trailerIndex - buf.readerIndex();
        ByteBuf frame = buf.readBytes(insideFrameLen);
        buf.skipBytes(1);

        // Pass message through
        out.add(frame);
    }
}
...