Java, как управлять потоками для чтения сокетов (websocket)? - PullRequest
0 голосов
/ 15 марта 2020

У меня есть сервер WebSocket.

мой сервер создает новый поток для обработки нового соединения. Поток работает до разрыва веб-сокета.

моя проблема: для 1_000_000 соединений мне нужно 1_000_000 потоки. Как я могу обрабатывать много веб-сокетов потоком? без ожидания?

ServerSocket server;
private ExecutorService executor = new ThreadPoolExecutor(1_000_000 , 1_000_000 , 7, TimeUnit.SECONDS, queue, threadFactory);

try
{
    server = new ServerSocket(port); 
}
catch (IOException e) {}

while (true)
 {
    Socket client = null;

   try
   {
        client = server.accept();

        Runnable r = new Runnable()
        {
           run()
           {
              // this is simple, original is complete WebSocket imp
               client.getInputStream().read();
           }
        };

        executor.execute(r);
    }

 catch (IOException e) {}
}

Ответы [ 3 ]

1 голос
/ 19 марта 2020

Подумайте об этом, у вас есть карта сокетов, и каждый раз, когда сообщение поступает на сервер, вы получаете сообщение и связанный сокет!

эта операция выполняется с ОС (linux, windows, unix, ma c -OS, ...) ядро!

, поэтому вы можете обрабатывать миллион соединений только в одном потоке!

мы называем это None-Blocking сокеты , что означает, что они никогда не блокируют ваш поток для чтения или записи или любой другой операции, такой как accept, и ...!

java имеет пакет для обработки этого! java .nio. *

как это работает?

  • Поток для обработки операций ввода-вывода
  • Селектор для выбора, какие сокеты работают и какой тип операции
  • ByteBuffer для обработки чтения и записи вместо использования socket.stream в blocking-socket

также можно использовать несколько потоков и селекторов (каждый селектор имеет свой собственный поток)

посмотрите на этот пример:

NoneBlockingServer. java:

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

public class NoneBlockingServer {

public static void main(String[] args) throws Exception
{
    runServer("localhost" , 5050);
}


private final static void runServer(String host , int port)throws Exception {
    Selector selector = Selector.open();
    ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
    serverSocketChannel.bind(new InetSocketAddress(host, port));
    serverSocketChannel.configureBlocking(false); //config to be a none-blocking serve-socket
    SelectionKey selectionKey = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
    //register to selector for operation ACCEPT !
    //also you can use selectionKey for some other stuffs !

    while (true) {

        int numberOfReadSockets = selector.select();
        //it will wait until a socket(s) be ready for some io operation
        //or other threads call selector.wakeup()

        if(numberOfReadSockets==0){
            //maybe selector.wakeup() called
            //do some sync operations here !
            continue; // continue selecting !
        }

        Iterator<SelectionKey> keys = selector.selectedKeys().iterator();

        while (keys.hasNext())
        {
            SelectionKey key = keys.next();
            keys.remove(); //remove selected key from current selection !

            //handle selected key


            if(key.isValid() && key.isReadable())
            {

                //it means this socket is valid and has data to read


                SocketChannel socketChannel =
                        (SocketChannel) key.channel();

                ByteBuffer buffer = ByteBuffer.allocate(100); // allocate 100 bytes for buffer
                //maybe you must use an allocated buffer for each connection
                // instead of allocate for each operation

                int read = socketChannel.read(buffer);

                if(read<0)
                {
                    //need to close channel !
                    socketChannel.close(); // explicitly remove from selector
                    System.out.println("CONNECTION CLOSED");
                    continue; //socket closed and other operations will skip
                }else
                {
                    buffer.flip(); // you need to learn work with ByteBuffers
                    byte[] bytes = new byte[buffer.remaining()];
                    buffer.get(bytes);
                    //maybe convert it to String
                    String msg = new String(bytes);
                    //use msg !
                    System.out.println("MESSAGE : "+msg);

                    key.interestOps(SelectionKey.OP_READ|SelectionKey.OP_WRITE);
                    //set interestOps to WRIT and READ to write hello back message !
                    key.attach(ByteBuffer.wrap("Hello Client !".getBytes("UTF-8")));
                    //wrap a array of bytes using wrap and attach it to selectionKey
                }

            }

            if(key.isValid() && key.isWritable())
            {
                //it means this socket is valid and have space to write data !

                SocketChannel socketChannel =
                        (SocketChannel) key.channel();

                //you must represent data you want to write to this socket
                //maybe attached to selection key !
                ByteBuffer dataToWrite = (ByteBuffer) key.attachment();
                //key.attachment here to help u have some meta data about each socket
                //use it smart !

                int write = socketChannel.write(dataToWrite);

                if(write<0)
                {
                    //so means some error occurs better to close it !
                    socketChannel.close();
                    System.out.println("CONNECTION CLOSED !"); //log
                    continue;//as socket closed we will skip next operations !
                }else if(!dataToWrite.hasRemaining())
                {
                    //so all data putted to buffer !
                    key.interestOps(SelectionKey.OP_READ); // just need to read !
                }
            }

            if(key.isValid() && key.isAcceptable())
            {
                ServerSocketChannel server =
                        (ServerSocketChannel) key.channel();//just server channels has accept operation

                SocketChannel socketChannel = server.accept(); //accept it !

                socketChannel.configureBlocking(false); // config none-blocking mode

                socketChannel.register(selector , SelectionKey.OP_READ);

                //also you can register for multiple operation using | operation
                //for example register for both read and write SelectionKey.READ|SelectionKey.WRITE
                //also you can change is late using key.interestOps(int ops)


                System.out.println("NEW CONNECTION"); //log
            }

            //there is another type of key,  key.isConnectable()
            //google it !




        }
    }
}
}

и вот BlockingClient. java:

import java.net.InetSocketAddress;
import java.net.Socket;

public class BlockingClient {

//using blocking sockets !
public static void main(String[] args)throws Exception
{
    Socket socket = new Socket();
    socket.connect(new InetSocketAddress("localhost" , 5050));
    socket.getOutputStream()
            .write("Hello Server".getBytes("UTF-8"));
    byte[] buffer = new byte[100];
    int len  = socket.getInputStream().read(buffer);
    System.out.println(new String(buffer , 0 , len , "UTF-8"));
    socket.close();
}
}

в этом примере мы отправляем сообщение Hello Server от клиента блокировки на сервер без блокировки, и сервер отвечает на сообщение Hello Client!

просто запустите!

Удачи

1 голос
/ 15 марта 2020

Ваша концепция неверна. Не следует запускать новый поток каждые несколько миллисекунд, потому что это сильно замедлит работу вашей системы. Также вы не можете открыть 1 миллион соединений одновременно. Ни одна нормальная операционная система не допустит этого.

Вместо этого обычные веб-серверы запускают максимальное количество потоков (например, 100 на среднем сервере), которые последовательно обрабатывают входящие запросы.

0 голосов
/ 15 марта 2020

Заманчивая мысль: «Не используйте java». Java имеет ужасную поддержку зеленых потоков, но golang и erlang построены на зеленых потоках, поэтому они делают это очень хорошо.

Путь java кажется рабочим пулам. Таким образом, вы создаете пул Executor (см. java .util.concurrent), решаете, сколько рабочих вы хотите для данного числа соединений, а затем передаете соединения рабочим через очередь. Затем рабочий должен выполнить итерацию по своему набору соединений, решив обработать каждое из них или выдать результат.

Количество ваших активных tcp-соединений жестко ограничено примерно 2 ^ 16 (65_536: количество доступных портов), но система вряд ли будет эффективным, если у вас так много подключений. Большинство систем не могут поддерживать производительность для более чем 200 постоянных соединений. Если вы можете значительно превысить это число, я бы предположил, что ваша система не работает достаточно для каждого соединения, чтобы действительно оправдать использование веб-сокетов, но я только догадываюсь.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...