Соединение Netty выбрасывает StackOverflowError - PullRequest
0 голосов
/ 15 января 2019

Недавно в нашей производственной среде возникли проблемы, из-за которых наше программное обеспечение перестало работать должным образом. Как видно ниже, в трассировке стека нет нашего кода — это просто рекурсивный вызов внутри Netty. Когда это происходит, все последующие соединения, по-видимому, отклоняются, а несколько ядер ЦП (не все, но некоторые) оказываются полностью загружены. Это странно: трафика больше нет, ведь всё отключено, — что же тогда вообще обрабатывается?

Как упоминалось ранее, это происходит только в нашей производственной среде. Проблема возникает чаще в пиковые дни, когда трафика больше; при этом у нас круглосуточно работает несколько экземпляров с сотнями подключений каждый, и всё же мы сталкиваемся с ней лишь около двух раз в неделю, поэтому сбор информации об этой проблеме, к сожалению, оказывается мучительным процессом. У нас также есть основания полагать, что при плохой связи между нашими серверами вероятность возникновения проблемы возрастает.

Раньше я мало работал с Netty, и, поскольку большая часть этого кода написана не мной, я плохо представляю, где вообще искать, поэтому решил, что лучше всего будет обратиться за помощью.

Вот код в нашем инициализаторе канала:

    /**
     * Sets up a newly created channel: applies socket options and installs the
     * handler chain in the order idle_timeout, framing, compression, codecs, handler.
     *
     * @param ch the channel being initialised
     */
    @Override
    protected void initChannel(SocketChannel ch) {
        // Last stage of the pipeline: receives decoded objects and hands them to the rest of the code.
        MessageHandler messageHandler = new MessageHandler(client);
        // Translates between raw bytes and message objects, starting with the handshake protocol.
        CodecsHandler codecsHandler = new CodecsHandler(client, ProtocolType.HANDSHAKE.getProtocol());
        // Cuts the byte stream into individual packets.
        FramingHandler framingHandler = new FramingHandler();

        // IP_TOS is best effort: a refusal is logged and initialisation carries on.
        try {
            ch.config().setOption(ChannelOption.IP_TOS, 0x18);
        } catch (ChannelException ignored) {
            log.warn("Kernel lacks support for IP_TOS");
        }
        ch.config().setAllocator(PooledByteBufAllocator.DEFAULT);

        ch.pipeline().addLast("idle_timeout", new IdleStateHandler(READ_IDLE_TIMEOUT, WRITE_IDLE_TIMEOUT, 0));
        ch.pipeline().addLast("framing", framingHandler);
        // Placeholder that does nothing; this slot of the pipeline is filled in later.
        ch.pipeline().addLast("compression", NoopHandler.INSTANCE);
        ch.pipeline().addLast("codecs", codecsHandler);
        ch.pipeline().addLast("handler", messageHandler);
    }

Вот наш класс обработчика сообщений:

import com.flowpowered.network.Message;
import com.flowpowered.network.session.Session;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.extern.slf4j.Slf4j;
import mycode.Client;

import java.util.concurrent.atomic.AtomicReference;

@Slf4j
public final class MessageHandler extends SimpleChannelInboundHandler<Message> {

    private final AtomicReference<Session> session = new AtomicReference<>(null);
    private final Client client;

    public MessageHandler(Client client) {
        this.client = client;
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        Channel c = ctx.channel();
        //Sends some packets packet back to the server and sets the channel object in the client to the channel object above.
        //The returned object is an extended class of https://github.com/OverCaste/flow-networking/blob/master/src/main/java/com/flowpowered/networking/session/Session.java
        Session s = client.newSession(c);
        if (!session.compareAndSet(null, s)) {
            throw new IllegalStateException("Session may not be set more than once");
        }
        s.onReady();
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        Session session = this.session.get();
        if(session != null) {
            session.onDisconnect();
        } else {
            log.warn("Child session was null so could not disconnect");
        }
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Message i) {
        //Passes the message off to our session object
        session.get().messageReceived(i);
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {

    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        session.get().onInboundThrowable(cause);
    }

    public AtomicReference<Session> getSession() {
        return session;
    }
}

Вот наш обработчик кодеков

import com.flowpowered.network.Codec;
import com.flowpowered.network.Message;
import com.flowpowered.network.util.ByteBufUtils;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.EncoderException;
import io.netty.handler.codec.MessageToMessageCodec;
import lombok.extern.slf4j.Slf4j;
import mycode.Client;
import mycode.CustomProtocol;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;

/**
 * Converts between framed {@link ByteBuf}s and {@link Message} objects using the
 * codecs registered in a {@link CustomProtocol}.
 */
@Slf4j
public final class CodecsHandler extends MessageToMessageCodec<ByteBuf, Message> {

    private final CustomProtocol protocol;
    private final Client client;

    /**
     * @param client   the owning client
     * @param protocol the protocol used to look up codecs for encoding and decoding
     */
    public CodecsHandler(Client client, CustomProtocol protocol) {
        this.protocol = protocol;
        this.client = client;
    }

    /**
     * Encodes a message as a var-int opcode header followed by the codec-produced body.
     *
     * <p>Both buffers are allocated here, so they are released again if anything
     * fails before they have been handed over to {@code out}.
     *
     * @throws EncoderException if no codec is registered for the message type
     */
    @Override
    protected void encode(ChannelHandlerContext ctx, Message msg, List<Object> out) throws Exception {
        // find codec
        Class<? extends Message> clazz = msg.getClass();
        //Codec registration is handled by https://github.com/OverCaste/flow-networking/blob/master/src/main/java/com/flowpowered/networking/Codec.java
        Codec.CodecRegistration reg = protocol.getCodecRegistration(clazz);
        if (reg == null) {
            throw new EncoderException("Unknown message type: " + clazz);
        }

        ByteBuf headerBuf = ctx.alloc().buffer(8);
        ByteBuf messageBuf = null;
        boolean handedOff = false;
        try {
            // write header
            ByteBufUtils.writeVarInt(headerBuf, reg.getOpcode());

            // write body
            messageBuf = ctx.alloc().buffer();
            messageBuf = reg.getCodec().encode(messageBuf, msg);

            out.add(Unpooled.wrappedBuffer(headerBuf, messageBuf));
            handedOff = true;
        } finally {
            if (!handedOff) {
                // A failing codec must not leak the buffers allocated above.
                headerBuf.release();
                if (messageBuf != null) {
                    messageBuf.release();
                }
            }
        }
    }

    /**
     * Reads the header to select a codec, decodes the body and emits the message.
     * Bytes left unread after decoding are reported with a warning.
     */
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
        // find codec and read header
        //https://github.com/OverCaste/flow-networking/blob/master/src/main/java/com/flowpowered/networking/Codec.java
        Codec<?> codec = protocol.newReadHeader(msg);

        // read body
        Message decoded = codec.decode(msg);

        if (msg.readableBytes() > 0) {
            log.warn("Leftover bytes ({}) after decoding: {}", msg.readableBytes(), decoded);
        }

        out.add(decoded);
    }

    /**
     * Logs the throwable; it is not forwarded to later handlers.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        log.error("Codec throwable caught", cause);

        //We do some other stuff around other parts of the code here
    }
}

Вот наш FramingHandler

import com.flowpowered.network.util.ByteBufUtils;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageCodec;

import java.util.List;

/**
 * Frames the byte stream: every packet is preceded by its length written as a var-int.
 */
public final class FramingHandler extends ByteToMessageCodec<ByteBuf> {

    /**
     * Tells whether a complete var-int can be read from {@code buf} without
     * running out of data. The reader index is left where it was.
     *
     * @param buf the buffer to inspect
     * @return {@code true} if a whole var-int is available
     */
    private static boolean readableVarInt(ByteBuf buf) {
        // With more than five readable bytes even the longest var-int is fully present.
        if (buf.readableBytes() > 5) {
            return true;
        }

        final int start = buf.readerIndex();
        boolean complete = false;
        while (buf.readableBytes() >= 1) {
            byte current = buf.readByte();
            // A cleared high bit marks the final byte of the var-int.
            if ((current & 0x80) == 0) {
                complete = true;
                break;
            }
        }

        buf.readerIndex(start);
        return complete;
    }

    /**
     * Writes the payload size as a var-int and then the payload itself.
     */
    @Override
    protected void encode(ChannelHandlerContext ctx, ByteBuf msg, ByteBuf out) {
        int payloadSize = msg.readableBytes();
        ByteBufUtils.writeVarInt(out, payloadSize);
        out.writeBytes(msg);
    }

    /**
     * Emits one frame when both its length prefix and its full contents have
     * arrived; otherwise leaves {@code in} untouched so more data can accumulate.
     */
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        in.markReaderIndex();
        if (readableVarInt(in)) {
            int frameLength = ByteBufUtils.readVarInt(in);
            if (in.readableBytes() >= frameLength) {
                // The whole frame is here: copy it into its own buffer.
                ByteBuf frame = ctx.alloc().buffer(frameLength);
                in.readBytes(frame, frameLength);
                out.add(frame);
            } else {
                // Contents incomplete: rewind to before the length prefix.
                in.resetReaderIndex();
            }
        }
    }
}

Вот обработчик сжатия

import com.flowpowered.network.util.ByteBufUtils;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.DecoderException;
import io.netty.handler.codec.EncoderException;
import io.netty.handler.codec.MessageToMessageCodec;

import java.util.List;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

/**
 * Compresses outgoing packets with zlib when they reach a size threshold and
 * inflates incoming ones.
 *
 * <p>Wire format: a var-int holding the uncompressed size ({@code 0} meaning
 * "not compressed") followed by the payload.
 *
 * <p>The {@link Inflater} and {@link Deflater} are reused between calls, so they
 * are reset after every use, including failed ones, and ended when the handler
 * is removed from the pipeline.
 */
public final class CompressionHandler extends MessageToMessageCodec<ByteBuf, ByteBuf> {

    private static final int MAX_INFLATED_BYTES = 1_000_000;
    private static final int COMPRESSION_LEVEL = Deflater.DEFAULT_COMPRESSION;

    private final int threshold;
    private final Inflater inflater;
    private final Deflater deflater;

    /**
     * @param threshold minimum payload size, in bytes, at which packets are compressed
     */
    public CompressionHandler(int threshold) {
        this.threshold = threshold;
        inflater = new Inflater();
        deflater = new Deflater(COMPRESSION_LEVEL);
    }

    /**
     * Frees the native zlib state held by the inflater and deflater.
     */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        try {
            inflater.end();
            deflater.end();
        } finally {
            super.handlerRemoved(ctx);
        }
    }

    /**
     * Emits the size prefix plus either the compressed payload or, when the
     * packet is below the threshold or does not shrink, the retained original.
     *
     * @throws EncoderException if the deflater produced no output at all
     */
    @Override
    protected void encode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) {
        ByteBuf prefixBuf = ctx.alloc().buffer(5);
        ByteBuf compressed = null;
        ByteBuf contentsBuf;
        try {
            int length = msg.readableBytes();
            if (length >= threshold) {
                // message should be compressed
                compressed = deflate(msg, length);
            }

            if (compressed != null) {
                // all is well
                ByteBufUtils.writeVarInt(prefixBuf, length);
                contentsBuf = compressed;
            } else {
                // below the threshold, or compression did not help: send through as is
                ByteBufUtils.writeVarInt(prefixBuf, 0);
                msg.retain();
                contentsBuf = msg;
            }
        } catch (Exception e) {
            prefixBuf.release();
            if (compressed != null) {
                compressed.release();
            }
            throw e;
        }

        out.add(Unpooled.wrappedBuffer(prefixBuf, contentsBuf));
    }

    /**
     * Deflates the readable bytes of {@code msg}.
     *
     * @return the compressed data, or {@code null} (with the reader index of
     *         {@code msg} restored) when compression did not reduce the size
     */
    private ByteBuf deflate(ByteBuf msg, int length) {
        int index = msg.readerIndex();
        byte[] sourceData = new byte[length];
        msg.readBytes(sourceData);

        ByteBuf result = msg.alloc().buffer(length);
        boolean keepResult = false;
        try {
            deflater.setInput(sourceData);
            deflater.finish();

            byte[] byteArray = new byte[8192];
            int totalBytes = 0;
            while (!deflater.finished()) {
                int compressedLength = deflater.deflate(byteArray);
                result.writeBytes(byteArray, 0, compressedLength);
                totalBytes += compressedLength;
            }

            if (totalBytes == 0) {
                // compression failed in some weird way
                throw new EncoderException("Failed to compress message of size " + length);
            }
            if (totalBytes >= length) {
                // compression increased the size. threshold is probably too low
                msg.readerIndex(index);
                return null;
            }
            keepResult = true;
            return result;
        } finally {
            // Always leave the shared deflater ready for the next packet.
            deflater.reset();
            if (!keepResult) {
                result.release();
            }
        }
    }

    /**
     * Strips the size prefix and emits the payload, inflating it when the prefix
     * is non-zero.
     *
     * @throws DecoderException if the declared size is negative or too large, an
     *         uncompressed packet reaches the threshold, or the inflated data
     *         does not match the declared size
     */
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
        int index = msg.readerIndex();
        int uncompressedSize = ByteBufUtils.readVarInt(msg);
        if (uncompressedSize == 0) {
            // message is uncompressed
            int length = msg.readableBytes();
            if (length >= threshold) {
                // invalid
                throw new DecoderException("Received uncompressed message of size " + length + " greater than threshold " + threshold);
            }

            msg.retain();
            out.add(msg);
            return;
        }

        if (uncompressedSize < 0) {
            throw new DecoderException("Received compressed message with negative uncompressed size " + uncompressedSize);
        }
        if (uncompressedSize > MAX_INFLATED_BYTES) {
            //Don't trust this - this is a very big and may come with malicious intent
            throw new DecoderException("Resulting uncompressed size is too large for us to handle safely");
        }

        // message is compressed
        byte[] sourceData = new byte[msg.readableBytes()];
        msg.readBytes(sourceData);

        ByteBuf result = msg.alloc().buffer(uncompressedSize);
        boolean keepResult = false;
        try {
            inflater.setInput(sourceData);

            byte[] destData = new byte[8192];
            int totalBytes = 0;
            boolean stalled = false;
            while (!inflater.finished()) {
                int resultLength = inflater.inflate(destData);
                if (resultLength == 0 && (inflater.needsInput() || inflater.needsDictionary())) {
                    // No progress is possible (truncated input or preset dictionary
                    // required); looping on would spin forever on the event loop.
                    stalled = true;
                    break;
                }
                result.writeBytes(destData, 0, resultLength);
                totalBytes += resultLength;
                if (totalBytes > uncompressedSize) {
                    throw new DecoderException("Received compressed message claiming to be of size " + uncompressedSize + " but actually larger");
                }
            }

            if (totalBytes == 0) {
                // might be a leftover from before compression was enabled (no compression header)
                // uncompressedSize is likely to be < threshold
                msg.readerIndex(index);
                msg.retain();
                out.add(msg);
            } else if (stalled) {
                throw new DecoderException("Received compressed message claiming to be of size " + uncompressedSize + " but the compressed data is incomplete");
            } else if (totalBytes != uncompressedSize) {
                throw new DecoderException("Received compressed message claiming to be of size " + uncompressedSize + " but actually " + totalBytes);
            } else {
                out.add(result);
                keepResult = true;
            }
        } finally {
            // Always leave the shared inflater ready for the next packet.
            inflater.reset();
            if (!keepResult) {
                result.release();
            }
        }
    }
}

Вот наш NoopHandler (буквально он просто оставлен как заполнитель для частей конвейера, которые еще ничего не делают)

import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerAdapter;

/**
 * Handler that adds no behaviour of its own. It is exposed as a single shared
 * instance and serves as a placeholder in a pipeline.
 */
@ChannelHandler.Sharable
public class NoopHandler extends ChannelHandlerAdapter {

    /** The only instance of this handler. */
    public static final NoopHandler INSTANCE = new NoopHandler();

    /** Private so that {@link #INSTANCE} stays the only instance. */
    private NoopHandler() {
    }
}

Из-за масштаба проекта и того, что мне не разрешено публиковать весь код, я удалил много несущественного кода и изменил имена классов.


https://pastebin.com/RAp1qcxb

java.lang.StackOverflowError: null
at org.apache.logging.slf4j.Log4jLogger.log(Log4jLogger.java:371) ~[log4j-slf4j-impl-2.11.1.jar:2.11.1]
at io.netty.util.internal.logging.LocationAwareSlf4JLogger.log(LocationAwareSlf4JLogger.java:42) ~[netty-common-4.1.31.Final.jar:4.1.31.Final]
at io.netty.util.internal.logging.LocationAwareSlf4JLogger.warn(LocationAwareSlf4JLogger.java:198) ~[netty-common-4.1.31.Final.jar:4.1.31.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:294) ~[netty-transport-4.1.31.Final.jar:4.1.31.Final]
at io.netty.channel.AbstractChannelHandlerContext.notifyHandlerException(AbstractChannelHandlerContext.java:856) ~[netty-transport-4.1.31.Final.jar:4.1.31.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:778) ~[netty-transport-4.1.31.Final.jar:4.1.31.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) ~[netty-transport-4.1.31.Final.jar:4.1.31.Final]
at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) ~[netty-transport-4.1.31.Final.jar:4.1.31.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:770) ~[netty-transport-4.1.31.Final.jar:4.1.31.Final]
at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) ~[netty-transport-4.1.31.Final.jar:4.1.31.Final]
at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117) ~[netty-transport-4.1.31.Final.jar:4.1.31.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) ~[netty-transport-4.1.31.Final.jar:4.1.31.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) ~[netty-transport-4.1.31.Final.jar:4.1.31.Final]
at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) ~[netty-transport-4.1.31.Final.jar:4.1.31.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:770) ~[netty-transport-4.1.31.Final.jar:4.1.31.Final]
at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) ~[netty-transport-4.1.31.Final.jar:4.1.31.Final]
at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117) ~[netty-transport-4.1.31.Final.jar:4.1.31.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) ~[netty-transport-4.1.31.Final.jar:4.1.31.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) ~[netty-transport-4.1.31.Final.jar:4.1.31.Final]
at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) ~[netty-transport-4.1.31.Final.jar:4.1.31.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:770) ~[netty-transport-4.1.31.Final.jar:4.1.31.Final]
at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) ~[netty-transport-4.1.31.Final.jar:4.1.31.Final]
at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117) ~[netty-transport-4.1.31.Final.jar:4.1.31.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) ~[netty-transport-4.1.31.Final.jar:4.1.31.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) ~[netty-transport-4.1.31.Final.jar:4.1.31.Final]
at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) ~[netty-transport-4.1.31.Final.jar:4.1.31.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:770) ~[netty-transport-4.1.31.Final.jar:4.1.31.Final]
at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) ~[netty-transport-4.1.31.Final.jar:4.1.31.Final]
at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117) ~[netty-transport-4.1.31.Final.jar:4.1.31.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) ~[netty-transport-4.1.31.Final.jar:4.1.31.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) ~[netty-transport-4.1.31.Final.jar:4.1.31.Final]
at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) ~[netty-transport-4.1.31.Final.jar:4.1.31.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:770) ~[netty-transport-4.1.31.Final.jar:4.1.31.Final]
at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) ~[netty-transport-4.1.31.Final.jar:4.1.31.Final]
at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117) ~[netty-transport-4.1.31.Final.jar:4.1.31.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) ~[netty-transport-4.1.31.Final.jar:4.1.31.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) ~[netty-transport-4.1.31.Final.jar:4.1.31.Final]
at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) ~[netty-transport-4.1.31.Final.jar:4.1.31.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:770) ~[netty-transport-4.1.31.Final.jar:4.1.31.Final]
at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) ~[netty-transport-4.1.31.Final.jar:4.1.31.Final]
at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117) ~[netty-transport-4.1.31.Final.jar:4.1.31.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) ~[netty-transport-4.1.31.Final.jar:4.1.31.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) ~[netty-transport-4.1.31.Final.jar:4.1.31.Final]
at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) ~[netty-transport-4.1.31.Final.jar:4.1.31.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:770) ~[netty-transport-4.1.31.Final.jar:4.1.31.Final]
at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) ~[netty-transport-4.1.31.Final.jar:4.1.31.Final]
at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117) ~[netty-transport-4.1.31.Final.jar:4.1.31.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) ~[netty-transport-4.1.31.Final.jar:4.1.31.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) ~[netty-transport-4.1.31.Final.jar:4.1.31.Final]
at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) ~[netty-transport-4.1.31.Final.jar:4.1.31.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:770) ~[netty-transport-4.1.31.Final.jar:4.1.31.Final]
at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) ~[netty-transport-4.1.31.Final.jar:4.1.31.Final]
at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117) ~[netty-transport-4.1.31.Final.jar:4.1.31.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) ~[netty-transport-4.1.31.Final.jar:4.1.31.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) ~[netty-transport-4.1.31.Final.jar:4.1.31.Final]
at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) ~[netty-transport-4.1.31.Final.jar:4.1.31.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:770) ~[netty-transport-4.1.31.Final.jar:4.1.31.Final]
at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) ~[netty-transport-4.1.31.Final.jar:4.1.31.Final]
at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117) ~[netty-transport-4.1.31.Final.jar:4.1.31.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) ~[netty-transport-4.1.31.Final.jar:4.1.31.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) ~[netty-transport-4.1.31.Final.jar:4.1.31.Final]
at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) ~[netty-transport-4.1.31.Final.jar:4.1.31.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:770) ~[netty-transport-4.1.31.Final.jar:4.1.31.Final]
at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) ~[netty-transport-4.1.31.Final.jar:4.1.31.Final]
at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117) ~[netty-transport-4.1.31.Final.jar:4.1.31.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) ~[netty-transport-4.1.31.Final.jar:4.1.31.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) ~[netty-transport-4.1.31.Final.jar:4.1.31.Final]
at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) ~[netty-transport-4.1.31.Final.jar:4.1.31.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:770) ~[netty-transport-4.1.31.Final.jar:4.1.31.Final]
at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) ~[netty-transport-4.1.31.Final.jar:4.1.31.Final]
at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117) ~[netty-transport-4.1.31.Final.jar:4.1.31.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) ~[netty-transport-4.1.31.Final.jar:4.1.31.Final]
... ... ...

1 Ответ

0 голосов
/ 15 января 2019

Вот трассировка того, что происходит: похоже, что findContextOutbound() в итоге возвращает тот же экземпляр AbstractChannelHandlerContext, который изначально вызвал flush(), и цикл повторяется с новым вызовом flush(). Вам, вероятно, придется пройтись по этим методам в отладчике или изучить, как настраиваются экземпляры ChannelHandlerContext.

Cycle:
    [repeat from 1] at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
    [6] at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117)
    [5] at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
    [4] at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
    [3] at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
    [2] at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:770)
    [1] at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)

[1] AbstractChannelHandlerContext.flush():

      final AbstractChannelHandlerContext next = findContextOutbound(); // see following method
      EventExecutor executor = next.executor();
      if (executor.inEventLoop()){
          next.invokeFlush(); // -> [2]
      } else { /* threadded invokeFlush */ }

    // do you happen to have two AbstractChannelHandlerContexts ctx1 and ctx2, such that?:
    //     ctx1.prev == ctx2 && ctx2.prev == ctx1;
    AbstractChannelHandlerContext.findContextOutbound():

      AbstractChannelHandlerContext ctx = this;
      do {
          ctx = ctx.prev;
      } while (!ctx.outbound);
      return ctx;

[2] AbstractChannelHandlerContext.invokeFlush():

      // this time invokeHandler() returns `false`
      if (invokeHandler()){
          invokeFlush0();
      }  else {
          flush(); // -> [3]
      }

    /**
     * Makes best possible effort to detect if `ChannelHandler.handlerAdded(ChannelHandlerContext)`
     * was called yet. If not return `false` and if called or could not detect return `true`.
     *
     * If this method returns `false` we will not invoke the `ChannelHandler` but just forward the event.
     * This is needed as `DefaultChannelPipeline` may already put the `ChannelHandler` in the linked-list
     * but not called `ChannelHandler.handlerAdded(ChannelHandlerContext)`.
     */
    AbstractChannelHandlerContext.invokeHandler():

      int handlerState = this.handlerState;
      return handlerState == ADD_COMPLETE || (!ordered && handlerState == ADD_PENDING);

[3] AbstractChannelHandlerContext.flush():

      final AbstractChannelHandlerContext next = findContextOutbound();
      EventExecutor executor = next.executor();
      if (executor.inEventLoop()) {
          next.invokeFlush(); // -> [4]
      } else { /* threadded invokeFlush */ }

[4] AbstractChannelHandlerContext.invokeFlush():

      // this time invokeHandler() returns `true`
      if (invokeHandler()){
          invokeFlush0(); // -> [5]
      }  else {
         flush();
      }

[5] AbstractChannelHandlerContext.invokeFlush0():

      try {
          ((ChannelOutboundHandler) handler()).flush(this); // -> [6]
      } catch (Throwable t) {
          notifyHandlerException(t); // ultimately reaches this when `StackOverflowError` is thrown
      }

// ChannelDuplexHandler implements ChannelOutboundHandler
[6] ChannelDuplexHandler.flush(ChannelHandlerContext ctx):

      // ctx is the original `AbstractChannelHandlerContext` and the cycle repeats
      ctx.flush(); // -> repeat from [1]
...