Много UDP-запросов потеряно на UDP-сервере с Netty - PullRequest
6 голосов
/ 09 марта 2012

Я написал простой UDP-сервер с Netty, который просто распечатывает в журналах полученные сообщения (кадры). Для этого я создал простой декодер кадров и простой обработчик сообщений. У меня также есть клиент, который может отправлять несколько запросов последовательно и / или параллельно.

Когда я настраиваю свой клиентский тестер на последовательную отправку, например, нескольких сотен запросов с небольшой задержкой между ними, мой сервер, написанный на Netty, получает их все должным образом. Но в тот момент, когда я увеличиваю количество одновременных запросов в моем клиенте (например, 100) в сочетании с последовательными и несколькими повторениями, мой сервер начинает терять много запросов. Например, когда я отправляю 50000 запросов, мой сервер получает около 49000 только при использовании простого ChannelHandler, который распечатывает полученное сообщение.

И когда я добавляю простой декодер кадров (который распечатывает кадр и копирует его в другой буфер) перед этим обработчиком, сервер обрабатывает только половину запросов !!

Я заметил, что независимо от количества рабочих, которые я указываю для созданного NioDatagramChannelFactory, всегда есть один и только один поток, который обрабатывает запросы (я использую рекомендованный Executors.newCachedThreadPool () в качестве другого параметра).

Я также создал еще один аналогичный простой UDP-сервер на основе DatagramSocket, поставляемого с JDK, и он прекрасно обрабатывает все запросы с потерей 0 (нуля) !! Когда я отправляю 50000 запросов на моем клиенте (например, с 1000 потоков), я получаю 50000 запросов на своем сервере.

Я что-то не так делаю при настройке моего UDP-сервера с использованием Netty? Или, может быть, Netty просто не предназначен для поддержки такой нагрузки? Почему в данном пуле потоков кэширования используется только один поток (я заметил, что только один поток и всегда один и тот же используется при поиске в JMX jconsole и при проверке имени потока в выходных журналах)? Я думаю, что если бы больше потоков использовалось, как ожидалось, сервер мог бы легко справиться с такой нагрузкой, потому что я могу сделать это без проблем, если не использую Netty!

См. Мой код инициализации ниже:

...

lChannelfactory = new NioDatagramChannelFactory( Executors.newCachedThreadPool(), nbrWorkers );
lBootstrap = new ConnectionlessBootstrap( lChannelfactory );

lBootstrap.setPipelineFactory( new ChannelPipelineFactory() {
    @Override
    public ChannelPipeline getPipeline()
    {
        ChannelPipeline lChannelPipeline = Channels.pipeline();
        lChannelPipeline.addLast( "Simple UDP Frame Dump DECODER", new SimpleUDPPacketDumpDecoder( null ) );            
        lChannelPipeline.addLast( "Simple UDP Frame Dump HANDLER", new SimpleUDPPacketDumpChannelHandler( lOuterFrameStatsCollector ) );            
        return lChannelPipeline;
    }
} );

bindChannel = lBootstrap.bind( socketAddress );

...

И содержание метода decode () в моем декодере:

protected Object decode(ChannelHandlerContext iCtx, Channel iChannel, ChannelBuffer iBuffer) throws Exception
{
    ChannelBuffer lDuplicatedChannelBuffer = null;
    sLogger.debug( "Decode method called." );

    if ( iBuffer.readableBytes() < 8 ) return null;
    if ( outerFrameStatsCollector != null ) outerFrameStatsCollector.incrementNbrRequests();

    if ( iBuffer.readable() ) 
    {        
        sLogger.debug( convertToAsciiHex( iBuffer.array(), iBuffer.readableBytes() ) );                     
        lDuplicatedChannelBuffer = ChannelBuffers.dynamicBuffer( iBuffer.readableBytes() );            
        iBuffer.readBytes( lDuplicatedChannelBuffer );
    }

    return lDuplicatedChannelBuffer;
}

И содержимое метода messageReceived () в моем обработчике:

public void messageReceived(final ChannelHandlerContext iChannelHandlerContext, final MessageEvent iMessageEvent) throws Exception
{
    ChannelBuffer lMessageBuffer = (ChannelBuffer) iMessageEvent.getMessage();
    if ( outerFrameStatsCollector != null ) outerFrameStatsCollector.incrementNbrRequests();

    if ( lMessageBuffer.readable() ) 
    {        
        sLogger.debug( convertToAsciiHex( lMessageBuffer.array(), lMessageBuffer.readableBytes() ) );            
        lMessageBuffer.discardReadBytes();
    }
}

1 Ответ

7 голосов
/ 10 марта 2012

Вы неправильно сконфигурировали экземпляр ConnectionlessBootstrap.

  1. Необходимо настроить следующие параметры с оптимальными значениями.

    Размер SO_SNDBUF, размер SO_RCVBUF и ReceiveBufferSizePredictorFactory

    lBootstrap.setOption("sendBufferSize", 1048576);
    
    lBootstrap.setOption("receiveBufferSize", 1048576);
    
    lBootstrap.setOption("receiveBufferSizePredictorFactory", 
     new AdaptiveReceiveBufferSizePredictorFactory(MIN_SIZE, INITIAL_SIZE, MAX_SIZE));
    

    проверьте класс DefaultNioDatagramChannelConfig для получения более подробной информации.

  2. Конвейер делает все, используя рабочий поток Netty. Если рабочий поток перегружен, он задержит цикл события селектора выполнение и будет узкое место в чтении / записи канал. Вы должны добавить обработчик выполнения следующим образом: трубопровод. Он освободит рабочий поток для выполнения своей работы.

    ChannelPipeline lChannelPipeline = Channels.pipeline();
    
    lChannelPipeline.addFirst("execution-handler", new ExecutionHandler(
      new OrderedMemoryAwareThreadPoolExecutor(16, 1048576, 1048576));
    
    //add rest of the handlers here
    
...