Как использовать несколько ядер в API неблокирующего ввода-вывода (NIO) JAVA? - PullRequest
2 голосов
/ 25 июня 2019

JAVA NIO предоставляет API для написания TCP-сервера с использованием архитектуры NIO следующим образом.

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.text.ParseException;
import java.util.*;

public class NIOServer implements Runnable{
    private InetAddress addr;
    private int port;
    private Selector selector;

    public NIOServer(InetAddress addr, int port) throws IOException {
        this.addr = addr;
        this.port = port;
    }

    public void run(){
        try {
            startServer();
        }catch(IOException ex){
            System.out.println(ex.getMessage());
        }
    }

    private void startServer() throws IOException {

        this.selector = Selector.open();
        ServerSocketChannel serverChannel = serverSocketChannel.open();
        serverChannel.configureBlocking(false);
        InetSocketAddress listenAddr = new InetSocketAddress(this.addr, this.port);
        serverChannel.socket().bind(listenAddr);
        serverChannel.register(this.selector, SelectionKey.OP_ACCEPT);


        while (true) {

            this.selector.select();


            Iterator keys = this.selector.selectedKeys().iterator();
            while (keys.hasNext()) {
                SelectionKey key = (SelectionKey) keys.next();


                keys.remove();

                if (! key.isValid()) {
                    continue;
                }

                if (key.isAcceptable()) {
                    this.accept(key);
                }
                else if (key.isReadable()) {
                    this.read(key);
                }
                else if (key.isWritable()) {
                    this.write(key);
                }
            }
        }
    }
}

При этом используется один поток, который будет обрабатывать такие события, как чтение, запись и принятие.

По сравнению с блокирующим потоком для каждой архитектуры подключения это намного предпочтительнее из-за его неблокирующей природы, которая приводит к минимальным потерям кэша, издержкам потока, низкому количеству миграций процессора.

Однако эта архитектура использует только один поток.В многопроцессорной среде (например, 4-ядерный процессор) архитектура NIO тратит впустую другие ядра.Есть ли подход к проектированию, который я могу использовать, чтобы использовать все ядра с архитектурой NIO?

NIO2 (который основан на шаблоне proactor) является одним из таких вариантов.Но базовая архитектура сильно отличается от оригинальной NIO.

1 Ответ

5 голосов
/ 25 июня 2019

Основная идея этого состоит в том, чтобы разбить задачу :

    ExecuterService workers = Executors.newFixedThreadPool(50);

    ....
    while (true) {

                this.selector.select();

                Iterator keys = this.selector.selectedKeys().iterator();
                while (keys.hasNext()) {
                    SelectionKey key = (SelectionKey) keys.next();

                    keys.remove();

                    if (! key.isValid()) {
                        continue;
                    }

                    if (key.isAcceptable()) {
                        this.accept(key);
                    }
                    else if (key.isReadable()) {
                        workers.execute(new ReadTaskHandler(key));
                    }
                    else if (key.isWritable()) {
                        workers.execute(new WriteTaskHandler(key));
                    }
                }
            }
class ReadTaskHandler implements Runnable {
    SelectionKey key;

    public ReadTaskHandler(SelectionKey key) {
        this.key = key;
    }

    @Override
    public void run() {
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        SocketChannel channel = (SocketChannel) key.channel();

        int size = 0;
        try {
            while ((size = channel.read(buffer)) > 0) {
                System.out.println(new String(buffer.array()));
                buffer.flip();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

На самом деле, есть много моделей про NIO. Например, мы также можем использовать несколько потоков для обработки задачи принятия (также называемой модель с несколькими реакторами или модель с несколькими событиями ).

Кстати, Netty - это отличная управляемая событиями инфраструктура сетевых приложений, упакованная в Java NIO

...