Неблокирующий сокет Java , представленный в Java 2 Standard Edition 1.4, обеспечивает сетевое взаимодействие между приложениями без блокировки процессов с использованием сокетов. Но что такое неблокирующий сокет, в каких контекстах он может быть полезен и как он работает?
Что такое неблокирующая розетка?
Неблокирующая розетка позволяет осуществлять операции ввода-вывода на канале, не блокируя процессы, использующие его. Это означает, что мы можем использовать один поток для обработки нескольких одновременных соединений и получить «асинхронные высокопроизводительные» операции чтения / записи (некоторые люди могут не согласиться с этим)
Хорошо, в каких случаях это может быть полезно?
Предположим, вы хотите внедрить сервер, принимающий различные клиентские соединения. Предположим также, что вы хотите, чтобы сервер мог обрабатывать несколько запросов одновременно. Используя традиционный способ, у вас есть два варианта разработки такого сервера:
- Реализация многопоточного сервера, который вручную обрабатывает поток для каждого соединения.
- Использование внешнего стороннего модуля.
Оба решения работают, но, приняв первое, вы должны разработать целое решение для управления потоками с сопутствующими параллелизмом и конфликтными проблемами. Второе решение делает приложение зависимым от внешнего модуля, отличного от JDK, и, возможно, вам придется адаптировать библиотеку к вашим потребностям. С помощью неблокирующего сокета вы можете реализовать неблокирующий сервер без непосредственного управления потоками или обращения к внешним модулям.
Как это работает?
Прежде чем вдаваться в подробности, есть несколько терминов, которые вам необходимо понять:
- В реализациях на основе NIO вместо записи данных в выходные потоки и чтения данных из входных потоков мы читаем и записываем данные из буферов . буфер может быть определен как временное хранилище.
- Канал передает большую часть данных в буферы . Также его можно рассматривать как конечную точку для общения.
- Выбор готовности - это понятие, которое обозначает «возможность выбора сокета, который не будет блокироваться при чтении или записи данных».
Java NIO имеет класс Selector
, который позволяет одному потоку проверять события ввода-вывода на нескольких каналах. Как это возможно? Ну, selector
может проверить «готовность» канала к событиям, таким как клиент пытается подключиться, или операция чтения / записи. Это означает, что каждый экземпляр Selector
может отслеживать больше каналов сокетов и, следовательно, больше соединений. Теперь, когда что-то происходит на канале (происходит событие), selector
информирует приложение для обработки запроса . selector
делает это путем создания ключей событий (или клавиш выбора), которые являются экземплярами класса SelectionKey
. Каждый key
содержит информацию о , который делает запрос и , какой тип запроса , как показано на рисунке 1.
Рисунок 1: Структурная схема
Базовая реализация
Серверная реализация состоит из бесконечного цикла, в котором selector
ожидает события и создает ключи события. Существует четыре возможных типа ключа:
- Допустимо: связанный клиент запрашивает соединение.
- Подключаемо: сервер принял соединение.
- Доступно для чтения: сервер может читать.
- Доступно для записи: сервер может писать.
Обычно acceptable
ключи создаются на стороне сервера. Фактически, этот тип ключа просто сообщает серверу, что клиенту требуется соединение, затем сервер индивидуализирует канал сокета и связывает его с селектором для операций чтения / записи. После этого, когда принятый клиент читает или пишет что-то, селектор создаст для этого клиента ключи readable
или writeable
.
Теперь вы готовы написать сервер на Java, следуя предложенному алгоритму. Создание канала сокета, selector
и регистрация переключателя сокета могут быть выполнены следующим образом:
final String HOSTNAME = "127.0.0.1";
final int PORT = 8511;
// This is how you open a ServerSocketChannel
serverChannel = ServerSocketChannel.open();
// You MUST configure as non-blocking or else you cannot register the serverChannel to the Selector.
serverChannel.configureBlocking(false);
// bind to the address that you will use to Serve.
serverChannel.socket().bind(new InetSocketAddress(HOSTNAME, PORT));
// This is how you open a Selector
selector = Selector.open();
/*
* Here you are registering the serverSocketChannel to accept connection, thus the OP_ACCEPT.
* This means that you just told your selector that this channel will be used to accept connections.
* We can change this operation later to read/write, more on this later.
*/
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
Сначала мы создаем экземпляр SocketChannel
методом ServerSocketChannel.open()
. Затем, configureBlocking(false)
вызов устанавливает этот channel
как неблокирующий . Подключение к серверу осуществляется методом serverChannel.socket().bind()
. HOSTNAME
представляет IP-адрес сервера, а PORT
- порт связи. Наконец, вызовите метод Selector.open()
для создания экземпляра selector
и зарегистрируйте его для channel
и типа регистрации. В этом примере тип регистрации - OP_ACCEPT
, что означает, что селектор просто сообщает, что клиент пытается подключиться к серверу. Другие возможные варианты: OP_CONNECT
, который будет использоваться клиентом; OP_READ
; и OP_WRITE
.
Теперь нам нужно обработать эти запросы, используя бесконечный цикл. Простой способ заключается в следующем:
// Run the server as long as the thread is not interrupted.
while (!Thread.currentThread().isInterrupted()) {
/*
* selector.select(TIMEOUT) is waiting for an OPERATION to be ready and is a blocking call.
* For example, if a client connects right this second, then it will break from the select()
* call and run the code below it. The TIMEOUT is not needed, but its just so it doesn't
* block undefinable.
*/
selector.select(TIMEOUT);
/*
* If we are here, it is because an operation happened (or the TIMEOUT expired).
* We need to get the SelectionKeys from the selector to see what operations are available.
* We use an iterator for this.
*/
Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
while (keys.hasNext()) {
SelectionKey key = keys.next();
// remove the key so that we don't process this OPERATION again.
keys.remove();
// key could be invalid if for example, the client closed the connection.
if (!key.isValid()) {
continue;
}
/*
* In the server, we start by listening to the OP_ACCEPT when we register with the Selector.
* If the key from the keyset is Acceptable, then we must get ready to accept the client
* connection and do something with it. Go read the comments in the accept method.
*/
if (key.isAcceptable()) {
System.out.println("Accepting connection");
accept(key);
}
/*
* If you already read the comments in the accept() method, then you know we changed
* the OPERATION to OP_WRITE. This means that one of these keys in the iterator will return
* a channel that is writable (key.isWritable()). The write() method will explain further.
*/
if (key.isWritable()) {
System.out.println("Writing...");
write(key);
}
/*
* If you already read the comments in the write method then you understand that we registered
* the OPERATION OP_READ. That means that on the next Selector.select(), there is probably a key
* that is ready to read (key.isReadable()). The read() method will explain further.
*/
if (key.isReadable()) {
System.out.println("Reading connection");
read(key);
}
}
}
Вы можете найти источник реализации здесь
ПРИМЕЧАНИЕ. Асинхронный сервер
В качестве альтернативы неблокирующей реализации мы можем развернуть асинхронный сервер. Например, вы можете использовать класс AsynchronousServerSocketChannel
, который обеспечивает асинхронный канал для потоковых сокетов прослушивания.
Чтобы использовать его, сначала выполните его статический метод open()
, а затем bind()
для определенного порта . Затем вы выполните его метод accept()
, передав ему класс, который реализует интерфейс CompletionHandler
. Чаще всего этот обработчик создается как анонимный внутренний класс .
Из этого AsynchronousServerSocketChannel
объекта вы вызываете accept()
, чтобы сказать ему, чтобы он начал прослушивать соединения, передавая ему пользовательский экземпляр CompletionHandler
. Когда мы вызываем accept()
, он немедленно возвращается. Обратите внимание, что это отличается от традиционного подхода блокировки; тогда как метод accept()
блокируется до тех пор, пока клиент не подключится к нему , метод AsynchronousServerSocketChannel
accept()
обрабатывает его для вас.
Вот вам пример:
public class NioSocketServer
{
public NioSocketServer()
{
try {
// Create an AsynchronousServerSocketChannel that will listen on port 5000
final AsynchronousServerSocketChannel listener = AsynchronousServerSocketChannel
.open()
.bind(new InetSocketAddress(5000));
// Listen for a new request
listener.accept(null, new CompletionHandler<AsynchronousSocketChannel, Void>()
{
@Override
public void completed(AsynchronousSocketChannel ch, Void att)
{
// Accept the next connection
listener.accept(null, this);
// Greet the client
ch.write(ByteBuffer.wrap("Hello, I am Echo Server 2020, let's have an engaging conversation!\n".getBytes()));
// Allocate a byte buffer (4K) to read from the client
ByteBuffer byteBuffer = ByteBuffer.allocate(4096);
try {
// Read the first line
int bytesRead = ch.read(byteBuffer).get(20, TimeUnit.SECONDS);
boolean running = true;
while (bytesRead != -1 && running) {
System.out.println("bytes read: " + bytesRead);
// Make sure that we have data to read
if (byteBuffer.position() > 2) {
// Make the buffer ready to read
byteBuffer.flip();
// Convert the buffer into a line
byte[] lineBytes = new byte[bytesRead];
byteBuffer.get(lineBytes, 0, bytesRead);
String line = new String(lineBytes);
// Debug
System.out.println("Message: " + line);
// Echo back to the caller
ch.write(ByteBuffer.wrap(line.getBytes()));
// Make the buffer ready to write
byteBuffer.clear();
// Read the next line
bytesRead = ch.read(byteBuffer).get(20, TimeUnit.SECONDS);
} else {
// An empty line signifies the end of the conversation in our protocol
running = false;
}
}
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
} catch (TimeoutException e) {
// The user exceeded the 20 second timeout, so close the connection
ch.write(ByteBuffer.wrap("Good Bye\n".getBytes()));
System.out.println("Connection timed out, closing connection");
}
System.out.println("End of conversation");
try {
// Close the connection if we need to
if (ch.isOpen()) {
ch.close();
}
} catch (I/OException e1)
{
e1.printStackTrace();
}
}
@Override
public void failed(Throwable exc, Void att)
{
///...
}
});
} catch (I/OException e) {
e.printStackTrace();
}
}
public static void main(String[] args)
{
NioSocketServer server = new NioSocketServer();
try {
Thread.sleep(60000);
} catch (Exception e) {
e.printStackTrace();
}
}
}
Вы можете найти полный код здесь