У меня есть вопрос по использованию java nio, и я надеюсь, что тот, кто обладает большим знанием java nio, может помочь мне прояснить некоторые заблуждения.
Я использую сокет java nio.Возможно, что буфер записи заполняется с помощью socketchannel.write ().В этом случае оставшийся буфер ставится в очередь, и ключ изменяется на OP_WRITE.Один из моих сценариев заключается в том, что длина очереди довольно большая.Каждый раз перед вызовом selector.select () я меняю ключ на OP_WRITE из другой очереди с именем pendingRequest.Но я считаю, что, поскольку чтение выполняется довольно медленно, после завершения обработки отправки многие сообщения остаются неписанными, и они все еще находятся в очереди.Как справиться с этой проблемой?
В моем коде у меня есть два места для письма.Один из генератора: когда у него есть сообщение для публикации, он пишет напрямую в канал.Если буфер заполнен, данные будут поставлены в очередь.Второе место в диспетчере: когда ключ доступен для записи, он вызывает функцию write () для записи данных в очереди.Я предполагаю, что две части могут конкурировать за запись.Я просто чувствую, что моему коду не хватает некоторой обработки для взаимодействия двух записей.
Есть ли решение для решения моей проблемы, представленной выше?Я нахожу в своем коде многие данные из очереди не могут быть записаны.Когда ключ доступен для записи, генератор может снова записать данные, что приведет к тому, что данные в очереди будут иметь меньше изменений для записи.Как сделать эту часть правильной?Спасибо
// В WriteListener () код записи состоит из следующих трех частей
public synchronized int writeData(EventObject source) {
int n = 0;
int count = 0;
SocketChannel socket = (SocketChannel)source.getSource();
ByteBuffer buffer = ((WriteEvent)source).getBuffer();
try {
write(socket);
} catch (IOException e1) {
e1.printStackTrace();
}
while (buffer.position()>0) {
try {
buffer.flip();
n = socket.write(buffer);
if(n == 0) {
key.interestOps(SelectionKey.OP_WRITE); synchronized (this.pendingData) {
List<ByteBuffer> queue = (List<ByteBuffer>) this.pendingData.get(socket);
if(queue == null) {
queue = new ArrayList<ByteBuffer>();
this.pendingData.put(socket, queue);
}
queue.add(buffer);
logger.logInfo("queue length:" + queue.size());
}
break;
}
count += n;
} catch (IOException e) {
e.printStackTrace();
} finally {
buffer.compact();
}
}
if(buffer.position()==0) {
key.interestOps(SelectionKey.OP_READ);
}
return count;
}
// ==== Этот метод записи используется для записи буфера в очереди
public synchronized int write(SocketChannel sc, ByteBuffer wbuf) {
int n = 0;
int count = 0;
SelectionKey key = sc.keyFor(this.dispatcher.getDemultiplexer().getDemux());
while (wbuf.position()>0) {
try {
wbuf.flip();
n = sc.write(wbuf);
if(n == 0) {
key.interestOps(SelectionKey.OP_WRITE);
synchronized (this.pendingData) {
List<ByteBuffer> queue = (List<ByteBuffer>) this.pendingData.get(sc);
if(queue == null) {
queue = new ArrayList<ByteBuffer>();
this.pendingData.put(sc, queue);
}
queue.add(wbuf);
}
break;
}
count += n;
} catch (IOException e) {
e.printStackTrace();
} finally {
wbuf.compact();
}
}
if(wbuf.position()==0) {
wbuf.clear();
key.interestOps(SelectionKey.OP_READ);
}
return n;
}
// ==== Этот метод является обратным вызовом Dispatch, когда key.isWritable () имеет значение true
public void write(SocketChannel socketChannel) throws IOException {
SelectionKey key = socketChannel.keyFor(this.dispatcher.getDemultiplexer().getDemux());
synchronized (this.pendingData) {
List<ByteBuffer> queue = (List<ByteBuffer>) this.pendingData.get(socketChannel);
if(queue == null || queue.isEmpty()) {
// We wrote away all data, so we're no longer interested
// in writing on this socket. Switch back to waiting for data.
try {
if (key!=null)
key.interestOps(SelectionKey.OP_READ);
} catch(Exception ex) {
if (key!=null)
key.cancel();
}
}
// Write until there's not more data ...
int n = 0;
while (queue != null && !queue.isEmpty()) {
ByteBuffer buf = (ByteBuffer) queue.get(0);
// zero length write, break the loop and wait for next writable time
n = write(socketChannel, buf);
logger.logInfo("queue length:" + queue.size() + " used time: " + (t2-t1) + " ms.");
if(n==0) {
break;
}
queue.remove(0);
}
}