После прочтения этого урока: http://rox -xmlrpc.sourceforge.net / niotut / (речь идет о написании неблокирующих сервера и клиента, и я прочитал часть NIO, пропущенную часть SSL), теперь я я пытаюсь переписать свой собственный клиент, но у меня возникла проблема при попытке изменить код клиента.
Во-первых, я хочу, чтобы вы увидели клиентский код учебника, он включает 2 файла:
Но я немного отредактировал NIOClient.java в функции main
, чтобы объяснить мою проблему следующим образом:
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.SelectorProvider;
import java.util.*;
public class NIOClient implements Runnable {
// The host:port combination to connect to
private InetAddress hostAddress;
private int port;
// The selector we'll be monitoring
private Selector selector;
// The buffer into which we'll read data when it's available
private ByteBuffer readBuffer = ByteBuffer.allocate(8192);
// A list of PendingChange instances
private List pendingChanges = new LinkedList();
// Maps a SocketChannel to a list of ByteBuffer instances
private Map pendingData = new HashMap();
// Maps a SocketChannel to a RspHandler
private Map rspHandlers = Collections.synchronizedMap(new HashMap());
public NIOClient(InetAddress hostAddress, int port) throws IOException {
this.hostAddress = hostAddress;
this.port = port;
this.selector = this.initSelector();
}
public void send(byte[] data, RspHandler handler) throws IOException {
// Start a new connection
SocketChannel socket = this.initiateConnection();
// Register the response handler
this.rspHandlers.put(socket, handler);
// And queue the data we want written
synchronized (this.pendingData) {
List queue = (List) this.pendingData.get(socket);
if (queue == null) {
queue = new ArrayList();
this.pendingData.put(socket, queue);
}
queue.add(ByteBuffer.wrap(data));
}
// Finally, wake up our selecting thread so it can make the required changes
this.selector.wakeup();
}
public void run() {
while (true) {
try {
// Process any pending changes
synchronized (this.pendingChanges) {
Iterator changes = this.pendingChanges.iterator();
while (changes.hasNext()) {
ChangeRequest change = (ChangeRequest) changes.next();
switch (change.type) {
case ChangeRequest.CHANGEOPS:
SelectionKey key = change.socket.keyFor(this.selector);
key.interestOps(change.ops);
break;
case ChangeRequest.REGISTER:
change.socket.register(this.selector, change.ops);
break;
}
}
this.pendingChanges.clear();
}
// Wait for an event one of the registered channels
this.selector.select();
// Iterate over the set of keys for which events are available
Iterator selectedKeys = this.selector.selectedKeys().iterator();
while (selectedKeys.hasNext()) {
SelectionKey key = (SelectionKey) selectedKeys.next();
selectedKeys.remove();
if (!key.isValid()) {
continue;
}
// Check what event is available and deal with it
if (key.isConnectable()) {
this.finishConnection(key);
} else if (key.isReadable()) {
this.read(key);
} else if (key.isWritable()) {
this.write(key);
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
private void read(SelectionKey key) throws IOException {
SocketChannel socketChannel = (SocketChannel) key.channel();
// Clear out our read buffer so it's ready for new data
this.readBuffer.clear();
// Attempt to read off the channel
int numRead;
try {
numRead = socketChannel.read(this.readBuffer);
} catch (IOException e) {
// The remote forcibly closed the connection, cancel
// the selection key and close the channel.
key.cancel();
socketChannel.close();
return;
}
if (numRead == -1) {
// Remote entity shut the socket down cleanly. Do the
// same from our end and cancel the channel.
key.channel().close();
key.cancel();
return;
}
// Handle the response
this.handleResponse(socketChannel, this.readBuffer.array(), numRead);
}
private void handleResponse(SocketChannel socketChannel, byte[] data, int numRead) throws IOException {
// Make a correctly sized copy of the data before handing it
// to the client
byte[] rspData = new byte[numRead];
System.arraycopy(data, 0, rspData, 0, numRead);
// Look up the handler for this channel
RspHandler handler = (RspHandler) this.rspHandlers.get(socketChannel);
// And pass the response to it
if (handler.handleResponse(rspData)) {
// The handler has seen enough, close the connection
socketChannel.close();
socketChannel.keyFor(this.selector).cancel();
}
}
private void write(SelectionKey key) throws IOException {
SocketChannel socketChannel = (SocketChannel) key.channel();
synchronized (this.pendingData) {
List queue = (List) this.pendingData.get(socketChannel);
// Write until there's not more data ...
while (!queue.isEmpty()) {
ByteBuffer buf = (ByteBuffer) queue.get(0);
socketChannel.write(buf);
if (buf.remaining() > 0) {
// ... or the socket's buffer fills up
break;
}
queue.remove(0);
}
if (queue.isEmpty()) {
// We wrote away all data, so we're no longer interested
// in writing on this socket. Switch back to waiting for
// data.
key.interestOps(SelectionKey.OP_READ);
}
}
}
private void finishConnection(SelectionKey key) throws IOException {
SocketChannel socketChannel = (SocketChannel) key.channel();
// Finish the connection. If the connection operation failed
// this will raise an IOException.
try {
socketChannel.finishConnect();
} catch (IOException e) {
// Cancel the channel's registration with our selector
System.out.println(e);
key.cancel();
return;
}
// Register an interest in writing on this channel
key.interestOps(SelectionKey.OP_WRITE);
}
private SocketChannel initiateConnection() throws IOException {
// Create a non-blocking socket channel
SocketChannel socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
// Kick off connection establishment
socketChannel.connect(new InetSocketAddress(this.hostAddress, this.port));
// Queue a channel registration since the caller is not the
// selecting thread. As part of the registration we'll register
// an interest in connection events. These are raised when a channel
// is ready to complete connection establishment.
synchronized(this.pendingChanges) {
this.pendingChanges.add(new ChangeRequest(socketChannel, ChangeRequest.REGISTER, SelectionKey.OP_CONNECT));
}
return socketChannel;
}
private Selector initSelector() throws IOException {
// Create a new selector
return SelectorProvider.provider().openSelector();
}
public static void main(String[] args) {
try {
NIOClient client = new NIOClient(
InetAddress.getByName("127.0.0.1"), 9090);
Thread t = new Thread(client);
t.setDaemon(true);
t.start();
// 1st
client.send("hehe|||".getBytes());
System.out.println("SEND: " + "hehe|||");
handler.waitForResponse();
System.out.println("------------");
// 2nd
client.send(("hehe|||" + " 2").getBytes());
System.out.println("SEND: " + "hehe|||" + " 2");
handler.waitForResponse();
} catch (Exception e) {
e.printStackTrace();
}
}
}
Мой отредактированный клиент просто делает простую вещь - отправляет сообщение на сервер, а затем получает эхо-сообщение от сервера. Конечно, приведенный выше код работает хорошо. Он отправил 2 сообщения, затем получил их обратно правильно.
Но в вышеприведенном клиенте я не хочу, чтобы функция send
вызывала этот код:
// Start a new connection
SocketChannel socket = this.initiateConnection();
Это означает, что каждое отличительное сообщение будет соответствовать каждому различающемуся новому SocketChannel, но теперь я хочу использовать только один SocketChannel для отправки многих сообщений, поэтому я изменяю клиента следующим образом:
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.SelectorProvider;
import java.util.*;
public class MyClient implements Runnable {
// The host:port combination to connect to
private InetAddress hostAddress;
private int port;
// The selector we'll be monitoring
private Selector selector;
// The buffer into which we'll read data when it's available
private ByteBuffer readBuffer = ByteBuffer.allocate(8);
// A list of PendingChange instances
private List pendingChanges = new LinkedList();
// Maps a SocketChannel to a list of ByteBuffer instances
private Map pendingData = new HashMap();
// Maps a SocketChannel to a RspHandler
private Map rspHandlers = Collections.synchronizedMap(new HashMap());
private SocketChannel socket;
private static MyResponseHandler handler;
public MyClient(InetAddress hostAddress, int port) throws IOException {
this.hostAddress = hostAddress;
this.port = port;
this.selector = this.initSelector();
// Start a new connection
socket = this.initiateConnection();
handler = new MyResponseHandler();
// Register the response handler
this.rspHandlers.put(socket, handler);
}
public void send(byte[] data) throws IOException {
// And queue the data we want written
synchronized (this.pendingData) {
List queue = (List) this.pendingData.get(socket);
if (queue == null) {
queue = new ArrayList();
this.pendingData.put(socket, queue);
}
queue.add(ByteBuffer.wrap(data));
}
// Finally, wake up our selecting thread so it can make the required changes
this.selector.wakeup();
}
public void run() {
while (true) {
try {
// Process any pending changes
synchronized (this.pendingChanges) {
Iterator changes = this.pendingChanges.iterator();
while (changes.hasNext()) {
ChangeRequest change = (ChangeRequest) changes.next();
switch (change.type) {
case ChangeRequest.CHANGEOPS:
SelectionKey key = change.socket.keyFor(this.selector);
key.interestOps(change.ops);
break;
case ChangeRequest.REGISTER:
change.socket.register(this.selector, change.ops);
break;
}
}
this.pendingChanges.clear();
}
// Wait for an event one of the registered channels
this.selector.select();
// Iterate over the set of keys for which events are available
Iterator selectedKeys = this.selector.selectedKeys().iterator();
while (selectedKeys.hasNext()) {
SelectionKey key = (SelectionKey) selectedKeys.next();
selectedKeys.remove();
if (!key.isValid()) {
continue;
}
// Check what event is available and deal with it
if (key.isConnectable()) {
this.finishConnection(key);
} else if (key.isReadable()) {
this.read(key);
} else if (key.isWritable()) {
this.write(key);
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
private void read(SelectionKey key) throws IOException {
SocketChannel socketChannel = (SocketChannel) key.channel();
// Clear out our read buffer so it's ready for new data
this.readBuffer.clear();
// Attempt to read off the channel
int numRead;
try {
numRead = socketChannel.read(this.readBuffer);
} catch (IOException e) {
// The remote forcibly closed the connection, cancel
// the selection key and close the channel.
key.cancel();
socketChannel.close();
return;
}
if (numRead == -1) {
// Remote entity shut the socket down cleanly. Do the
// same from our end and cancel the channel.
key.channel().close();
key.cancel();
return;
}
// Handle the response
this.handleResponse(socketChannel, this.readBuffer.array(), numRead);
}
private void handleResponse(SocketChannel socketChannel, byte[] data, int numRead) throws IOException {
// Make a correctly sized copy of the data before handing it
// to the client
byte[] rspData = new byte[numRead];
System.arraycopy(data, 0, rspData, 0, numRead);
// Look up the handler for this channel
MyResponseHandler handler = (MyResponseHandler) this.rspHandlers.get(socketChannel);
// And pass the response to it
if (handler.handleResponse(rspData)) {
// The handler has seen enough, close the connection
socketChannel.close();
socketChannel.keyFor(this.selector).cancel();
}
}
private void write(SelectionKey key) throws IOException {
SocketChannel socketChannel = (SocketChannel) key.channel();
synchronized (this.pendingData) {
List queue = (List) this.pendingData.get(socketChannel);
// Write until there's not more data ...
while (!queue.isEmpty()) {
ByteBuffer buf = (ByteBuffer) queue.remove(0);
socketChannel.write(buf);
//-- DEBUG --
System.out.println("===>>> socketChannel.write: " + new String(buf.array()));
if (buf.remaining() > 0) {
// ... or the socket's buffer fills up
break;
}
}
if (queue.isEmpty()) {
// We wrote away all data, so we're no longer interested
// in writing on this socket. Switch back to waiting for
// data.
key.interestOps(SelectionKey.OP_READ);
}
}
}
private void finishConnection(SelectionKey key) throws IOException {
SocketChannel socketChannel = (SocketChannel) key.channel();
// Finish the connection. If the connection operation failed
// this will raise an IOException.
try {
socketChannel.finishConnect();
} catch (IOException e) {
// Cancel the channel's registration with our selector
System.out.println(e);
key.cancel();
return;
}
// Register an interest in writing on this channel
key.interestOps(SelectionKey.OP_WRITE);
}
private SocketChannel initiateConnection() throws IOException {
// Create a non-blocking socket channel
SocketChannel socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
// Kick off connection establishment
socketChannel.connect(new InetSocketAddress(this.hostAddress, this.port));
// Queue a channel registration since the caller is not the
// selecting thread. As part of the registration we'll register
// an interest in connection events. These are raised when a channel
// is ready to complete connection establishment.
synchronized(this.pendingChanges) {
this.pendingChanges.add(new ChangeRequest(socketChannel, ChangeRequest.REGISTER, SelectionKey.OP_CONNECT));
}
return socketChannel;
}
private Selector initSelector() throws IOException {
// Create a new selector
return SelectorProvider.provider().openSelector();
}
public static void main(String[] args) {
try {
MyClient client = new MyClient(
InetAddress.getByName("127.0.0.1"), 9090);
Thread t = new Thread(client);
t.setDaemon(true);
t.start();
// 1st
client.send("hehe|||".getBytes());
System.out.println("SEND: " + "hehe|||");
handler.waitForResponse();
System.out.println("------------");
// 2nd
client.send(("hehe|||" + " 2").getBytes());
System.out.println("SEND: " + "hehe|||" + " 2");
handler.waitForResponse();
} catch (Exception e) {
e.printStackTrace();
}
}
}
Но после запуска вышеуказанного клиента я просто вижу, что только 1-е сообщение отправлено и получено обратно, и после отладки я знаю, что 2-е сообщение не отправлено, но я не знаю, почему и как решить эту проблему .
Кто-нибудь знает ответчик?
Спасибо, что прочитали мой очень длинный вопрос.