Хотя я понимаю, что
while(true){
}
генерирует бесконечный цикл, насколько я понимаю,
while(true){
blockingCall()
}
позволяет выполнить этот цикл x
количество раз (x может быть между 0 и числом, которое достигает пределов ресурса данной машины) из-за природы блокирующих вызовов, т.е. если есть 3 вызова, сделанные для blockingCall () метод и третий вызов никогда не возвращаются, это означает, что программа должна ждать там. Это тема реализации, которая работает не так, как я ожидаю.
Я реализую программу клиент / сервер с использованием Java Sockets. https://docs.oracle.com/javase/tutorial/networking/sockets/clientServer.html - это ссылочная ссылка, чтобы понять, что делает мой клиент (он просто запрашивает соединение с сервером, работающим на определенном порте, и отправляет сообщение. Сервер отменяет это сообщение и отправляет обратно клиенту). Я пытаюсь реализовать сервер таким образом, чтобы были ограничения на количество соединений, которые этот сервер позволяет. Если число клиентов, запрашивающих соединение, выходит за этот предел, то дополнительные запросы помещаются в очередь до максимального ограничения. Как только этот максимальный предел превышен, сервер просто записывает в журнал сообщение о том, что «больше никаких соединений не будет принято». Ниже моя серверная программа:
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketAddress;
import java.util.concurrent.*;
public class MultithreadedServer {
private static BlockingQueue<Socket> queuedSockets = new ArrayBlockingQueue<>(1); //max queued connections.
private static Semaphore semaphoreForMaxConnectionsAllowed = new Semaphore(2); //max active connections being served.
private static void handleClientConnectionRequest(final Socket newSocketForClientConnection, final Semaphore maxConnectionSemaphore) {
new Thread(new Runnable() {
@Override
public void run() {
try (
BufferedReader socketReader = new BufferedReader(new InputStreamReader(newSocketForClientConnection.getInputStream()));
PrintWriter socketWriter = new PrintWriter(newSocketForClientConnection.getOutputStream(), true)
) {
maxConnectionSemaphore.acquire();
String serverMsg;
String clientMsg;
SocketAddress clientSocket = (InetSocketAddress) newSocketForClientConnection.getRemoteSocketAddress();
while ((clientMsg = socketReader.readLine()) != null) {
if (clientMsg.equalsIgnoreCase("quit")) {
maxConnectionSemaphore.release();
break;
}
System.out.println("client with socket " + clientSocket + " sent MSG : " + clientMsg);
serverMsg = reverseString(clientMsg);
socketWriter.println(serverMsg);
}
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("Closing client upon client's request.");
}
}
}).start();
}
private static String reverseString(String clientMsg) {
synchronized (clientMsg) {
StringBuffer stringBuffer = new StringBuffer();
for (int i = clientMsg.length() - 1; i >= 0; i--) {
stringBuffer.append(clientMsg.charAt(i));
}
return stringBuffer.toString();
}
}
public static void main(String[] args) throws IOException {
boolean shouldContinue = true;
if (args.length != 1) {
System.out.println("Incorrect number of arguments at command line");
System.exit(1);
}
ServerSocket serverSocket = null;
try {
Integer portNumber = Integer.parseInt(args[0]);
serverSocket = new ServerSocket(portNumber);
int connectionNumber = 0;
System.out.println("Server listening on port# : " + args[0]);
//main thread...
while (shouldContinue) {
Socket newServerSocketForClientConnection = null;
newServerSocketForClientConnection = queuedSockets.poll();
if (newServerSocketForClientConnection == null) {
newServerSocketForClientConnection = serverSocket.accept();
connectionNumber++;
System.out.println("Created new socket upon client request. ConnectionCOunt = " + connectionNumber);
processConnection(newServerSocketForClientConnection);
} else {
//i.e. queue has a socket request pending.
System.out.println("Picking queued socket..");
processConnection(newServerSocketForClientConnection);
}
}
} catch (IOException e) {
e.printStackTrace();
} finally {
if (serverSocket != null) {
serverSocket.close();
}
}
}
private static void processConnection(Socket newServerSocketForClientConnection) {
if (semaphoreForMaxConnectionsAllowed.availablePermits() > 0) {
handleClientConnectionRequest(newServerSocketForClientConnection, semaphoreForMaxConnectionsAllowed);
} else {
//System.out.println("Since exceeded max connection limit, adding in queue.");
if (queuedSockets.offer(newServerSocketForClientConnection)) {
System.out.println("connectionRequest queued because no more space on server. QueuedSocketList size : " + queuedSockets.size());
}else{
System.out.println("No space available for client connections. Can not be queued too.");
}
}
}
}
Вывод, наблюдаемый через этот сервер, когда количество клиентских запросов выходит за пределы семафора (по какой-то причине я должен использовать семафор в своей программе и не могу использовать ExecutorService с FixedThreadPool):
Мой вопрос таков: похоже, что queuedSockets.poll () не удаляет элемент из blockingQueue. Вот почему я получаю этот псевдо-бесконечный цикл. Любая подсказка, почему это происходит? Я проверил документацию по blockingQueue, и в документе говорится, что poll () будет «Извлекать и удалять заголовок этой очереди», но, похоже, что это не происходит для вышеуказанной программы.