Отправка нескольких сообщений только через один SocketChannel - PullRequest
1 голос
/ 28 марта 2012

После прочтения этого урока: http://rox -xmlrpc.sourceforge.net / niotut / (речь идет о написании неблокирующих сервера и клиента, и я прочитал часть NIO, пропущенную часть SSL), теперь я я пытаюсь переписать свой собственный клиент, но у меня возникла проблема при попытке изменить код клиента.

Во-первых, я хочу, чтобы вы увидели клиентский код учебника, он включает 2 файла:

Но я немного отредактировал NIOClient.java в функции main, чтобы объяснить мою проблему следующим образом:

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.SelectorProvider;
import java.util.*;

public class NIOClient implements Runnable {
// The host:port combination to connect to
private InetAddress hostAddress;
private int port;

// The selector we'll be monitoring
private Selector selector;

// The buffer into which we'll read data when it's available
private ByteBuffer readBuffer = ByteBuffer.allocate(8192);

// A list of PendingChange instances
private List pendingChanges = new LinkedList();

// Maps a SocketChannel to a list of ByteBuffer instances
private Map pendingData = new HashMap();

// Maps a SocketChannel to a RspHandler
private Map rspHandlers = Collections.synchronizedMap(new HashMap());

public NIOClient(InetAddress hostAddress, int port) throws IOException {
    this.hostAddress = hostAddress;
    this.port = port;
    this.selector = this.initSelector();
}

public void send(byte[] data, RspHandler handler) throws IOException {
    // Start a new connection
    SocketChannel socket = this.initiateConnection();

    // Register the response handler
    this.rspHandlers.put(socket, handler);

    // And queue the data we want written
    synchronized (this.pendingData) {
        List queue = (List) this.pendingData.get(socket);
        if (queue == null) {
            queue = new ArrayList();
            this.pendingData.put(socket, queue);
        }
        queue.add(ByteBuffer.wrap(data));
    }

    // Finally, wake up our selecting thread so it can make the required changes
    this.selector.wakeup();
}

public void run() {
    while (true) {
        try {
            // Process any pending changes
            synchronized (this.pendingChanges) {
                Iterator changes = this.pendingChanges.iterator();
                while (changes.hasNext()) {
                    ChangeRequest change = (ChangeRequest) changes.next();
                    switch (change.type) {
                    case ChangeRequest.CHANGEOPS:
                        SelectionKey key = change.socket.keyFor(this.selector);
                        key.interestOps(change.ops);
                        break;
                    case ChangeRequest.REGISTER:
                        change.socket.register(this.selector, change.ops);
                        break;
                    }
                }
                this.pendingChanges.clear();
            }

            // Wait for an event one of the registered channels
            this.selector.select();

            // Iterate over the set of keys for which events are available
            Iterator selectedKeys = this.selector.selectedKeys().iterator();
            while (selectedKeys.hasNext()) {
                SelectionKey key = (SelectionKey) selectedKeys.next();
                selectedKeys.remove();

                if (!key.isValid()) {
                    continue;
                }

                // Check what event is available and deal with it
                if (key.isConnectable()) {
                    this.finishConnection(key);
                } else if (key.isReadable()) {
                    this.read(key);
                } else if (key.isWritable()) {
                    this.write(key);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

private void read(SelectionKey key) throws IOException {
    SocketChannel socketChannel = (SocketChannel) key.channel();

    // Clear out our read buffer so it's ready for new data
    this.readBuffer.clear();

    // Attempt to read off the channel
    int numRead;
    try {
        numRead = socketChannel.read(this.readBuffer);
    } catch (IOException e) {
        // The remote forcibly closed the connection, cancel
        // the selection key and close the channel.
        key.cancel();
        socketChannel.close();
        return;
    }

    if (numRead == -1) {
        // Remote entity shut the socket down cleanly. Do the
        // same from our end and cancel the channel.
        key.channel().close();
        key.cancel();
        return;
    }

    // Handle the response
    this.handleResponse(socketChannel, this.readBuffer.array(), numRead);
}

private void handleResponse(SocketChannel socketChannel, byte[] data, int numRead) throws IOException {
    // Make a correctly sized copy of the data before handing it
    // to the client
    byte[] rspData = new byte[numRead];
    System.arraycopy(data, 0, rspData, 0, numRead);

    // Look up the handler for this channel
    RspHandler handler = (RspHandler) this.rspHandlers.get(socketChannel);

    // And pass the response to it
    if (handler.handleResponse(rspData)) {
        // The handler has seen enough, close the connection
        socketChannel.close();
        socketChannel.keyFor(this.selector).cancel();
    }
}

private void write(SelectionKey key) throws IOException {
    SocketChannel socketChannel = (SocketChannel) key.channel();

    synchronized (this.pendingData) {
        List queue = (List) this.pendingData.get(socketChannel);

        // Write until there's not more data ...
        while (!queue.isEmpty()) {
            ByteBuffer buf = (ByteBuffer) queue.get(0);
            socketChannel.write(buf);
            if (buf.remaining() > 0) {
                // ... or the socket's buffer fills up
                break;
            }
            queue.remove(0);
        }

        if (queue.isEmpty()) {
            // We wrote away all data, so we're no longer interested
            // in writing on this socket. Switch back to waiting for
            // data.
            key.interestOps(SelectionKey.OP_READ);
        }
    }
}

private void finishConnection(SelectionKey key) throws IOException {
    SocketChannel socketChannel = (SocketChannel) key.channel();

    // Finish the connection. If the connection operation failed
    // this will raise an IOException.
    try {
        socketChannel.finishConnect();
    } catch (IOException e) {
        // Cancel the channel's registration with our selector
        System.out.println(e);
        key.cancel();
        return;
    }

    // Register an interest in writing on this channel
    key.interestOps(SelectionKey.OP_WRITE);
}

private SocketChannel initiateConnection() throws IOException {
    // Create a non-blocking socket channel
    SocketChannel socketChannel = SocketChannel.open();
    socketChannel.configureBlocking(false);

    // Kick off connection establishment
    socketChannel.connect(new InetSocketAddress(this.hostAddress, this.port));

    // Queue a channel registration since the caller is not the 
    // selecting thread. As part of the registration we'll register
    // an interest in connection events. These are raised when a channel
    // is ready to complete connection establishment.
    synchronized(this.pendingChanges) {
        this.pendingChanges.add(new ChangeRequest(socketChannel, ChangeRequest.REGISTER, SelectionKey.OP_CONNECT));
    }

    return socketChannel;
}

private Selector initSelector() throws IOException {
    // Create a new selector
    return SelectorProvider.provider().openSelector();
}

public static void main(String[] args) {
    try {
        NIOClient client = new NIOClient(
                InetAddress.getByName("127.0.0.1"), 9090);
        Thread t = new Thread(client);
        t.setDaemon(true);
        t.start();

        // 1st
        client.send("hehe|||".getBytes());
        System.out.println("SEND: " + "hehe|||");
        handler.waitForResponse();

        System.out.println("------------");

        // 2nd
        client.send(("hehe|||" + " 2").getBytes());
        System.out.println("SEND: " + "hehe|||" + " 2");
        handler.waitForResponse();

    } catch (Exception e) {
        e.printStackTrace();
    }
}
}

Мой отредактированный клиент просто делает простую вещь - отправляет сообщение на сервер, а затем получает эхо-сообщение от сервера. Конечно, приведенный выше код работает хорошо. Он отправил 2 сообщения, затем получил их обратно правильно.

Но в вышеприведенном клиенте я не хочу, чтобы функция send вызывала этот код:

    // Start a new connection
    SocketChannel socket = this.initiateConnection();

Это означает, что каждое отличительное сообщение будет соответствовать каждому различающемуся новому SocketChannel, но теперь я хочу использовать только один SocketChannel для отправки многих сообщений, поэтому я изменяю клиента следующим образом:

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.SelectorProvider;
import java.util.*;

public class MyClient implements Runnable {
// The host:port combination to connect to
private InetAddress hostAddress;
private int port;

// The selector we'll be monitoring
private Selector selector;

// The buffer into which we'll read data when it's available
private ByteBuffer readBuffer = ByteBuffer.allocate(8);

// A list of PendingChange instances
private List pendingChanges = new LinkedList();

// Maps a SocketChannel to a list of ByteBuffer instances
private Map pendingData = new HashMap();

// Maps a SocketChannel to a RspHandler
private Map rspHandlers = Collections.synchronizedMap(new HashMap());


private SocketChannel socket;
private static MyResponseHandler handler;

public MyClient(InetAddress hostAddress, int port) throws IOException {
    this.hostAddress = hostAddress;
    this.port = port;
    this.selector = this.initSelector();

    // Start a new connection
    socket = this.initiateConnection();

    handler = new MyResponseHandler();
    // Register the response handler
    this.rspHandlers.put(socket, handler);      
}

public void send(byte[] data) throws IOException {

    // And queue the data we want written
    synchronized (this.pendingData) {
        List queue = (List) this.pendingData.get(socket);
        if (queue == null) {
            queue = new ArrayList();
            this.pendingData.put(socket, queue);
        }
        queue.add(ByteBuffer.wrap(data));
    }

    // Finally, wake up our selecting thread so it can make the required changes
    this.selector.wakeup();
}

public void run() {
    while (true) {
        try {
            // Process any pending changes
            synchronized (this.pendingChanges) {
                Iterator changes = this.pendingChanges.iterator();
                while (changes.hasNext()) {
                    ChangeRequest change = (ChangeRequest) changes.next();
                    switch (change.type) {
                    case ChangeRequest.CHANGEOPS:
                        SelectionKey key = change.socket.keyFor(this.selector);
                        key.interestOps(change.ops);
                        break;
                    case ChangeRequest.REGISTER:
                        change.socket.register(this.selector, change.ops);
                        break;
                    }
                }
                this.pendingChanges.clear();
            }

            // Wait for an event one of the registered channels
            this.selector.select();

            // Iterate over the set of keys for which events are available
            Iterator selectedKeys = this.selector.selectedKeys().iterator();
            while (selectedKeys.hasNext()) {
                SelectionKey key = (SelectionKey) selectedKeys.next();
                selectedKeys.remove();

                if (!key.isValid()) {
                    continue;
                }

                // Check what event is available and deal with it
                if (key.isConnectable()) {
                    this.finishConnection(key);
                } else if (key.isReadable()) {
                    this.read(key);
                } else if (key.isWritable()) {
                    this.write(key);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

private void read(SelectionKey key) throws IOException {
    SocketChannel socketChannel = (SocketChannel) key.channel();

    // Clear out our read buffer so it's ready for new data
    this.readBuffer.clear();

    // Attempt to read off the channel
    int numRead;
    try {
        numRead = socketChannel.read(this.readBuffer);
    } catch (IOException e) {
        // The remote forcibly closed the connection, cancel
        // the selection key and close the channel.
        key.cancel();
        socketChannel.close();
        return;
    }

    if (numRead == -1) {
        // Remote entity shut the socket down cleanly. Do the
        // same from our end and cancel the channel.
        key.channel().close();
        key.cancel();
        return;
    }

    // Handle the response
    this.handleResponse(socketChannel, this.readBuffer.array(), numRead);
}

private void handleResponse(SocketChannel socketChannel, byte[] data, int numRead) throws IOException {
    // Make a correctly sized copy of the data before handing it
    // to the client
    byte[] rspData = new byte[numRead];
    System.arraycopy(data, 0, rspData, 0, numRead);

    // Look up the handler for this channel
    MyResponseHandler handler = (MyResponseHandler) this.rspHandlers.get(socketChannel);

    // And pass the response to it
    if (handler.handleResponse(rspData)) {
        // The handler has seen enough, close the connection
        socketChannel.close();
        socketChannel.keyFor(this.selector).cancel();
    }
}

private void write(SelectionKey key) throws IOException {
    SocketChannel socketChannel = (SocketChannel) key.channel();

    synchronized (this.pendingData) {
        List queue = (List) this.pendingData.get(socketChannel);

        // Write until there's not more data ...
        while (!queue.isEmpty()) {
            ByteBuffer buf = (ByteBuffer) queue.remove(0);
            socketChannel.write(buf);

            //-- DEBUG --
            System.out.println("===>>> socketChannel.write: " + new String(buf.array()));

            if (buf.remaining() > 0) {
                // ... or the socket's buffer fills up
                break;
            }
        }

        if (queue.isEmpty()) {
            // We wrote away all data, so we're no longer interested
            // in writing on this socket. Switch back to waiting for
            // data.
            key.interestOps(SelectionKey.OP_READ);
        }
    }
}

private void finishConnection(SelectionKey key) throws IOException {
    SocketChannel socketChannel = (SocketChannel) key.channel();

    // Finish the connection. If the connection operation failed
    // this will raise an IOException.
    try {
        socketChannel.finishConnect();
    } catch (IOException e) {
        // Cancel the channel's registration with our selector
        System.out.println(e);
        key.cancel();
        return;
    }

    // Register an interest in writing on this channel
    key.interestOps(SelectionKey.OP_WRITE);
}

private SocketChannel initiateConnection() throws IOException {
    // Create a non-blocking socket channel
    SocketChannel socketChannel = SocketChannel.open();
    socketChannel.configureBlocking(false);

    // Kick off connection establishment
    socketChannel.connect(new InetSocketAddress(this.hostAddress, this.port));

    // Queue a channel registration since the caller is not the 
    // selecting thread. As part of the registration we'll register
    // an interest in connection events. These are raised when a channel
    // is ready to complete connection establishment.
    synchronized(this.pendingChanges) {
        this.pendingChanges.add(new ChangeRequest(socketChannel, ChangeRequest.REGISTER, SelectionKey.OP_CONNECT));
    }

    return socketChannel;
}

private Selector initSelector() throws IOException {
    // Create a new selector
    return SelectorProvider.provider().openSelector();
}

public static void main(String[] args) {
    try {
        MyClient client = new MyClient(
                InetAddress.getByName("127.0.0.1"), 9090);
        Thread t = new Thread(client);
        t.setDaemon(true);
        t.start();

        // 1st
        client.send("hehe|||".getBytes());
        System.out.println("SEND: " + "hehe|||");
        handler.waitForResponse();

        System.out.println("------------");

        // 2nd
        client.send(("hehe|||" + " 2").getBytes());
        System.out.println("SEND: " + "hehe|||" + " 2");
        handler.waitForResponse();

    } catch (Exception e) {
        e.printStackTrace();
    }
}
}

Но после запуска вышеуказанного клиента я просто вижу, что только 1-е сообщение отправлено и получено обратно, и после отладки я знаю, что 2-е сообщение не отправлено, но я не знаю, почему и как решить эту проблему .

Кто-нибудь знает ответчик?

Спасибо, что прочитали мой очень длинный вопрос.

1 Ответ

4 голосов
/ 29 марта 2012

Вы начали не с того места.Есть много проблем с этой статьей.Все эти вещи, ожидающие изменения очереди, являются огромным, ненужным осложнением.Просто wakeup() селектор, если вам нужно зарегистрироваться / отменить регистрацию в другом потоке (хотя, зачем вам вообще это нужно делать, для меня совершенно загадка), и вы можете изменить interestOps в любое время таким же образом с полной надежностьюдаже если FUD, который он распространяет о различных реализациях, когда-либо сбудется.

В статье есть несколько других проблем, которые показывают, что автор действительно не знает, о чем говорит.IOException не обязательно означает «удаленное соединение принудительно закрыто».Его метод finishConnection() игнорирует возвращаемое значение, которое, если имеет значение false, означает, что соединение все еще находится в состоянии ожидания, поэтому он преждевременно регистрирует канал за этапом OP_CONNECT.Закрытие канала отменяет ключ, поэтому все эти cancel() вызовы, которые непосредственно предшествуют или следуют за close() вызовами, являются избыточными и могут быть удалены (хотя есть места, где он отменяет без закрытия, что также неправильно, когда это происходит).

Далее:

нигде из двух только что представленных методов мы не требуем установки флага OP_CONNECT для клавиши выбора канала сокета.Если бы мы это сделали, мы бы перезаписали флаг OP_CONNECT и никогда не завершили бы соединение.И если бы мы объединили их, то рискнули бы попытаться написать по неподключенному каналу (или, по крайней мере, иметь дело с этим делом) "

Это просто полная ерунда класса АOP_CONNECT дважды, или «объединение их затем», что бы это ни значило, не может заставить вас «никогда не завершать соединение» или «пытаться писать по неподключенному каналу». Кажется, он думает, что установка дважды дважды очищает его.

Данные уже были поставлены в очередь (или мы не установили бы соединение в первую очередь).

Странное и необъяснимое предположение.

Вместо этогооб этом довольно сомнительном беспорядке я бы хорошо посмотрел на ветку «Укрощение цирка NIO», которую он цитирует, если вы все еще сможете найти ее после переноса форума Oracle. Отказ от ответственности: я написал некоторые из них.

...