, поэтому я пытался отправить несколько сообщений в socketChannel, используя java nio Selectors, как показано в методе, написанном ниже. Однако проблема заключается в том, что когда «Список очереди» содержит более одного элемента, то соединение обрывается или, скорее, ни одно из сообщений не принимается другим socketChannel, теперь я не знаю, является ли это проблемой, почему метод read или написать метод? Может кто-нибудь сказать мне, что я здесь делаю не так? И как это исправить? Заранее благодарим.
Пишите и читайте следующим образом: -
private void write(SelectionKey selectionKey) throws IOException {
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
List<ByteBuffer> queue = this.pendingWrites.get(selectionKey);
// Write until there's no more data left ...
while (!queue.isEmpty()) {
ByteBuffer buf = queue.get(0);
try {
socketChannel.write(buf);
} catch (IOException ex) {
//There could be an IOException: Connection reset by peer
queue.clear(); //clear the queue
return;
}
if (buf.remaining() > 0) {
// ... or the selectionKey's buffer fills up
break;
}
queue.remove(0);
}
if (queue.isEmpty()) {
// We wrote away all data, so we're no longer interested
// in writing on this selectionKey. Switch back to waiting for
// data.
selectionKey.interestOps(SelectionKey.OP_READ);
this.selector.wakeup();
}
}
private void read(SelectionKey key) throws IOException {
SocketChannel socketChannel = (SocketChannel) key.channel();
// Clear out our read buffer so it's ready for new data
this.readBuffer.clear();
// Attempt to read off the channel
int numRead;
try {
numRead = socketChannel.read(this.readBuffer);
} catch (IOException e) {
}
//TODO: Shutting down connection
//this.cmdProcessor.connectionClosed(remoteAddress.getAddress());
// The remote forcibly closed the connection, cancel
// the selection key and close the channel.
key.cancel();
socketChannel.close();
return;
}
if (numRead == -1) {
// Remote entity shut the selectionKey down cleanly. Do the
// same from our end and cancel the channel.
key.channel().close();
key.cancel();
return;
}
byte[] dataCopy = new byte[numRead];
System.arraycopy(this.readBuffer.array(), 0, dataCopy, 0, numRead);
//System.out.println("#tempdata:" + new String(dataCopy, Constants.TELNET_ENCODING));
// If we have already received some data, we add this to our buffer
if (this.pendingReads.containsKey(key)) {
byte[] existingBytes = pendingReads.get(key);
byte[] concatenated = new byte[existingBytes.length + dataCopy.length];
System.arraycopy(existingBytes, 0, concatenated, 0, existingBytes.length);
System.arraycopy(dataCopy, 0, concatenated, existingBytes.length, dataCopy.length);
dataCopy = concatenated;
}
//If somebody sends us very long requests, we just close the connection
if (dataCopy.length > 128000) {
key.channel().close();
key.cancel();
this.pendingReads.remove(key);
return;
}
// In case we have now finally reached all characters
byte[] unprocessed = processReceiveBuffer(dataCopy, key);
this.pendingReads.put(key, unprocessed);
}
private byte[] processReceiveBuffer(byte[] data, SelectionKey key) throws InvalidProtocolBufferException {
if (data.length > 4) {//the first 4 bytes are the length
int msg_length = ((0xFF & data[0]) << 24) | ((0xFF & data[1]) << 16) | ((0xFF & data[2]) << 8) | (0xFF & data[3]);
if ((msg_length + 4) >= data.length) {
byte[] msg_data = new byte[msg_length];
System.arraycopy(data, 4, msg_data, 0, msg_length);
api.BaseMessage bm = api.BaseMessage.parseFrom(msg_data);
handleRequest(key, bm);
msg_data = new byte[msg_length + 4 - data.length];
System.arraycopy(data, msg_length + 4, msg_data, 0, msg_data.length);
return msg_data;
}
}
return data;
}