Смешивание NIO с IO - PullRequest
       25

Смешивание NIO с IO

5 голосов
/ 01 апреля 2010

Обычно у вас есть один связанный порт TCP и несколько подключений к ним. По крайней мере, обычно есть больше соединений в качестве связанных портов. У меня другой случай: я хочу связать много портов и обычно не иметь (или, по крайней мере, очень мало) соединений.

Поэтому я хочу использовать NIO для приема входящих соединений.

Однако мне нужно передать принятые соединения в существующую библиотеку jsch ssh. Для этого требуются сокеты IO вместо сокетов NIO, он порождает один (или два) поток (-ы) на соединение. Но это нормально для меня.

Теперь я подумал, что следующие строки дадут тот же результат:

Socket a = serverSocketChannel.accept().socket();
Socket b = serverSocketChannel.socket().accept();
SocketChannel channel = serverSocketChannel.accept();
channel.configureBlocking( true );
Socket c = channel.socket();
Socket d = serverSocket.accept();

Однако функции getInputStream() и getOutputStream() возвращаемых сокетов, похоже, работают по-разному. Только если сокет был принят с использованием последнего вызова, jsch может работать с ним. В первых трех случаях это терпит неудачу (и я извиняюсь: я не знаю почему).

Так есть ли способ преобразовать такой сокет?

Ответы [ 3 ]

3 голосов
/ 01 апреля 2010

Входные и выходные потоки, возвращаемые сокетами, полученными из SocketChannels, внутренне синхронизируются на канале в некоторых точках. Поэтому вы не можете использовать их для полнодуплексных протоколов, таких как SSH, потому что система заблокируется. То же самое относится к потокам, преобразованным из каналов через класс Channels (что и составляет первый случай).

0 голосов
/ 12 мая 2018

Я должен был сделать это, потому что мы использовали JSch и имели тысячи потоков, прослушивающих порты. Я оставлю это на ваше усмотрение обрабатывать принятие соединений, но это работает с JSch из потока, запущенного для обработки соединения:

/**
 * Handler that allows for input and output streams of an NIO socket to be
 * read as blocking I/O
 * <p>
 * Reading and writing from the streams <b>must</b> be done on the same thread
 * @author Eric
 *
 */
public class NioBlockingSocketStreamer
{

    private static long NIO_SELECT_TIMEOUT = 0;
    private static long NIO_WRITE_TIMEOUT_SECONDS = 30;
    /** poison pill result that signals a disconnected channel */
    private static int NIO_READ_IOEXCEPTION = -2;

    private final SocketChannel socketChannel;

    private Selector selector;

    private final InputStream inputStream = new MyInputStream(this);
    private final OutputStream outputStream = new MyOutputStream(this);

    private final LinkedBlockingQueue<ByteBuffer> pendingWrites;
    private final MyReadQueue<ReadRequest> pendingRead;

    private volatile boolean disconnected = false;

    public NioBlockingSocketStreamer(SocketChannel socketChannel,
                                     int writeQueueSize)
    {
        pendingRead = new MyReadQueue<ReadRequest>( );
        pendingWrites = new LinkedBlockingQueue<ByteBuffer>(writeQueueSize);
        this.socketChannel = socketChannel;
    }

    public NioBlockingSocketStreamer start( )
            throws IOException
    {
        this.selector = Selector.open( );
        this.socketChannel.configureBlocking(false);

        final NioBlockingSocketStreamer streamer = this;
        Thread t = new Thread(new Runnable( )
        {
            @Override
            public void run( )
            {
                try
                {
                    streamer.nioServiceLoop( );
                }
                catch (ClosedChannelException e)
                {
                    //shutting down
                }
                catch (ClosedSelectorException e)
                {
                    //shutting down
                }
                catch (IOException e)
                {
                    e.printStackTrace( );
                }
            }
        });
        t.start( );
        return this;
    }

    public void nioServiceLoop( )
            throws IOException
    {
        ByteBuffer writeBuffer = null;
        ReadRequest readRequest = null;

        try
        {
            for (;;)
            {
                if (readRequest == null)
                    readRequest = pendingRead.poll( );
                int interestOps = readRequest != null
                        ? SelectionKey.OP_READ
                        : 0;
                if (writeBuffer == null && pendingWrites.isEmpty( ))
                    interestOps &= ~SelectionKey.OP_WRITE;
                else
                    interestOps |= SelectionKey.OP_WRITE;
                socketChannel.register(selector, interestOps);

                int n = selector.select(NIO_SELECT_TIMEOUT);
                if (n > 0)
                {
                    Set<SelectionKey> selectedKeys = selector.selectedKeys( );
                    for (SelectionKey selectedKey : selectedKeys)
                    {
                        if (selectedKey.isValid( ))
                        {
                            if (selectedKey.isReadable( ))
                            {
                                int cbRead;
                                try
                                {
                                    cbRead = socketChannel.read(readRequest.buffer);
                                }
                                catch (IOException e)
                                {
                                    //"java.io.IOException: An existing connection was forcibly closed by the remote host"
                                    //is the typical disconnection
                                    return;
                                }
                                interestOps &= ~SelectionKey.OP_READ;
                                readRequest.submitResult(cbRead);
                                readRequest = null;
                            }

                            if (selectedKey.isWritable( ))
                            {
                                if (writeBuffer == null)
                                    writeBuffer = pendingWrites.poll( );

                                if (writeBuffer != null)
                                {
                                    try
                                    {
                                        socketChannel.write(writeBuffer);
                                    }
                                    catch (IOException e)
                                    {
                                        //"java.io.IOException: An existing connection was forcibly closed by the remote host"
                                        //is the typical disconnection
                                        return;
                                    }
                                    if (!writeBuffer.hasRemaining( ))
                                        writeBuffer = null;
                                }
                            }
                        }
                    }
                    selectedKeys.clear( );
                }
            }
        }
        finally
        {
            disconnected = true;

            //submit poison pill result for outstanding read request
            if (readRequest != null)
            {
                readRequest.submitResult(NIO_READ_IOEXCEPTION);
            }

            //since the read request queue became empty before we disconnected, check for the next
            //read request that may have queued in the meantime
            readRequest = pendingRead.poll( );
            if (readRequest != null)
            {
                readRequest.submitResult(NIO_READ_IOEXCEPTION);
            }
        }
    }

    private ReadRequest readRequest = new ReadRequest( );

    public synchronized int read(ByteBuffer buffer)
            throws IOException
    {
        if (disconnected)
            throw new IOException("disconnected");

        readRequest.buffer = buffer;
        int result;
        try
        {
            pendingRead.offer(readRequest);
            selector.wakeup( );
            result = readRequest.getResult( );
        }
        catch (InterruptedException e)
        {
            throw new IOException(e);
        }

        if (result == NIO_READ_IOEXCEPTION)
            throw new IOException("disconnected");

        return result;
    }

    public void write(ByteBuffer buffer)
            throws IOException
    {
        if (disconnected)
            throw new IOException("disconnected");

        boolean queued = false;
        if (pendingWrites.remainingCapacity( ) > 0)
        {
            try
            {
                queued = pendingWrites.add(buffer);
            }
            catch (IllegalStateException e)
            {
                throw new IOException("write queue capacity exceeded");
            }
        }
        else
        {
            try
            {
                queued = pendingWrites.offer(buffer, NIO_WRITE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
            }
            catch (InterruptedException e)
            {
                throw new IOException(e);
            }
        }

        if (!queued)
            throw new IOException("unable to queue write");

        if (selector != null) //selector should never be null
        {
            selector.wakeup( );
        }
    }

    public InputStream getInputStream( )
    {
        return inputStream;
    }

    public OutputStream getOutputStream( )
    {
        return outputStream;
    }

    /**
     * Implements a simple blocking mechanism for getting the results of a read
     * <p>
     * <b>Note</b> This class must avoid autoboxing due to Retroweaver
     */
    static class ReadRequest
    {
        ByteBuffer buffer;
        LinkedBlockingQueue<Integer> result = new LinkedBlockingQueue<Integer>(1);

        void submitResult(int cb)
        {
            result.add(new Integer(cb));
        }

        int getResult( )
                throws InterruptedException
        {
            return (int) result.take( );
        }
    }

    static class MyInputStream
        extends InputStream
    {

        private final NioBlockingSocketStreamer streamer;

        MyInputStream(NioBlockingSocketStreamer streamer)
        {
            this.streamer = streamer;
        }

        @Override
        public int read(byte[] b,
                        int off,
                        int len)
                throws IOException
        {
            ByteBuffer buffer = ByteBuffer.allocate(len);
            int n = streamer.read(buffer);
            if (n > 0)
                System.arraycopy(buffer.array( ), 0, b, off, n);
            return n;
        }

        @Override
        public int read(byte[] b)
                throws IOException
        {
            return read(b, 0, b.length);
        }

        @Override
        public int read( )
                throws IOException
        {
            return read(new byte[1], 0, 1);
        }

        @Override
        public void close( )
                throws IOException
        {
            // TODO Auto-generated method stub
        }

    }

    static class MyOutputStream
        extends OutputStream
    {

        private final NioBlockingSocketStreamer streamer;

        MyOutputStream(NioBlockingSocketStreamer streamer)
        {
            this.streamer = streamer;
        }

        @Override
        public void write(byte[] b,
                          int off,
                          int len)
                throws IOException
        {
            ByteBuffer buffer = ByteBuffer.allocate(len);
            buffer.put(b, off, len);
            buffer.flip( );
            streamer.write(buffer);
        }

        @Override
        public void write(byte[] b)
                throws IOException
        {
            write(b, 0, b.length);
        }

        @Override
        public void write(int b)
                throws IOException
        {
            write(new byte[1], 0, 1);
        }

        @Override
        public void close( )
                throws IOException
        {
            // TODO Auto-generated method stub
        }

        @Override
        public void flush( )
                throws IOException
        {
            // TODO Auto-generated method stub
        }
    }

    static class MyReadQueue<T>
    {
        //FIXME using LinkedBlockingQueue is overkill
        private final LinkedBlockingQueue<T> queue = new LinkedBlockingQueue<T>(1);

        public T poll( )
        {
            return queue.poll( );
        }

        public void offer(T request)
                throws InterruptedException
        {
            queue.offer(request, Long.MAX_VALUE, TimeUnit.NANOSECONDS);
        }
    }

}
0 голосов
/ 01 апреля 2010

Эта последовательность работает в нашей производственной среде:

final SocketAddress serverAddr =
    new InetSocketAddress(
        bind_address,
        server_port
    );

final ServerSocketChannel serverChannel = ServerSocketChannel.open( );

serverChannel.socket( ).bind(
    serverAddr,
    backlog
);

final SocketChannel socketChannel = serverChannel.accept( );

final Socket socket = socketChannel.socket( );

final OutputStream out = socket.getOutputStream( );

final InputStream in = socket.getInputStream( );

По умолчанию все каналы будут в режиме blocking.

Помните, что я не показывал код очистки / обработки исключений.

Также есть некоторые ошибки , связанные с NIO и потоками сокетов

...