Как позволить селектору изменить ключ сокета в java nio - PullRequest
0 голосов
/ 13 ноября 2011

У меня есть вопрос по использованию java nio, и я надеюсь, что тот, кто обладает большим знанием java nio, может помочь мне прояснить некоторые заблуждения.

Я использую сокет java nio.Возможно, что буфер записи заполняется с помощью socketchannel.write ().В этом случае оставшийся буфер ставится в очередь, и ключ изменяется на OP_WRITE.Один из моих сценариев заключается в том, что длина очереди довольно большая.Каждый раз перед вызовом selector.select () я меняю ключ на OP_WRITE из другой очереди с именем pendingRequest.Но я считаю, что, поскольку чтение выполняется довольно медленно, после завершения обработки отправки многие сообщения остаются неписанными, и они все еще находятся в очереди.Как справиться с этой проблемой?

В моем коде у меня есть два места для письма.Один из генератора: когда у него есть сообщение для публикации, он пишет напрямую в канал.Если буфер заполнен, данные будут поставлены в очередь.Второе место в диспетчере: когда ключ доступен для записи, он вызывает функцию write () для записи данных в очереди.Я предполагаю, что две части могут конкурировать за запись.Я просто чувствую, что моему коду не хватает некоторой обработки для взаимодействия двух записей.

Есть ли решение для решения моей проблемы, представленной выше?Я нахожу в своем коде многие данные из очереди не могут быть записаны.Когда ключ доступен для записи, генератор может снова записать данные, что приведет к тому, что данные в очереди будут иметь меньше изменений для записи.Как сделать эту часть правильной?Спасибо

// В WriteListener () код записи состоит из следующих трех частей

   public synchronized int writeData(EventObject source) {      
    int n = 0; 
    int count = 0;

    SocketChannel socket = (SocketChannel)source.getSource();       
    ByteBuffer buffer = ((WriteEvent)source).getBuffer();   
    try {
        write(socket);
    } catch (IOException e1) {          
        e1.printStackTrace();
    }       

    while (buffer.position()>0) {   
        try {           
                buffer.flip();  
                n = socket.write(buffer);                                   
                if(n == 0) {
                        key.interestOps(SelectionKey.OP_WRITE);                         synchronized (this.pendingData) {  
                            List<ByteBuffer> queue = (List<ByteBuffer>) this.pendingData.get(socket); 
                            if(queue == null) {
                                queue = new ArrayList<ByteBuffer>();
                                this.pendingData.put(socket, queue); 
                        }
                        queue.add(buffer);

                        logger.logInfo("queue length:" + queue.size());
                    }                                               
                    break;
                }               
                count += n; 

        } catch (IOException e) {               
            e.printStackTrace();
        }   finally {                       
            buffer.compact();              
        }
    }   

    if(buffer.position()==0) {                      
        key.interestOps(SelectionKey.OP_READ);                  
    }
            return count;   

}   

// ==== Этот метод записи используется для записи буфера в очереди

  public synchronized int write(SocketChannel sc, ByteBuffer wbuf) {        
    int n = 0; 
    int count = 0;

    SelectionKey key = sc.keyFor(this.dispatcher.getDemultiplexer().getDemux());                
    while (wbuf.position()>0) {     
        try {           
            wbuf.flip();        

            n = sc.write(wbuf);             

            if(n == 0) {    
                   key.interestOps(SelectionKey.OP_WRITE);                                  
                    synchronized (this.pendingData) {  
                        List<ByteBuffer> queue = (List<ByteBuffer>) this.pendingData.get(sc); 
                        if(queue == null) {
                                queue = new ArrayList<ByteBuffer>();
                                this.pendingData.put(sc, queue); 
                        }
                        queue.add(wbuf);
                    }

                    break;
                }               
                count += n; 

        } catch (IOException e) {               
            e.printStackTrace();
        }   finally {               

            wbuf.compact();                
        }
    }   

    if(wbuf.position()==0) {    
        wbuf.clear();               
        key.interestOps(SelectionKey.OP_READ);          
    }

return n;       
}   

// ==== Этот метод является обратным вызовом Dispatch, когда key.isWritable () имеет значение true

public void write(SocketChannel socketChannel) throws IOException {         
   SelectionKey key = socketChannel.keyFor(this.dispatcher.getDemultiplexer().getDemux());     
    synchronized (this.pendingData) {             
        List<ByteBuffer> queue = (List<ByteBuffer>) this.pendingData.get(socketChannel);              
        if(queue == null || queue.isEmpty()) {                 
            // We wrote away all data, so we're no longer interested                 
            // in writing on this socket. Switch back to waiting for  data.                 
            try {                     
                if (key!=null)                         
                    key.interestOps(SelectionKey.OP_READ);                 
            } catch(Exception ex) {                     
                if (key!=null)                         
                    key.cancel();                 
                }             
        }           

        // Write until there's not more data ...    
        int n = 0;
        while (queue != null && !queue.isEmpty()) {                 
            ByteBuffer buf = (ByteBuffer) queue.get(0);   
            // zero length write, break the loop and wait for next writable time 
            n = write(socketChannel, buf);

            logger.logInfo("queue length:" + queue.size() + " used time: " + (t2-t1) + " ms.");

            if(n==0)  {             
                break;
            }
                      queue.remove(0); 

        }        

 }   

Ответы [ 2 ]

0 голосов
/ 13 ноября 2011
  1. Необходимо убедиться, что новые данные ставятся в очередь после данных, которые уже ожидают записи.

  2. Если поведение сохраняется, у вас действительно есть только два варианта: либо отключить клиент по причине неправильного поведения, либо прекратить выводить его, пока не будет очищено отставание. Возможно оба.

Вы можете реализовать первое через ловкое использование длинного тайм-аута select (). Если select () возвращает ноль, это означает, что либо нет зарегистрированных каналов, либо ничего не произошло с любым из них в течение периода ожидания, и в этом случае вы можете подумать об отключении от всех клиентов. Если у вас много одновременных клиентов, которые слишком загружены, чтобы работать, вам нужно будет отслеживать последний раз, когда каждый канал был выбран, и отключать любой канал, чье время последней активности слишком давно.

В течение этого периода времени вы можете также хотеть прекратить производить вывод для этого парня, пока он медленно читает.

Точное определение «длинноватого» оставлено читателю в качестве упражнения, но десять минут приходит на ум в качестве первого приближения.

0 голосов
/ 13 ноября 2011

Если у вас слишком медленные потребители, единственным вариантом может быть их отключение для защиты вашего сервера.Вы не хотите, чтобы один плохой потребитель влиял на других ваших клиентов.

Я обычно увеличиваю размер буфера отправки до точки, где, если он заполняется, я закрываю соединение.Это исключает сложность обработки неписанных данных в коде Java, потому что все, что вы действительно делаете, это немного расширяете буфер.Если вы увеличиваете размер буфера отправки, вы делаете это прозрачно.Возможно, вам даже не нужно играть с размером буфера отправки, по умолчанию обычно около 64 КБ.

...