Простой неблокирующий сервер
/ 22 августа 2011

В целях написания программы обмена мгновенными сообщениями я пытаюсь создать простой серверный класс, который будет работать в своем собственном потоке.

Что должен делать сервер

  • принимать соединения от / подключаться к другим экземплярам сервера и связывать клавиши выбора для соединений в Map<Integer, SelectionKey> keys с идентификатором, чтобы поток сообщений мог обращаться к соединениям по идентификатору
  • чтение / запись в соединения
  • хранить входящие сообщения в очереди
  • тема сообщений
    • получать входящие сообщения
    • очереди сообщений для отправки: send_message(int id, String msg)

Мой текущий подход основан главным образом на следующем примере: Простой неблокирующий сервер Echo с Java nio .
Я также использовал Использование селектора для управления неблокирующими сокетами и страницами с рекламой, чтобы узнать о неблокирующих сокетах и ​​селекторах.

Текущий код

  • Предложения EJP выполнены
  • небольшие изменения
package snserver;

/* imports */

//class SNServer (Simple non-blocking Server)

public class SNServer extends Thread {
    private int port;
    private Selector selector;
    private ConcurrentMap<Integer, SelectionKey> keys; // ID -> associated key
    private ConcurrentMap<SocketChannel,List<byte[]>> dataMap_out;
    ConcurrentLinkedQueue<String> in_msg; //incoming messages to be fetched by messenger thread

    public SNServer(int port) {
        this.port = port;
        dataMap_out = new ConcurrentHashMap<SocketChannel, List<byte[]>>();
        keys = new ConcurrentHashMap<Integer, SelectionKey>();

    public void start_server() throws IOException {
        // create selector and channel
        this.selector = Selector.open();
        ServerSocketChannel serverChannel = ServerSocketChannel.open();

        // bind to port
        InetSocketAddress listenAddr = new InetSocketAddress((InetAddress)null, this.port);
        serverChannel.register(this.selector, SelectionKey.OP_ACCEPT);

        log("Echo server ready. Ctrl-C to stop.");

        // processing
        while (true) {
            // wait for events

            // wakeup to work on selected keys
            Iterator keys = this.selector.selectedKeys().iterator();
            while (keys.hasNext()) {
                SelectionKey key = (SelectionKey) keys.next();

                // this is necessary to prevent the same key from coming up 
                // again the next time around.

                if (! key.isValid()) {

                if (key.isAcceptable()) {
                else if (key.isReadable()) {
                else if (key.isWritable()) {
                else if(key.isConnectable()) {

    private void accept(SelectionKey key) throws IOException {
        ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
        SocketChannel channel = serverChannel.accept();
        send_message(key, "Welcome."); //DEBUG

        Socket socket = channel.socket();
        SocketAddress remoteAddr = socket.getRemoteSocketAddress();
        log("Connected to: " + remoteAddr);

        // register channel with selector for further IO
        dataMap_out.put(channel, new ArrayList<byte[]>());
        channel.register(this.selector, SelectionKey.OP_READ);

        //store key in 'keys' to be accessable by ID from messenger thread //TODO first get ID
        keys.put(0, key);

    //TODO verify, test
    public void init_connect(String addr, int port){
        try {
            SocketChannel channel = createSocketChannel(addr, port);
            channel.register(this.selector, channel.validOps()/*, SelectionKey.OP_?*/);
        catch (IOException e) {
            //TODO handle

    //TODO verify, test
    private void connect(SelectionKey key) {
        SocketChannel channel = (SocketChannel) key.channel();
        try {
            channel.finishConnect(); //try to finish connection - if 'false' is returned keep 'OP_CONNECT' registered
            //store key in 'keys' to be accessable by ID from messenger thread //TODO first get ID
            keys.put(0, key);
        catch (IOException e0) {
            try {
                //TODO handle ok?
            catch (IOException e1) {
                //TODO handle


    private void read(SelectionKey key) throws IOException {
        SocketChannel channel = (SocketChannel) key.channel();

        ByteBuffer buffer = ByteBuffer.allocate(8192);
        int numRead = -1;
        try {
            numRead = channel.read(buffer);
        catch (IOException e) {

        if (numRead == -1) {
            Socket socket = channel.socket();
            SocketAddress remoteAddr = socket.getRemoteSocketAddress();
            log("Connection closed by client: " + remoteAddr); //TODO handle

        byte[] data = new byte[numRead];
        System.arraycopy(buffer.array(), 0, data, 0, numRead);
        in_msg.add(new String(data, "utf-8"));

    private void write(SelectionKey key) throws IOException {
        SocketChannel channel = (SocketChannel) key.channel();
        List<byte[]> pendingData = this.dataMap_out.get(channel);
        Iterator<byte[]> items = pendingData.iterator();
        while (items.hasNext()) {
            byte[] item = items.next();
            //TODO is this correct? -> re-doing write in loop with same buffer object
            ByteBuffer buffer = ByteBuffer.wrap(item);
            int bytes_to_write = buffer.capacity();
            while (bytes_to_write > 0) {
                bytes_to_write -= channel.write(buffer);

    public void queue_data(SelectionKey key, byte[] data) {
        SocketChannel channel = (SocketChannel) key.channel();
        List<byte[]> pendingData = this.dataMap_out.get(channel);


    public void send_message(int id, String msg) {
        SelectionKey key = keys.get(id);
        if (key != null)
            send_message(key, msg);
            //TODO handle

    public void send_message(SelectionKey key, String msg) {
        try {
            queue_data(key, msg.getBytes("utf-8"));
        catch (UnsupportedEncodingException ex) {
            //is not thrown: utf-8 is always defined

    public String get_message() {
        return in_msg.poll();

    private static void log(String s) {

    public void run() {
        try {
        catch (IOException e) {
            System.out.println("IOException: " + e);
            //TODO handle exception

    // Creates a non-blocking socket channel for the specified host name and port.
    // connect() is called on the new channel before it is returned.
    public static SocketChannel createSocketChannel(String hostName, int port) throws IOException {
        // Create a non-blocking socket channel
        SocketChannel sChannel = SocketChannel.open();

        // Send a connection request to the server; this method is non-blocking
        sChannel.connect(new InetSocketAddress(hostName, port));
        return sChannel;

Мой вопрос: является ли приведенный выше код правильным и хорошим или что я должен изменить? Как правильно выполнить требования, указанные выше? Также обратите внимание на мои "TODO".

Спасибо за любую помощь!

1 Ответ

/ 22 августа 2011

Здесь есть несколько проблем.

  1. Вы не проверяете результат write ().Он может вернуть все что угодно с нуля.Возможно, вам придется делать это несколько раз.

  2. Если finishConnect () возвращает false, это не ошибка, просто он еще не завершен, поэтому просто оставьте OP_CONNECT зарегистрированными ждать, пока он выстрелит (снова).Единственный validOps () для SocketChannel, который вы только что создали с помощью SocketChannel.open (), это OP_CONNECT.Если finishConnect () выдает исключение, это ошибка, и вы должны закрыть канал.

  3. Закрытие канала отменяет ключ, вам не нужно его отменять самостоятельно.

  4. Обычно при привязке следует использовать ноль в качестве локального адреса InetAddress.
