Здесь у меня есть сервер, и клиентская архитектура сервера будет отправлять сообщения в бесконечном цикле. Если на стороне клиента сначала находится поток, который установит соединение с сервером, а также создаст два разных потока, то первый поток должен прочитать данные с сервера и вставить в очередь, второй поток должен прочитать данные из очередь.
Я уже пытался вызвать второй поток внутри первого потока, чтобы подтвердить, что функция работает полностью. Но это не то, что я хочу.
Я также пытался разорвать цикл и снова вызвать метод вставки в рекурсии, который также не работал для меня.
Вот три класса, которые я использую.
Server.class
package threadPool.poc;
import java.io.BufferedWriter;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.net.ServerSocket;
import java.net.Socket;
public class Server {
public static void main(String[] args) {
try {
ServerSocket ss = new ServerSocket(8080);
Socket s = ss.accept();
OutputStream outputStream = s.getOutputStream();
OutputStreamWriter outputStreamWriter = new OutputStreamWriter(outputStream);
BufferedWriter bufferedWriter = new BufferedWriter(outputStreamWriter);
System.out.println("Server Started");
int i = 1;
while (true) {
System.out.println("Writing :String" + i);
bufferedWriter.write("String " + i + "\n");
bufferedWriter.flush();
Thread.sleep(1000);
i++;
}
} catch (Exception e) {
System.out.println(e);
} finally {
}
}
}
QueueImplementation.class
package threadPool.poc;
import java.io.UnsupportedEncodingException;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
public class QueueImplementation {
public Queue<byte[]> threadQueue = new ConcurrentLinkedQueue<>();
public void insertInQueue(byte[] value) {
boolean isInserted = threadQueue.add(value);
if (isInserted) {
System.out.println("Inserted in a queue");
} else {
System.out.println("Cannot insert");
}
}
public String retriveFromQueue() {
try {
byte[] data = threadQueue.poll();
if (data == null) {
System.out.println("Doesn't have any data for threadNumber ");
return null;
}
System.out.println("Data in the form of byte array for threadNumber :" + data);
String s = new String(data, "UTF-8");
System.out.println(s);
return s;
} catch (UnsupportedEncodingException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return null;
}
}
Client.class
package threadPool.poc;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.Socket;
public class Client implements Runnable {
public Client() {
super();
}
static QueueImplementation queueImplementation = new QueueImplementation();
private String threadType;
private String message;
private int i;
private static BufferedReader bufferedReader = null;
public static void main(String[] args) {
System.out.println("Starting");
Client client = new Client("connect");
client.run();
}
public Client(String threadType) {
super();
this.threadType = threadType;
}
@Override
public void run() {
try {
switch (threadType) {
case ("connect"):
connectServer();
break;
case ("insertPool"):
insertIntoQueue();
break;
case ("ReadPool"):
readMessage();
break;
}
} catch (Exception e) {
e.printStackTrace();
}
}
private void insertIntoQueue() throws InterruptedException, IOException {
int i = 1;
BufferedReader reader = bufferedReader;
System.out.println("IS buffered reader empty : " + reader);
while (reader != null) {
String message = reader.readLine();
System.out.println("Reading message from server : " + message);
if (message != null) {
queueImplementation.insertInQueue(message.getBytes());
System.out.println("Inserted in a Queue");
} else {
System.out.println("Null message read from server ");
}
i++;
}
}
private void connectServer() {
try {
Socket s = new Socket("localhost", 8080);
InputStreamReader inputStreamReader = new InputStreamReader(s.getInputStream());
bufferedReader = new BufferedReader(inputStreamReader);
System.out.println("Client started");
Runnable client = new Client("insertPool");
Runnable client2 = new Client("ReadPool", i);
/*
* ExecutorService executorService = Executors.newCachedThreadPool();
* executorService.execute(client2); executorService.execute(client);
*/
client2.run();
client.run();
} catch (Exception e) {
System.out.println(e);
} finally {
System.out.println("Entered in finally");
}
}
public Client(String threadType, String message) {
super();
this.threadType = threadType;
this.message = message;
}
public Client(String threadType, int i) {
super();
this.threadType = threadType;
this.i = i;
}
private void readMessage() throws InterruptedException {
while (true) {
System.out.println("reading data from queue for thread : " + i);
queueImplementation.retriveFromQueue();
Thread.sleep(1000);
}
}
}
Выходные данные показывают поток клиента 1 или клиента 2, застрявший в цикле. Это не вызывает другой поток, и он продолжает ждать.
Это с консоли сервера.
Server Started
Writing :String1
Writing :String2
Writing :String3
Writing :String4
Writing :String5
Writing :String6
Writing :String7
Writing :String8
Writing :String9
Writing :String10
Writing :String11
Writing :String12
Writing :String13
Это из клиентской консоли:
Starting
Client started
reading data from queue for thread : 0
Doesn't have any data for threadNumber
reading data from queue for thread : 0
Doesn't have any data for threadNumber
reading data from queue for thread : 0
Doesn't have any data for threadNumber
reading data from queue for thread : 0
Doesn't have any data for threadNumber
reading data from queue for thread : 0
Doesn't have any data for threadNumber
reading data from queue for thread : 0
См. Клиентский результат, который он продолжает работать, пока я не остановлюсь. Проблема здесь в том, что поток вставки не вызывает.
Мой ожидаемый результат должен быть таким:
должен быть также поток чтения из очереди / вызова потока, а также между методом / потоком вставки.