Java NIO: allocate или allocateDirect при создании ByteBuffer каждый раз, когда я пишу в SocketChannel - PullRequest
2 голосов
/ 05 июня 2019

У меня есть игровой сервер, который работает каждые 600 миллисекунд, и в течение цикла манипулирует байтовым массивом, а затем в конце цикла записывает байтовый массив клиенту.

Из-за неопределенного характера того, сколько байтов нужно будет записать в конце цикла, я создаю буфер байтов для каждой записи в конце цикла. В чем я не уверен, так это в том случае, если было бы быстрее сделать это прямым или нет.

Из того, что я мог бы извлечь из других подобных вопросов, прямой может быть лучше, поскольку прямой байтовый буфер может быть создан в любом случае, когда буфер записывается в SocketChannel, если он еще не является прямым. Любое дальнейшее разъяснение было бы здорово.

В случае, если мой вопрос был недостаточно ясен, вот код моей сети: https://pastebin.com/M9wm88BA

package com.palidino.nio;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

public class NioServer implements Runnable {
    private List<Session> sessions = new ArrayList<>();
    private Map<String, Integer> connectionCounts = new HashMap<>();
    private InetSocketAddress hostAddress;
    private ServerSocketChannel serverSocketChannel;
    private Selector selector;
    private SessionHandler sessionHandler;
    private boolean running;

    private ByteBuffer readBuffer;
    private byte[] readBytes;

    private int sessionIdleTimeout;
    private int maxConnectionsPerIPAddress;
    private int socketBufferSize = 16384;

    public NioServer() throws IOException {
        selector = Selector.open();
        serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false);
    }

    public void start(String remoteAddress, int port) throws IOException {
        if (hostAddress != null) {
            throw new IllegalStateException("Server already started");
        }
        if (sessionHandler == null) {
            throw new IllegalStateException("SsessionHandler can't be null");
        }
        readBuffer = ByteBuffer.allocateDirect(socketBufferSize);
        readBytes = new byte[socketBufferSize];
        hostAddress = new InetSocketAddress(remoteAddress, port);
        serverSocketChannel.socket().setReceiveBufferSize(socketBufferSize);
        serverSocketChannel.socket().bind(hostAddress);
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        System.out.println("Starting server on " + remoteAddress + ":" + port);
        new Thread(this, "NioServer").start();
    }

    public void stop() {
        try {
            if (serverSocketChannel != null) {
                serverSocketChannel.close();
                serverSocketChannel = null;
            }
        } catch (IOException ioe) {
            ioe.printStackTrace();
        }
    }

    public void setSessionHandler(SessionHandler sessionHandler) {
        this.sessionHandler = sessionHandler;
    }

    public void setSessionIdleTimeout(int seconds) {
        if (hostAddress != null) {
            throw new IllegalStateException("Server already started");
        }
        if (seconds <= 0) {
            throw new IllegalArgumentException("seconds must be greater than 0");
        }
        sessionIdleTimeout = seconds * 1000;
    }

    public void setMaxConnectionsPerIPAddress(int maxConnectionsPerIPAddress) {
        if (hostAddress != null) {
            throw new IllegalStateException("Server already started");
        }
        if (maxConnectionsPerIPAddress <= 0) {
            throw new IllegalArgumentException("maxConnectionsPerIPAddress must be greater than 0");
        }
        this.maxConnectionsPerIPAddress = maxConnectionsPerIPAddress;
    }

    public void setSocketBufferSize(int socketBufferSize) throws IOException {
        if (hostAddress != null) {
            throw new IllegalStateException("Server already started");
        }
        if (socketBufferSize <= 0) {
            throw new IllegalArgumentException("size must be greater than 0");
        }
        this.socketBufferSize = socketBufferSize;
    }

    @Override
    public void run() {
        if (running) {
            throw new IllegalStateException("Server is already running");
        }
        running = true;
        while (serverSocketChannel.isOpen()) {
            cycle();
        }
        running = false;
    }

    private void cycle() {
        try {
            selector.select();
            for (Iterator<SelectionKey> it = selector.selectedKeys().iterator(); it.hasNext();) {
                SelectionKey selectionKey = it.next();
                it.remove();
                Session session = null;
                try {
                    if (serverSocketChannel == null || !serverSocketChannel.isOpen()) {
                        break;
                    }
                    session = (Session) selectionKey.attachment();
                    if (selectionKey.isValid() && selectionKey.isAcceptable()) {
                        session = accept(selectionKey);
                    }
                    if (session == null) {
                        continue;
                    }
                    if (selectionKey.isValid() && selectionKey.isReadable()) {
                        read(selectionKey);
                    }
                    if (selectionKey.isValid() && selectionKey.isWritable()) {
                        write(selectionKey);
                    }
                } catch (Exception e2) {
                    error(e2, session);
                }
            }
            checkSessions();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private Session accept(SelectionKey selectionKey) throws IOException {
        Session session = null;
        ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectionKey.channel();
        SocketChannel socketChannel = serverSocketChannel.accept();
        socketChannel.socket().setSendBufferSize(socketBufferSize);
        socketChannel.configureBlocking(false);
        String remoteAddress = socketChannel.socket().getInetAddress().getHostAddress();
        int connectionCount = getConnectionCount(remoteAddress);
        if (maxConnectionsPerIPAddress > 0 && connectionCount >= maxConnectionsPerIPAddress) {
            socketChannel.close();
        } else {
            connectionCounts.put(remoteAddress, connectionCount + 1);
            session = new Session(socketChannel, remoteAddress, socketChannel.register(selector, SelectionKey.OP_READ));
            sessionHandler.accept(session);
            sessions.add(session);
        }
        return session;
    }

    private void read(SelectionKey selectionKey) throws IOException {
        SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
        if (!socketChannel.isOpen()) {
            return;
        }
        Session session = (Session) selectionKey.attachment();
        readBuffer.clear();
        int numberBytesRead;
        ByteArrayOutputStream readStream = new ByteArrayOutputStream();
        while ((numberBytesRead = socketChannel.read(readBuffer)) > 0) {
            readBuffer.flip();
            readBuffer.get(readBytes, 0, numberBytesRead);
            readStream.write(readBytes, 0, numberBytesRead);
            readBuffer.clear();
            session.updateLastRead();
        }
        if (readStream.size() > 0) {
            sessionHandler.read(session, readStream.toByteArray());
        }
        if (numberBytesRead == -1) {
            session.close();
        }
    }

    private void write(SelectionKey selectionKey) throws IOException {
        SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
        if (!socketChannel.isOpen()) {
            return;
        }
        Session session = (Session) selectionKey.attachment();
        if (session.getWriteEvents().isEmpty()) {
            return;
        }
        try {
            while (!session.getWriteEvents().isEmpty()) {
                WriteEvent writeEvent = session.getWriteEvents().peek();
                socketChannel.write(writeEvent.getBuffer());
                if (writeEvent.getBuffer().remaining() > 0) {
                    break;
                }
                if (writeEvent.getHandler() != null) {
                    writeEvent.getHandler().complete(session, true);
                }
                session.getWriteEvents().poll();
            }
        } catch (Exception e) {
            error(e, session);
        }
        if (selectionKey.isValid() && session.getWriteEvents().isEmpty()) {
            selectionKey.interestOps(SelectionKey.OP_READ);
        }
    }

    private void error(Exception exception, Session session) throws IOException {
        try {
            sessionHandler.error(exception, session);
        } catch (Exception e) {
            if (session != null) {
                session.close();
            }
            e.printStackTrace();
        }
    }

    private void checkSessions() {
        if (sessions.isEmpty()) {
            return;
        }
        for (Iterator<Session> it = sessions.iterator(); it.hasNext();) {
            Session session = it.next();
            SelectionKey selectionKey = session.getSelectionKey();
            if (selectionKey.isValid() && !session.getWriteEvents().isEmpty()) {
                selectionKey.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
            }
            if (session.idleTimeout(sessionIdleTimeout)) {
                session.close();
            }
            if (session.isOpen()) {
                continue;
            }
            String remoteAddress = session.getRemoteAddress();
            int connectionCount = getConnectionCount(remoteAddress);
            if (connectionCount > 1) {
                connectionCounts.put(remoteAddress, connectionCount - 1);
            } else {
                connectionCounts.remove(remoteAddress);
            }
            if (sessionHandler != null) {
                sessionHandler.closed(session);
            }
            if (selectionKey.isValid()) {
                selectionKey.cancel();
            }
            while (!session.getWriteEvents().isEmpty()) {
                WriteEvent writeEvent = session.getWriteEvents().poll();
                if (writeEvent.getHandler() != null) {
                    writeEvent.getHandler().complete(session, false);
                }
            }
            it.remove();
        }
    }

    private int getConnectionCount(String remoteAddress) {
        return connectionCounts.containsKey(remoteAddress) ? connectionCounts.get(remoteAddress) : 0;
    }

    public void printStats() {
        System.out
                .println("NIOServer: sessions: " + sessions.size() + "; connectionCounts: " + connectionCounts.size());
    }
}

https://pastebin.com/TxPQN7JZ

package com.palidino.nio;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

public class Session {
    private SocketChannel socketChannel;
    private SelectionKey selectionKey;
    private String remoteAddress;
    private long lastRead;
    private Queue<WriteEvent> writeEvents = new ConcurrentLinkedQueue<>();
    private Object attachment;

    public Session(SocketChannel socketChannel, String remoteAddress, SelectionKey selectionKey) {
        this.socketChannel = socketChannel;
        this.remoteAddress = remoteAddress;
        this.selectionKey = selectionKey;
        selectionKey.attach(this);
        updateLastRead();
    }

    public void write(byte[] bytes) {
        write(bytes, 0, bytes.length, null);
    }

    public void write(byte[] bytes, WriteEventHandler handler) {
        write(bytes, 0, bytes.length, handler);
    }

    public void write(byte[] bytes, int offset, int length) {
        write(bytes, 0, bytes.length, null);
    }

    public void write(byte[] bytes, int offset, int length, WriteEventHandler handler) {
        addWriteEvent(new WriteEvent(bytes, offset, length, handler));
    }

    public void write(ByteBuffer buffer) {
        write(buffer, null);
    }

    public void write(ByteBuffer buffer, WriteEventHandler handler) {
        addWriteEvent(new WriteEvent(buffer, handler));
    }

    private void addWriteEvent(WriteEvent writeEvent) {
        writeEvents.offer(writeEvent);
        if (selectionKey.isValid()) {
            selectionKey.selector().wakeup();
        }
    }

    public void close() {
        try {
            socketChannel.close();
        } catch (IOException ioe) {
        }
    }

    public boolean isOpen() {
        return socketChannel.isOpen();
    }

    public SocketChannel getSocketChannel() {
        return socketChannel;
    }

    public String getRemoteAddress() {
        return remoteAddress;
    }

    public long getLastRead() {
        return lastRead;
    }

    public boolean idleTimeout(int timeoutMillis) {
        return timeoutMillis > 0 && System.currentTimeMillis() - lastRead > timeoutMillis;
    }

    public void setAttachment(Object attachment) {
        this.attachment = attachment;
    }

    public Object getAttachment() {
        return attachment;
    }

    SelectionKey getSelectionKey() {
        return selectionKey;
    }

    void updateLastRead() {
        lastRead = System.currentTimeMillis();
    }

    Queue<WriteEvent> getWriteEvents() {
        return writeEvents;
    }
}

https://pastebin.com/r37vPUtJ

package com.palidino.nio;

import java.nio.ByteBuffer;

public class WriteEvent {
    private ByteBuffer buffer;
    private WriteEventHandler handler;

    public WriteEvent(byte[] original, int offset, int length, WriteEventHandler handler) {
        if (original == null) {
            throw new NullPointerException("original can't be null");
        }
        if (offset < 0 || length < 0) {
            throw new NegativeArraySizeException("offset and length must be greater than 0");
        }
        if (offset > original.length || length + offset > original.length) {
            throw new ArrayIndexOutOfBoundsException("length + offset can't be greater than original.length");
        }
        if (original.length == 0 || length == 0) {
            throw new IllegalArgumentException("length must be greater than 0");
        }
        buffer = ByteBuffer.allocateDirect(length);
        buffer.put(original, offset, length);
        buffer.flip();
        buffer = buffer.asReadOnlyBuffer();
        this.handler = handler;
    }

    public WriteEvent(ByteBuffer original, WriteEventHandler handler) {
        buffer = ByteBuffer.allocateDirect(original.capacity());
        ByteBuffer readOnlyCopy = original.asReadOnlyBuffer();
        readOnlyCopy.flip();
        buffer.put(readOnlyCopy);
        buffer.flip();
        buffer = buffer.asReadOnlyBuffer();
        this.handler = handler;
    }

    ByteBuffer getBuffer() {
        return buffer;
    }

    WriteEventHandler getHandler() {
        return handler;
    }
}

В качестве альтернативы созданию прямого буфера для каждой записи, имеющего прямой буфер в моем потоке селектора и определяющего его размер в том месте, где из буфера fromBuffer редко, если вообще когда-либо, накладывается ограничение при записи.

private void write(SelectionKey selectionKey) throws IOException {
    SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
    if (!socketChannel.isOpen()) {
        return;
    }
    Session session = (Session) selectionKey.attachment();
    if (session.getWriteEvents().isEmpty()) {
        return;
    }
    try {
        while (!session.getWriteEvents().isEmpty()) {
            WriteEvent writeEvent = session.getWriteEvents().peek();
            ByteBuffer fromBuffer = writeEvent.getBuffer();
            int position = fromBuffer.position();
            int limit = fromBuffer.limit();
            try {
                do {
                    directBuffer.clear();
                    if (fromBuffer.remaining() > directBuffer.remaining()) {
                        fromBuffer.limit(fromBuffer.remaining() - directBuffer.remaining());
                    }
                    directBuffer.put(fromBuffer);
                    directBuffer.flip();
                    position += socketChannel.write(directBuffer);
                    fromBuffer.position(position);
                    fromBuffer.limit(limit);
                } while (directBuffer.remaining() == 0 && fromBuffer.remaining() > 0);
                if (fromBuffer.remaining() > 0) {
                    break;
                }
                if (writeEvent.getHandler() != null) {
                    writeEvent.getHandler().complete(session, true);
                }
                session.getWriteEvents().poll();
            } catch (Exception e2) {
                fromBuffer.position(position);
                fromBuffer.limit(limit);
                throw e2;
            }
        }
    } catch (Exception e) {
        error(e, session);
    }
    if (selectionKey.isValid() && session.getWriteEvents().isEmpty()) {
        selectionKey.interestOps(SelectionKey.OP_READ);
    }
}

1 Ответ

0 голосов
/ 06 июня 2019

В чем я не уверен, так это в том случае, если было бы быстрее сделать это прямым или нет.

Чтение в прямые буферы может избавить вас от дополнительного копирования / выделения собственной памяти, делая это быстрее и потребляя меньше циклов ЦП. Но если вы хотите обработать данные, считанные в прямой буфер, вам все равно нужно скопировать их в byte[].

Чтение в буферы кучи включает в себя чтение во временный прямой буфер, а затем копирование его содержимого обратно в кучу. Разница между прямым и кучевым буфером заключается в том, что прямой буфер имеет свое содержимое malloc ed (т.е. вне кучи Java). Буфер кучи в свою очередь поддерживается byte[], поэтому вы не можете получить необработанный указатель на его содержимое для выполнения ввода-вывода без его копирования (GC может перемещать объект по куче, вызывая повреждение кучи)

Несмотря на то, что в JNI есть «критическая область», внутри которой вы можете получить этот необработанный указатель, не делая копию, он оказывает влияние на GC, поскольку объект либо прикреплен к своему текущему местоположению, либо GC полностью отключен. Я экспериментировал с этим некоторое время назад Измерение производительности java.io.InputStream

...