Неблокирующие розетки - PullRequest
       3

Неблокирующие розетки

11 голосов
/ 09 октября 2010

Каков наилучший способ реализации неблокирующего сокета в Java?

Или есть такая вещь?У меня есть программа, которая связывается с сервером через сокет, но я не хочу, чтобы вызов сокета блокировал / вызывал задержку, если есть проблема с данными / соединением.

Ответы [ 5 ]

13 голосов
/ 21 июня 2017

Неблокирующий сокет Java , представленный в Java 2 Standard Edition 1.4, обеспечивает сетевое взаимодействие между приложениями без блокировки процессов с использованием сокетов. Но что такое неблокирующий сокет, в каких контекстах он может быть полезен и как он работает?

Что такое неблокирующая розетка?

Неблокирующая розетка позволяет осуществлять операции ввода-вывода на канале, не блокируя процессы, использующие его. Это означает, что мы можем использовать один поток для обработки нескольких одновременных соединений и получить «асинхронные высокопроизводительные» операции чтения / записи (некоторые люди могут не согласиться с этим)

Хорошо, в каких случаях это может быть полезно?

Предположим, вы хотите внедрить сервер, принимающий различные клиентские соединения. Предположим также, что вы хотите, чтобы сервер мог обрабатывать несколько запросов одновременно. Используя традиционный способ, у вас есть два варианта разработки такого сервера:

  • Реализация многопоточного сервера, который вручную обрабатывает поток для каждого соединения.
  • Использование внешнего стороннего модуля.

Оба решения работают, но, приняв первое, вы должны разработать целое решение для управления потоками с сопутствующими параллелизмом и конфликтными проблемами. Второе решение делает приложение зависимым от внешнего модуля, отличного от JDK, и, возможно, вам придется адаптировать библиотеку к вашим потребностям. С помощью неблокирующего сокета вы можете реализовать неблокирующий сервер без непосредственного управления потоками или обращения к внешним модулям.

Как это работает?

Прежде чем вдаваться в подробности, есть несколько терминов, которые вам необходимо понять:

  • В реализациях на основе NIO вместо записи данных в выходные потоки и чтения данных из входных потоков мы читаем и записываем данные из буферов . буфер может быть определен как временное хранилище.
  • Канал передает большую часть данных в буферы . Также его можно рассматривать как конечную точку для общения.
  • Выбор готовности - это понятие, которое обозначает «возможность выбора сокета, который не будет блокироваться при чтении или записи данных».

Java NIO имеет класс Selector, который позволяет одному потоку проверять события ввода-вывода на нескольких каналах. Как это возможно? Ну, selector может проверить «готовность» канала к событиям, таким как клиент пытается подключиться, или операция чтения / записи. Это означает, что каждый экземпляр Selector может отслеживать больше каналов сокетов и, следовательно, больше соединений. Теперь, когда что-то происходит на канале (происходит событие), selector информирует приложение для обработки запроса . selector делает это путем создания ключей событий (или клавиш выбора), которые являются экземплярами класса SelectionKey. Каждый key содержит информацию о , который делает запрос и , какой тип запроса , как показано на рисунке 1.

Figure 1: Structure diagram Рисунок 1: Структурная схема

Базовая реализация

Серверная реализация состоит из бесконечного цикла, в котором selector ожидает события и создает ключи события. Существует четыре возможных типа ключа:

  • Допустимо: связанный клиент запрашивает соединение.
  • Подключаемо: сервер принял соединение.
  • Доступно для чтения: сервер может читать.
  • Доступно для записи: сервер может писать.

Обычно acceptable ключи создаются на стороне сервера. Фактически, этот тип ключа просто сообщает серверу, что клиенту требуется соединение, затем сервер индивидуализирует канал сокета и связывает его с селектором для операций чтения / записи. После этого, когда принятый клиент читает или пишет что-то, селектор создаст для этого клиента ключи readable или writeable.

Теперь вы готовы написать сервер на Java, следуя предложенному алгоритму. Создание канала сокета, selector и регистрация переключателя сокета могут быть выполнены следующим образом:

final String HOSTNAME = "127.0.0.1";
final int PORT = 8511;

// This is how you open a ServerSocketChannel
serverChannel = ServerSocketChannel.open();
// You MUST configure as non-blocking or else you cannot register the serverChannel to the Selector.
serverChannel.configureBlocking(false);
// bind to the address that you will use to Serve.
serverChannel.socket().bind(new InetSocketAddress(HOSTNAME, PORT));

// This is how you open a Selector
selector = Selector.open();
/*
 * Here you are registering the serverSocketChannel to accept connection, thus the OP_ACCEPT.
 * This means that you just told your selector that this channel will be used to accept connections.
 * We can change this operation later to read/write, more on this later.
 */
serverChannel.register(selector, SelectionKey.OP_ACCEPT);

Сначала мы создаем экземпляр SocketChannel методом ServerSocketChannel.open(). Затем, configureBlocking(false) вызов устанавливает этот channel как неблокирующий . Подключение к серверу осуществляется методом serverChannel.socket().bind(). HOSTNAME представляет IP-адрес сервера, а PORT - порт связи. Наконец, вызовите метод Selector.open() для создания экземпляра selector и зарегистрируйте его для channel и типа регистрации. В этом примере тип регистрации - OP_ACCEPT, что означает, что селектор просто сообщает, что клиент пытается подключиться к серверу. Другие возможные варианты: OP_CONNECT, который будет использоваться клиентом; OP_READ; и OP_WRITE.

Теперь нам нужно обработать эти запросы, используя бесконечный цикл. Простой способ заключается в следующем:

// Run the server as long as the thread is not interrupted.
while (!Thread.currentThread().isInterrupted()) {
    /*
     * selector.select(TIMEOUT) is waiting for an OPERATION to be ready and is a blocking call.
     * For example, if a client connects right this second, then it will break from the select()
     * call and run the code below it. The TIMEOUT is not needed, but its just so it doesn't
     * block undefinable.
     */
    selector.select(TIMEOUT);

    /*
     * If we are here, it is because an operation happened (or the TIMEOUT expired).
     * We need to get the SelectionKeys from the selector to see what operations are available.
     * We use an iterator for this.
     */
    Iterator<SelectionKey> keys = selector.selectedKeys().iterator();

    while (keys.hasNext()) {
        SelectionKey key = keys.next();
        // remove the key so that we don't process this OPERATION again.
        keys.remove();

        // key could be invalid if for example, the client closed the connection.
        if (!key.isValid()) {
            continue;
        }
        /*
         * In the server, we start by listening to the OP_ACCEPT when we register with the Selector.
         * If the key from the keyset is Acceptable, then we must get ready to accept the client
         * connection and do something with it. Go read the comments in the accept method.
         */
        if (key.isAcceptable()) {
            System.out.println("Accepting connection");
            accept(key);
        }
        /*
         * If you already read the comments in the accept() method, then you know we changed
         * the OPERATION to OP_WRITE. This means that one of these keys in the iterator will return
         * a channel that is writable (key.isWritable()). The write() method will explain further.
         */
        if (key.isWritable()) {
            System.out.println("Writing...");
            write(key);
        }
        /*
         * If you already read the comments in the write method then you understand that we registered
         * the OPERATION OP_READ. That means that on the next Selector.select(), there is probably a key
         * that is ready to read (key.isReadable()). The read() method will explain further.
         */
        if (key.isReadable()) {
            System.out.println("Reading connection");
            read(key);
        }
    }
}

Вы можете найти источник реализации здесь

ПРИМЕЧАНИЕ. Асинхронный сервер

В качестве альтернативы неблокирующей реализации мы можем развернуть асинхронный сервер. Например, вы можете использовать класс AsynchronousServerSocketChannel, который обеспечивает асинхронный канал для потоковых сокетов прослушивания.

Чтобы использовать его, сначала выполните его статический метод open(), а затем bind() для определенного порта . Затем вы выполните его метод accept(), передав ему класс, который реализует интерфейс CompletionHandler. Чаще всего этот обработчик создается как анонимный внутренний класс .

Из этого AsynchronousServerSocketChannel объекта вы вызываете accept(), чтобы сказать ему, чтобы он начал прослушивать соединения, передавая ему пользовательский экземпляр CompletionHandler. Когда мы вызываем accept(), он немедленно возвращается. Обратите внимание, что это отличается от традиционного подхода блокировки; тогда как метод accept() блокируется до тех пор, пока клиент не подключится к нему , метод AsynchronousServerSocketChannel accept() обрабатывает его для вас.

Вот вам пример:

public class NioSocketServer
{
    public NioSocketServer()
    {
        try {
            // Create an AsynchronousServerSocketChannel that will listen on port 5000
            final AsynchronousServerSocketChannel listener = AsynchronousServerSocketChannel
                    .open()
                    .bind(new InetSocketAddress(5000));

            // Listen for a new request
            listener.accept(null, new CompletionHandler<AsynchronousSocketChannel, Void>()
            {
                @Override
                public void completed(AsynchronousSocketChannel ch, Void att)
                {
                    // Accept the next connection
                    listener.accept(null, this);

                    // Greet the client
                    ch.write(ByteBuffer.wrap("Hello, I am Echo Server 2020, let's have an engaging conversation!\n".getBytes()));

                    // Allocate a byte buffer (4K) to read from the client
                    ByteBuffer byteBuffer = ByteBuffer.allocate(4096);
                    try {
                        // Read the first line
                        int bytesRead = ch.read(byteBuffer).get(20, TimeUnit.SECONDS);

                        boolean running = true;
                        while (bytesRead != -1 && running) {
                            System.out.println("bytes read: " + bytesRead);

                            // Make sure that we have data to read
                            if (byteBuffer.position() > 2) {
                                // Make the buffer ready to read
                                byteBuffer.flip();

                                // Convert the buffer into a line
                                byte[] lineBytes = new byte[bytesRead];
                                byteBuffer.get(lineBytes, 0, bytesRead);
                                String line = new String(lineBytes);

                                // Debug
                                System.out.println("Message: " + line);

                                // Echo back to the caller
                                ch.write(ByteBuffer.wrap(line.getBytes()));

                                // Make the buffer ready to write
                                byteBuffer.clear();

                                // Read the next line
                                bytesRead = ch.read(byteBuffer).get(20, TimeUnit.SECONDS);
                            } else {
                                // An empty line signifies the end of the conversation in our protocol
                                running = false;
                            }
                        }
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (ExecutionException e) {
                        e.printStackTrace();
                    } catch (TimeoutException e) {
                        // The user exceeded the 20 second timeout, so close the connection
                        ch.write(ByteBuffer.wrap("Good Bye\n".getBytes()));
                        System.out.println("Connection timed out, closing connection");
                    }

                    System.out.println("End of conversation");
                    try {
                        // Close the connection if we need to
                        if (ch.isOpen()) {
                            ch.close();
                        }
                    } catch (I/OException e1)
                    {
                        e1.printStackTrace();
                    }
                }

                @Override
                public void failed(Throwable exc, Void att)
                {
                    ///...
                }
            });
        } catch (I/OException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args)
    {
        NioSocketServer server = new NioSocketServer();
        try {
            Thread.sleep(60000);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Вы можете найти полный код здесь

6 голосов
/ 10 октября 2010

Каков наилучший способ реализации неблокирующего сокета в Java?

Есть только один способ. SocketChannel.configureBlocking(false).

Обратите внимание, что некоторые из этих ответов неверны. SocketChannel.configureBlocking (false) переводит его в неблокирующий режим. Вам не нужно Selector, чтобы сделать это. Вам нужен только Selector для реализации тайм-аутов или мультиплексированный ввод / вывод с неблокирующими сокетами.

3 голосов
/ 09 октября 2010

Помимо использования неблокирующего ввода-вывода, вы можете обнаружить, что гораздо проще создать поток записи для вашего соединения.

Примечание: если вам нужно всего несколько тысяч соединений, один-два потока на соединение проще. Если у вас есть около десяти тысяч или более соединений на сервер, вам нужен NIO с селекторами.

0 голосов
/ 28 июня 2013

Я только что написал этот код. Это работает хорошо . Это пример Java NIO, как упомянуто в ответах выше, но здесь я публикую код.

ServerSocketChannel ssc = null;
try {
    ssc = ServerSocketChannel.open();
    ssc.socket().bind(new InetSocketAddress(port));
    ssc.configureBlocking(false);
    while (true) {
        SocketChannel sc = ssc.accept();
        if (sc == null) {
            // No connections came .
        } else {
            // You got a connection. Do something
        }
    }
} catch (IOException e) {
    e.printStackTrace();
}
0 голосов
/ 09 октября 2010

Пакет java.nio предоставляет Селектор , работающий так же, как в C.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...