Jboss Netty - Как обслуживать 2 соединения, используя 3 рабочих потока - PullRequest
2 голосов
/ 23 июля 2011

В качестве простого примера, скажем, я хочу обрабатывать 3 одновременных TCP-подключения клиентов, используя только 2 рабочих потока в netty, как бы я это сделал?

Вопросы A) С кодом ниже мое третье соединение не получает никаких данных от сервера - соединение просто сидит там. Обратите внимание, как мой рабочий исполнитель и рабочий счетчик равен 2. Так что, если у меня есть 2 рабочих потока и 3 соединения, не должны ли все три соединения обслуживаться двумя потоками?

В) Другой вопрос - использует ли netty CompletionService из java.util.concurrent? Кажется, он не использует его. Кроме того, я не видел никакого исходного кода, который делает executor.submit или future.get Таким образом, все это добавило к путанице того, как он обрабатывает и обслуживает данные для соединений, которые БОЛЬШЕ, чем его рабочие потоки?

С) Я заблудился от того, как netty обрабатывает более 10000 одновременных TCP-соединений .... это создаст 10000 потоков? Поток на соединение не является масштабируемым решением, поэтому я запутался, потому что мой тестовый код не работает должным образом.

    import java.net.InetSocketAddress;
    import java.nio.channels.ClosedChannelException;
    import java.util.Date;
    import java.util.concurrent.Executors;
    import java.util.logging.Level;
    import java.util.logging.Logger;

    import org.jboss.netty.bootstrap.ServerBootstrap;
    import org.jboss.netty.channel.Channel;
    import org.jboss.netty.channel.ChannelFuture;
    import org.jboss.netty.channel.ChannelFutureListener;
    import org.jboss.netty.channel.ChannelHandlerContext;
    import org.jboss.netty.channel.ChannelPipeline;
    import org.jboss.netty.channel.ChannelPipelineFactory;
    import org.jboss.netty.channel.ChannelStateEvent;
    import org.jboss.netty.channel.Channels;
    import org.jboss.netty.channel.ExceptionEvent;
    import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
    import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
    import org.jboss.netty.handler.codec.string.StringEncoder;

    public class SRNGServer {

      public static void main(String[] args) throws Exception {
          // Configure the server.
          ServerBootstrap bootstrap = new ServerBootstrap(
                  new NioServerSocketChannelFactory(
                          Executors.newCachedThreadPool(),
                          //Executors.newCachedThreadPool()
                          Executors.newFixedThreadPool(2),2
                          ));

          // Configure the pipeline factory.
          bootstrap.setPipelineFactory(new SRNGServerPipelineFactoryP());

          // Bind and start to accept incoming connections.
          bootstrap.bind(new InetSocketAddress(8080));
      }



      private static class SRNGServerHandlerP extends SimpleChannelUpstreamHandler {

        private static final Logger logger = Logger.getLogger(SRNGServerHandlerP.class.getName());


        @Override
        public void channelConnected(
                ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {

          // Send greeting for a new connection.
          Channel ch=e.getChannel();

          System.out.printf("channelConnected with channel=[%s]%n", ch);

          ChannelFuture writeFuture=e.getChannel().write("It is " + new Date() + " now.\r\n");

          SRNGChannelFutureListener srngcfl=new SRNGChannelFutureListener();

          System.out.printf("Registered listener=[%s] for future=[%s]%n", srngcfl, writeFuture);

          writeFuture.addListener(srngcfl);      

        }

        @Override
        public void exceptionCaught(
                ChannelHandlerContext ctx, ExceptionEvent e) {

            logger.log(
                    Level.WARNING,
                    "Unexpected exception from downstream.",
                    e.getCause());
            if(e.getCause() instanceof ClosedChannelException){
              logger.log(Level.INFO, "****** Connection closed by client - Closing Channel");
            }
            e.getChannel().close();
        }
      }



      private static class SRNGServerPipelineFactoryP implements ChannelPipelineFactory {

        public ChannelPipeline getPipeline() throws Exception {

            // Create a default pipeline implementation.
            ChannelPipeline pipeline = Channels.pipeline();

            pipeline.addLast("encoder", new StringEncoder());
            pipeline.addLast("handler", new SRNGServerHandlerP());

            return pipeline;
        }
      }


      private static class SRNGChannelFutureListener implements ChannelFutureListener{

        public void operationComplete(ChannelFuture future) throws InterruptedException{
          Thread.sleep(1000*5);
          Channel ch=future.getChannel();
          if(ch!=null && ch.isConnected()){
              ChannelFuture writeFuture=ch.write("It is " + new Date() + " now.\r\n");
              //-- Add this instance as listener itself.
              writeFuture.addListener(this);
          }

        }

      }
    }

1 Ответ

4 голосов
/ 23 июля 2011

Я не проанализировал ваш исходный код подробно, поэтому не знаю точно, почему он не работает должным образом.Но эта строка в SRNGChannelFutureListener выглядит подозрительно:

Thread.sleep(1000*5);

Это сделает поток, выполняющий его, заблокированным на 5 секунд;поток не будет доступен для какой-либо другой обработки в течение этого времени.

Вопрос C: Нет, он не будет создавать 10000 потоков;весь смысл Netty в том, что он этого не делает, потому что это действительно не очень хорошо масштабируется.Вместо этого он использует ограниченное количество потоков из пула потоков, генерирует события всякий раз, когда что-то происходит, и запускает обработчики событий для потоков в пуле.Таким образом, потоки и соединения отделены друг от друга (нет потока для каждого соединения).

Чтобы этот механизм работал правильно, ваши обработчики событий должны возвращаться как можно быстрее, чтобы потоки, которые ониДоступен для запуска следующего обработчика событий как можно быстрее.Если вы переводите поток в спящий режим на 5 секунд, то вы сохраняете поток выделенным, поэтому он не будет доступен для обработки других событий.

Вопрос B: Если вы действительно хотите знать, вы можете получитьИсходный код нетти и узнай.Он использует селекторы и другие java.nio классы для выполнения асинхронного ввода-вывода .

...