Как сделать так, чтобы TCP-сервер на базе Java NIO (Non-blocking IO) использовал Disruptor? - PullRequest
0 голосов
/ 24 апреля 2019

Я пытаюсь реализовать TCP-сервер на основе JAVA NIO с помощью Disruptor.

Java NIO работает неблокирующим образом. Все новые соединения сначала попадают в сокет ServerAccept. Затем, используя ключ (который возвращается из метода selector.select ()), соответствующий обработчик (если ключ приемлем, создается новый канал сокета, и канал регистрируется селектором, если ключ читаемый, содержимое читается из канала и затем регистрируется для записи, и, если ключ доступен для записи, канал записывается так, как должен иметься ответ). Самый простой сервер на основе NIO работает в одном потоке (все обработчики и селектор в одном потоке).

Java Disruptor - это высокопроизводительная реализация Ring, которую можно использовать для передачи сообщений между различными компонентами (потоками).

Мои вопросы следующие.

  1. Можем ли мы использовать несколько потоков для дизайна NIO?

  2. Можем ли мы запускать eventHandlers в отдельных потоках?

  3. Если мы можем запустить eventHandlers в отдельных потоках, как мы можем передать selectionCeys и каналы между потоками?

  4. Можно ли использовать библиотеку java Disruptor для передачи данных между основным потоком (в котором работает селектор) и потоками EventHandler?

  5. Если это возможно, каков подход к проектированию? (Как работают EventProducer, EventConsumer и RingBuffer в Disruptor?)

1 Ответ

0 голосов
/ 16 мая 2019

Вы можете создать сервер на базе NIO, используя любой метод передачи сообщений потока, где прерыватель является одним из таких вариантов.

Там проблема, которую вам нужно решить, - это как разделить работу с другим потоком (необрабатывать запрос в самом главном потоке).

Следовательно, вы можете передать буфер, полученный из соединения с сокетом, в отдельный поток, используя прерыватель в качестве метода обмена сообщениями.Кроме того, вам нужно поддерживать общую параллельную хэш-карту, чтобы сообщить основному потоку (который запускает цикл событий), готов ли ответ или нет.Ниже приведен пример.

HttpEvent.java

import java.nio.ByteBuffer;

public class HttpEvent
{
    private ByteBuffer buffer;
    private String requestId;
    private int numRead;


    public ByteBuffer getBuffer() {
        return buffer;
    }

    public void setBuffer(ByteBuffer buffer) {
        this.buffer = buffer;
    }

    public String getRequestId() {
        return requestId;
    }

    public void setRequestId(String requestId) {
        this.requestId = requestId;
    }

    public int getNumRead() {
        return numRead;
    }

    public void setNumRead(int numRead) {
        this.numRead = numRead;
    }
}

HttpEventFactory.java

import com.lmax.disruptor.EventFactory;

public class HttpEventFactory implements EventFactory<HttpEvent>
{
    public HttpEvent newInstance()
    {
        return new HttpEvent();
    }
}

HttpEventHandler.java

import com.lmax.disruptor.EventHandler;

import java.nio.ByteBuffer;
import java.util.Dictionary;
import java.util.concurrent.ConcurrentHashMap;

public class HttpEventHandler implements EventHandler<HttpEvent>
{
    private int id;
    private ConcurrentHashMap concurrentHashMap;

    public HttpEventHandler(int id, ConcurrentHashMap concurrentHashMap){
        this.id = id;
        this.concurrentHashMap = concurrentHashMap;

    }

    public void onEvent(HttpEvent event, long sequence, boolean endOfBatch) throws Exception
    {
        if( sequence % Runtime.getRuntime().availableProcessors()==id){


            String requestId = event.getRequestId();
            ByteBuffer buffer = event.getBuffer();
            int numRead= event.getNumRead();

            ByteBuffer responseBuffer = handleRequest(buffer, numRead);


            this.concurrentHashMap.put(requestId, responseBuffer);

        }
    }

    private ByteBuffer handleRequest(ByteBuffer buffer, int numRead) throws Exception {

        buffer.flip();
        byte[] data = new byte[numRead];
        System.arraycopy(buffer.array(), 0, data, 0, numRead);
        String request = new String(data, "US-ASCII");
        request = request.split("\n")[0].trim();


        String response = serverRequest(request);

        buffer.clear();

        buffer.put(response.getBytes());
        return  buffer;
    }

    private String serverRequest(String request) throws Exception {
        String response = "Sample Response";
        if (request.startsWith("GET")) {

            // http request parsing and response generation should be done here.    


        return  response;
    }
}

HttpEventMain.java

import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import org.apache.commons.lang3.RandomStringUtils;

import java.io.IOException;
import java.net.*;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

public class HttpEventMain
{
    private InetAddress addr;
    private int port;
    private Selector selector;
    private HttpEventProducer producer ;
    private ConcurrentHashMap concurrentHashMapResponse;
    private ConcurrentHashMap concurrentHashMapKey;

    public HttpEventMain(InetAddress addr, int port) throws IOException {
        this.setAddr(addr);
        this.setPort(port);
        this.setConcurrentHashMapResponse(new ConcurrentHashMap<>());
        this.concurrentHashMapKey = new ConcurrentHashMap<>();
    }


    public static void main(String[] args) throws Exception
    {
        System.out.println("----- Running the server on machine with "+Runtime.getRuntime().availableProcessors()+" cores -----");

        HttpEventMain server = new HttpEventMain(null, 4333);



        HttpEventFactory factory = new HttpEventFactory();


        int bufferSize = 1024;


        Executor executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); // a thread pool to which we can assign tasks


        Disruptor<HttpEvent> disruptor = new Disruptor<HttpEvent>(factory, bufferSize, executor);

        HttpEventHandler [] handlers = new HttpEventHandler[Runtime.getRuntime().availableProcessors()];

        for(int i = 0; i<Runtime.getRuntime().availableProcessors();i++){
            handlers[i] = new HttpEventHandler(i, server.getConcurrentHashMapResponse());
        }


        disruptor.handleEventsWith(handlers);




        disruptor.start();


        RingBuffer<HttpEvent> ringBuffer = disruptor.getRingBuffer();

        server.setProducer(new HttpEventProducer(ringBuffer, server.getConcurrentHashMapResponse()));

        try {
            System.out.println("\n====================Server Details====================");
            System.out.println("Server Machine: "+ InetAddress.getLocalHost().getCanonicalHostName());
            System.out.println("Port number: " + server.getPort());

        } catch (UnknownHostException e1) {
            e1.printStackTrace();
        }

        try {

            server.start();

        } catch (IOException e) {
            System.err.println("Error occured in HttpEventMain:" + e.getMessage());
            System.exit(0);
        }



    }
    private void start() throws IOException {
        this.selector = Selector.open();
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        serverChannel.configureBlocking(false);


        InetSocketAddress listenAddr = new InetSocketAddress(this.addr, this.port);
        serverChannel.socket().bind(listenAddr);
        serverChannel.register(this.selector, SelectionKey.OP_ACCEPT);

        System.out.println("Server ready. Ctrl-C to stop.");

        while (true) {

            this.selector.select();


            Iterator keys = this.selector.selectedKeys().iterator();
            while (keys.hasNext()) {
                SelectionKey key = (SelectionKey) keys.next();

                keys.remove();

                if (! key.isValid()) {
                    continue;
                }

                if (key.isAcceptable()) {
                    this.accept(key);
                }
                else if (key.isReadable()) {
                    this.read(key);
                }
                else if (key.isWritable()) {
                    this.write(key);
                }
            }
        }

    }

    private void accept(SelectionKey key) throws IOException {

        ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
        SocketChannel channel = serverChannel.accept();
        channel.configureBlocking(false);


        Socket socket = channel.socket();
        SocketAddress remoteAddr = socket.getRemoteSocketAddress();

        channel.register(this.selector, SelectionKey.OP_READ);
    }

    private void read(SelectionKey key) throws IOException {

        SocketChannel channel = (SocketChannel) key.channel();

        ByteBuffer buffer = ByteBuffer.allocate(8192);
        int numRead = -1;
        try {
            numRead = channel.read(buffer);
        }
        catch (IOException e) {
            e.printStackTrace();
        }

        if (numRead == -1) {

            Socket socket = channel.socket();
            SocketAddress remoteAddr = socket.getRemoteSocketAddress();
            channel.close();
            key.cancel();
            return;

        }
        String requestID = RandomStringUtils.random(15, true, true);

        while(concurrentHashMapKey.containsValue(requestID) || concurrentHashMapResponse.containsKey(requestID)){
            requestID = RandomStringUtils.random(15, true, true);
        }

        concurrentHashMapKey.put(key, requestID);

        this.producer.onData(requestID, buffer, numRead);

        channel.register(this.selector, SelectionKey.OP_WRITE, buffer);
    }

    private boolean responseReady(SelectionKey key){

        String requestId = concurrentHashMapKey.get(key).toString();
        String response = concurrentHashMapResponse.get(requestId).toString();

        if(response!="0"){
            concurrentHashMapKey.remove(key);
            concurrentHashMapResponse.remove(requestId);
            return true;
        }else{
            return false;
        }

    }

    private void write(SelectionKey key) throws IOException {

        if(responseReady(key)) {
            SocketChannel channel = (SocketChannel) key.channel();
            ByteBuffer inputBuffer = (ByteBuffer) key.attachment();
            inputBuffer.flip();
            channel.write(inputBuffer);
            channel.close();
            key.cancel();

        }

    }

    public HttpEventProducer getProducer() {
        return producer;
    }

    public void setProducer(HttpEventProducer producer) {
        this.producer = producer;
    }

    public ConcurrentHashMap getConcurrentHashMapResponse() {
        return concurrentHashMapResponse;
    }

    public void setConcurrentHashMapResponse(ConcurrentHashMap concurrentHashMapResponse) {
        this.concurrentHashMapResponse = concurrentHashMapResponse;
    }

    public InetAddress getAddr() {
        return addr;
    }

    public void setAddr(InetAddress addr) {
        this.addr = addr;
    }

    public int getPort() {
        return port;
    }

    public void setPort(int port) {
        this.port = port;
    }

    public Selector getSelector() {
        return selector;
    }

    public void setSelector(Selector selector) {
        this.selector = selector;
    }
}

HttpEventProducer.java

import com.lmax.disruptor.RingBuffer;

import java.nio.ByteBuffer;
import java.util.concurrent.ConcurrentHashMap;

public class HttpEventProducer
{
    private final RingBuffer<HttpEvent> ringBuffer;
    private final ConcurrentHashMap concurrentHashMap;

    public HttpEventProducer(RingBuffer<HttpEvent> ringBuffer, ConcurrentHashMap concurrentHashMap)
    {
        this.ringBuffer = ringBuffer;
        this.concurrentHashMap = concurrentHashMap;
    }

    public void onData(String requestId, ByteBuffer buffer, int numRead)
    {
        long sequence = ringBuffer.next();

        try
        {
            HttpEvent event = ringBuffer.get(sequence);
            event.setBuffer(buffer);
            event.setRequestId(requestId);
            event.setNumRead(numRead);
        }
        finally
        {
            concurrentHashMap.put(requestId, "0");
            ringBuffer.publish(sequence);


        }
    }
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...