Я новичок в Java, пытаюсь изучить сетевое программирование и параллелизм, и я решил попробовать написать простой чат-сервер, в котором входные данные от клиента отражаются для всех клиентов.Этого не происходитЯ добавил пару операторов печати, чтобы программа объявляла, что она ожидает подключения и каждый раз получает соединение.Я использую Telnet локально для подключения к порту на моей машине.
Программа сообщает об успехе для первого и второго одновременных подключений, но затем не объявляет об успехе для последующих подключений , пока я не закрою все подключения. Так, например, я подключусь с пяти отдельных терминалов, и программа объявит «Соединение 1» и «Соединение 2», но не объявит «Соединение 3», 4 и 5, пока я не закрою все терминалы.
Мне нужна помощь, чтобы выяснить, где лежат мои ошибки, а также общие советы о том, как подходить к отладке подобной ситуации.
В двух словах, моя программа имеет
- Основной класс, который запускает другие три потока
- Класс ClientListener, который использует SocketReader для прослушивания соединений и сохраняет входные потоки Sockets и выходные потоки в двух наборах.
- MessageReader, который перебирает входные потоки.Если он находит сообщение, он помещает его в SynchronousQueue и ждет, пока
- MessageWriter его удалит.MessageWriter отправляет сообщение всем выходным потокам.
Код ниже.Спасибо за любую помощь!
public class Main {
public static void main(String[] args) {
ClientListener clientListener = new ClientListener();
Thread clientListenerThread = new Thread(clientListener);
clientListenerThread.setPriority(Thread.MAX_PRIORITY);
clientListenerThread.start();
MessageReader messageReader = new MessageReader(clientListener);
Thread messageReaderThread = new Thread(messageReader);
messageReaderThread.setPriority(Thread.MIN_PRIORITY);
messageReaderThread.start();
MessageWriter messageWriter = new MessageWriter(messageReader, clientListener);
Thread messageWriterThread = new Thread(messageWriter);
messageWriterThread.setPriority(Thread.NORM_PRIORITY);
messageWriterThread.start();
}
}
public class ClientListener implements Runnable {
private static final int DEFAULT_PORT = 5000;
private Set<Scanner> clientIn = Collections.synchronizedSet(
new LinkedHashSet<Scanner>());
private Set<PrintWriter> clientOut = Collections.synchronizedSet(
new LinkedHashSet<PrintWriter>());
public Set<Scanner> getClientIn() {
return clientIn;
}
public Set<PrintWriter> getClientOut() {
return clientOut;
}
@Override
public void run() {
try {
ServerSocket server = new ServerSocket(DEFAULT_PORT);
System.out.println("Listening for connections...");
int connectionNum = 0;
while(true) {
Socket socket = server.accept();
connectionNum++;
System.out.format("Connection %s%n", connectionNum);
Scanner in = new Scanner(socket.getInputStream());
PrintWriter out = new PrintWriter(socket.getOutputStream());
clientIn.add(in);
clientOut.add(out);
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
public class MessageReader implements Runnable {
private ClientListener clientListener;
private BlockingQueue<String> messages = new SynchronousQueue<String>();
public MessageReader(ClientListener clientListener) {
this.clientListener = clientListener;
}
@Override
public void run() {
while(true) {
Set<Scanner> clients = clientListener.getClientIn();
synchronized (clients) {
for(Scanner client: clients) {
if(client.hasNext()) {
try {
messages.put(client.next());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
}
public String getMessage() throws InterruptedException {
return messages.take();
}
}
public class MessageWriter implements Runnable {
private ClientListener clientListener;
private MessageReader messageReader;
public MessageWriter(
MessageReader messageReader,
ClientListener clientListener) {
this.messageReader = messageReader;
this.clientListener = clientListener;
}
@Override
public void run() {
try {
while(true) {
String message = messageReader.getMessage();
Set<PrintWriter> clients = clientListener.getClientOut();
synchronized (clients) {
for(PrintWriter client: clients) {
client.println(message);
}
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}