Java Socket Programming: серверный цикл выполняется бесконечно - PullRequest
0 голосов
/ 03 мая 2018

Хотя я понимаю, что

while(true){
}

генерирует бесконечный цикл, насколько я понимаю,

while(true){
blockingCall()
}

позволяет выполнить этот цикл x количество раз (x может быть между 0 и числом, которое достигает пределов ресурса данной машины) из-за природы блокирующих вызовов, т.е. если есть 3 вызова, сделанные для blockingCall () метод и третий вызов никогда не возвращаются, это означает, что программа должна ждать там. Это тема реализации, которая работает не так, как я ожидаю. Я реализую программу клиент / сервер с использованием Java Sockets. https://docs.oracle.com/javase/tutorial/networking/sockets/clientServer.html - это ссылочная ссылка, чтобы понять, что делает мой клиент (он просто запрашивает соединение с сервером, работающим на определенном порте, и отправляет сообщение. Сервер отменяет это сообщение и отправляет обратно клиенту). Я пытаюсь реализовать сервер таким образом, чтобы были ограничения на количество соединений, которые этот сервер позволяет. Если число клиентов, запрашивающих соединение, выходит за этот предел, то дополнительные запросы помещаются в очередь до максимального ограничения. Как только этот максимальный предел превышен, сервер просто записывает в журнал сообщение о том, что «больше никаких соединений не будет принято». Ниже моя серверная программа:

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketAddress;
import java.util.concurrent.*;

public class MultithreadedServer {
    private static BlockingQueue<Socket> queuedSockets = new ArrayBlockingQueue<>(1);                  //max queued connections.
    private static Semaphore semaphoreForMaxConnectionsAllowed = new Semaphore(2);              //max active connections being served.

    private static void handleClientConnectionRequest(final Socket newSocketForClientConnection, final Semaphore maxConnectionSemaphore) {
        new Thread(new Runnable() {

            @Override
            public void run() {

                try (
                        BufferedReader socketReader = new BufferedReader(new InputStreamReader(newSocketForClientConnection.getInputStream()));
                        PrintWriter socketWriter = new PrintWriter(newSocketForClientConnection.getOutputStream(), true)
                ) {

                    maxConnectionSemaphore.acquire();

                    String serverMsg;
                    String clientMsg;

                    SocketAddress clientSocket = (InetSocketAddress) newSocketForClientConnection.getRemoteSocketAddress();

                    while ((clientMsg = socketReader.readLine()) != null) {
                        if (clientMsg.equalsIgnoreCase("quit")) {
                            maxConnectionSemaphore.release();
                            break;
                        }

                        System.out.println("client with socket " + clientSocket + " sent MSG : " + clientMsg);
                        serverMsg = reverseString(clientMsg);

                        socketWriter.println(serverMsg);
                    }

                } catch (IOException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    System.out.println("Closing client upon client's request.");
                }
            }
        }).start();
    }

    private static String reverseString(String clientMsg) {
        synchronized (clientMsg) {
            StringBuffer stringBuffer = new StringBuffer();

            for (int i = clientMsg.length() - 1; i >= 0; i--) {
                stringBuffer.append(clientMsg.charAt(i));
            }

            return stringBuffer.toString();
        }
    }

    public static void main(String[] args) throws IOException {
        boolean shouldContinue = true;

        if (args.length != 1) {
            System.out.println("Incorrect number of arguments at command line");
            System.exit(1);
        }

        ServerSocket serverSocket = null;

        try {
            Integer portNumber = Integer.parseInt(args[0]);
            serverSocket = new ServerSocket(portNumber);
            int connectionNumber = 0;

            System.out.println("Server listening on port# : " + args[0]);

            //main thread...
            while (shouldContinue) {
                Socket newServerSocketForClientConnection = null;
                newServerSocketForClientConnection = queuedSockets.poll();

                if (newServerSocketForClientConnection == null) {
                    newServerSocketForClientConnection = serverSocket.accept();

                    connectionNumber++;
                    System.out.println("Created new socket upon client request. ConnectionCOunt = " + connectionNumber);

                    processConnection(newServerSocketForClientConnection);
                } else {
                    //i.e. queue has a socket request pending.
                    System.out.println("Picking queued socket..");
                    processConnection(newServerSocketForClientConnection);
                }
            }

        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (serverSocket != null) {
                serverSocket.close();
            }
        }
    }

    private static void processConnection(Socket newServerSocketForClientConnection) {

        if (semaphoreForMaxConnectionsAllowed.availablePermits() > 0) {
            handleClientConnectionRequest(newServerSocketForClientConnection, semaphoreForMaxConnectionsAllowed);
        } else {
            //System.out.println("Since exceeded max connection limit, adding in queue.");
            if (queuedSockets.offer(newServerSocketForClientConnection)) {
                System.out.println("connectionRequest queued because no more space on server. QueuedSocketList size : " + queuedSockets.size());
            }else{
                System.out.println("No space available for client connections. Can not be queued too.");
            }

        }

    }
}

Вывод, наблюдаемый через этот сервер, когда количество клиентских запросов выходит за пределы семафора (по какой-то причине я должен использовать семафор в своей программе и не могу использовать ExecutorService с FixedThreadPool):

enter image description here

Мой вопрос таков: похоже, что queuedSockets.poll () не удаляет элемент из blockingQueue. Вот почему я получаю этот псевдо-бесконечный цикл. Любая подсказка, почему это происходит? Я проверил документацию по blockingQueue, и в документе говорится, что poll () будет «Извлекать и удалять заголовок этой очереди», но, похоже, что это не происходит для вышеуказанной программы.

1 Ответ

0 голосов
/ 03 мая 2018

Давайте пройдем этот цикл:

//main thread...
        while (shouldContinue) {
            Socket newServerSocketForClientConnection = null;
            // poll for a pending connection in the queue
            newServerSocketForClientConnection = queuedSockets.poll();

            // if a pending connection exists, go to else...
            if (newServerSocketForClientConnection == null) {
                ...
            } else {
                // queue has a socket request pending, so we process the request...
                System.out.println("Picking queued socket..");
                processConnection(newServerSocketForClientConnection);
            }
        }

А потом в processConnection():

    // if there are no permits available, go to else...
    if (semaphoreForMaxConnectionsAllowed.availablePermits() > 0) {
        handleClientConnectionRequest(newServerSocketForClientConnection, semaphoreForMaxConnectionsAllowed);
    } else {
        // BlockingQueue.offer() puts this connection immediately back into the queue,
        // then the method exits
        if (queuedSockets.offer(newServerSocketForClientConnection)) {
            System.out.println("connectionRequest queued because no more space on server. QueuedSocketList size : " + queuedSockets.size());
        }else{
            System.out.println("No space available for client connections. Can not be queued too.");
        }

    }

После этого на следующей итерации цикла:

//main thread...
        while (shouldContinue) {
            Socket newServerSocketForClientConnection = null;
            // poll immediately gets the same request that was 
            // removed in the previous iteration
            newServerSocketForClientConnection = queuedSockets.poll();

            // Once something is in the queue, this condition will
            // never be met, so no new incoming connections
            // can be accepted
            if (newServerSocketForClientConnection == null) {
                ...
            } else {
                // process the same request again, forever, or until
                // a connection is freed up. Meanwhile, all other
                // incoming requests are being ignored.
                System.out.println("Picking queued socket..");
                processConnection(newServerSocketForClientConnection);
            }
        }

Таким образом, это не значит, что запрос никогда не удаляется из очереди, он просто немедленно возвращается обратно из-за того, что он был заблокирован семафором.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...