Лучший способ справиться со многими клиентами (с потоками?) - PullRequest
0 голосов
/ 06 апреля 2020

Итак, мой вопрос здесь. Теперь, если на моем Сервере более 20 клиентов, он также имеет 20 потоков, а мой рабочий стол с процессором ryzen переходит на 100% при использовании в 30 потоков. Теперь я хотел бы обрабатывать большое количество клиентов одним сервером, но процессор просто перегружен. Мое мудрое очень просто, как я это делаю, но должен быть лучший путь; потому что я пока видел много хороших java серверов. Я не знаю, что я делаю неправильно, хотя. Далее я поделюсь своим кодом, как я это делаю в принципе.

while(this.isRunning()) {
ServerSocket server = new ServerSocket(8081);
Socket s = server.accept();
new Thread(new WorkerRunnable(s)).start();
//now here if e.g. over 25 users connect there are 25 threads. CPU is at 100%. Is there a better way to handle this?

Рабочий исполняемый файл идентифицирует клиентов. После этого они попадут в чат. Это как групповой чат, например,

Редактировать: соответствующие части моего очень незавершенного кода, который все еще очень WIP

private boolean state;
private ServerSocket socket;

@Override
public void run() {
    while(this.isRunning()==true) {
        try {
            if(this.socket==null) this.socket = new ServerSocket(this.getPort());
            Socket connection = this.socket.accept();




            IntroductionSession session = new IntroductionSession(this, connection);
            new Thread(session).start();
            //register timeout task for 3 secs and handle it async



            System.out.println(ManagementFactory.getThreadMXBean().getThreadCount());
            //this.handleIncomingConnection(connection);
        } catch(Exception e) {
            e.printStackTrace();
            //System.exit(1);
        }
    }
}

закрытый класс ВведениеSession реализует Runnable {private boolean alive = true;

    private BaseServer server;
    private Socket socket;
    private boolean introduced = false;

    public IntroductionSession(BaseServer server, Socket socket) {
        this.server = server;
        this.socket = socket;
    }

    private void interrupt() {
        System.out.println("Not mroe alive");
        this.alive = false;
    }

    private void killConnection() {
        this.killConnection("no_reason");
    }

    private void killConnection(String reason) {
        try {
            if(this.from_client!=null) this.from_client.close();
            if(this.to_client!=null) this.to_client.close();
            this.socket.close();

            switch(reason) {
                case "didnt_introduce":
                    System.out.println("Kicked connection, cause it didn't introduce itself");
                break;
                case "unknown_type":
                    System.out.println("Kicked unknown connection-type.");
                break;
                case "no_reason":
                default:
                    //ignore
                break;
            }
        } catch (IOException e) {
            switch(reason) {
                case "didnt_introduce":
                    System.out.println("Error at kicking connection, which didn't introduce itself");
                break;
                case "unknown_type":
                    System.out.println("Error at kicking unknown connection-type.");
                break;
                case "no_reason":
                default:
                    System.out.println("Error occured at kicking connection");
                break;
            }

            e.printStackTrace();

        }
    }

    private ObjectInputStream from_client;
    private ObjectOutputStream to_client;

    @Override
    public void run() {
        while(this.alive==true) {
            try {
                if(this.to_client==null) {
                    this.to_client = new ObjectOutputStream(this.socket.getOutputStream());
                    //this.to_client.flush();
                }
                if(this.from_client==null) this.from_client = new ObjectInputStream(this.socket.getInputStream());
                //Time runs now, if socket is inactive its getting kicked
                new Timer().schedule(new java.util.TimerTask() {
                        @Override
                        public void run() {
                            if(IntroductionSession.this.introduced==false) {
                                IntroductionSession.this.killConnection("didnt_introduce");
                                Thread.currentThread().interrupt();
                                IntroductionSession.this.interrupt();
                            }
                        }
                    }, 5000
                );

                Object obj = this.from_client.readObject();
                while(obj!=null) {
                    if(obj instanceof IntroductionPacket) {
                        IntroductionPacket pk = (IntroductionPacket) obj;
                        introduced = true;

                        if(isCompatible(pk)==false) {
                            try {
                                this.to_client.writeObject(new DifferentVersionKickPacket(BaseServer.version));
                                this.to_client.close();
                                this.from_client.close();
                                IntroductionSession.this.socket.close();
                                System.out.println("Kicked socket, which uses another version.");
                            } catch(Exception e) {
                                Thread.currentThread().interrupt();
                                //ignore
                                System.out.println("Error at kicking incompatible socket.");
                                e.printStackTrace();
                            }
                        } else {
                            this.server.handleIncomingConnection(this.socket, this.from_client, this.to_client);
                        }

                        Thread.currentThread().interrupt();
                    }
                }
            } catch(StreamCorruptedException e) {
                //unknown client-type = kick
                this.killConnection("unknown_type");
            } catch (IOException|ClassNotFoundException e) {
                e.printStackTrace();
                this.killConnection("no_reason");
            }/* catch(SocketException e) {

            }*/
        }
        Thread.currentThread().interrupt();
    }
}

Расширяемый класс, который является реальным сервером:

@Override
public void handleIncomingConnection(Socket connection, ObjectInputStream from_client, ObjectOutputStream to_client) {
    new AuthenticationSession(connection, from_client, to_client).run();
}

private class AuthenticationSession implements Runnable {
    private Socket socket;
    private ObjectInputStream from_client;
    private ObjectOutputStream to_client;

    public AuthenticationSession(Socket socket, ObjectInputStream from_client, ObjectOutputStream to_client) {
        this.socket = socket;
        this.to_client = to_client;
        this.from_client = from_client;
    }
    //TODO: Implement app id for access tokens
    @Override
    public void run() {
        try {
            while(this.socket.isConnected()==true) {
                /*ObjectOutputStream to_client = new ObjectOutputStream(socket.getOutputStream()); //maybe cause problems, do it later if it does
                ObjectInputStream from_client = new ObjectInputStream(socket.getInputStream());*/

                Object object = from_client.readObject();

                while(object!=null) {
                    if(object instanceof RegisterPacket) {
                        RegisterPacket regPacket = (RegisterPacket) object;

                        System.out.println("Username:" + regPacket + ", password: " + regPacket.password + ", APP-ID: " + regPacket.appId);
                    } else {
                        System.out.println("IP " + this.socket.getInetAddress().getHostAddress() + ":" + this.socket.getPort() + " tried to send an unknown packet.");
                        this.socket.close();
                    }
                }
            }
        }/* catch(EOFException eofe) {
            //unexpected disconnect

        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (ClassNotFoundException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }*/
        catch(Exception e) {
            e.printStackTrace();
            System.exit(1);
        }

        /*catch(Exception e) {
            //e.printStackTrace();

            Thread.currentThread().interrupt();
        }*/
    }

}

Пожалуйста, не смотрите на его очень плохое форматирование и вещи, которые я делал в надежде исправить, задачи не выполняются ie как бы то ни было.

Ответы [ 2 ]

0 голосов
/ 06 апреля 2020

Как правило, в коде сервера производственного уровня мы не работаем с прямым созданием сокета и обработкой запросов. Работать с розетками низкого уровня, закрывать соединения и предотвращать утечки - это кошмар. Скорее, мы полагаемся на платформы производственного класса, такие как Java Spring Framework или Play Framework .

Мой вопрос: почему вы не используете какие-либо серверные фреймворки, такие как перечисленные выше?

  1. Если вам интересно, как эти фреймворки обрабатывают тысячи одновременных запросов, изучите шаблоны проектирования, такие как Thread Pool . Эти структуры отвлекают вас от сложностей и обрабатывают пул потоков для вас.

  2. Если клиенты не ожидают немедленного ответа, вы также можете изучить введение очереди сообщений такие как Кафка . Сервер выберет сообщения одно за другим из очереди и обработает их. Однако имейте в виду, что это асинхронно и может не соответствовать вашим требованиям.

  3. Если вы не ограничены одним сервером, вы можете развернуть код своего сервера на Azure или AWS VMSS (установлен масштаб виртуальной машины). На основе настроенных вами правил загрузки ЦП система будет автоматически масштабировать и динамически управлять ресурсами для вас.

Я бы предложил прочитать принципы проектирования системы, относящиеся к серверам, чтобы укрепить ваше понимание.

Не изобретайте велосипед.

0 голосов
/ 06 апреля 2020

Поскольку вы работаете с приложением чата, вам нужно подумать о проведении однопоточного события L oop.

Вы можете сохранить карту строк (идентификатор клиента) и сокета (сокет клиента).

Map<String, Socket> clientSockets;

Поток вашего сервера примет новые сокеты клиента и просто поместит его на карту выше. , Затем будет другой поток, который будет выполнять событие L oop, и всякий раз, когда в любом из клиентских сокетов в InputStream есть данные, он должен отправлять эти данные всем другим клиентским сокетам (групповой чат). Это должно происходить бесконечно с интервалом сна.

...