Вызов нескольких отдельных потоков асинхронно. Но один поток застрял в цикле, а другой поток не выполняется - PullRequest
0 голосов
/ 12 июня 2019

Здесь у меня есть сервер, и клиентская архитектура сервера будет отправлять сообщения в бесконечном цикле. Если на стороне клиента сначала находится поток, который установит соединение с сервером, а также создаст два разных потока, то первый поток должен прочитать данные с сервера и вставить в очередь, второй поток должен прочитать данные из очередь.

Я уже пытался вызвать второй поток внутри первого потока, чтобы подтвердить, что функция работает полностью. Но это не то, что я хочу. Я также пытался разорвать цикл и снова вызвать метод вставки в рекурсии, который также не работал для меня.

Вот три класса, которые я использую.

 Server.class
package threadPool.poc;

import java.io.BufferedWriter;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.net.ServerSocket;
import java.net.Socket;

public class Server {

    public static void main(String[] args) {
        try {
            ServerSocket ss = new ServerSocket(8080);
            Socket s = ss.accept();
            OutputStream outputStream = s.getOutputStream();
            OutputStreamWriter outputStreamWriter = new OutputStreamWriter(outputStream);
            BufferedWriter bufferedWriter = new BufferedWriter(outputStreamWriter);
            System.out.println("Server Started");
            int i = 1;
            while (true) {
                System.out.println("Writing :String" + i);
                bufferedWriter.write("String " + i + "\n");
                bufferedWriter.flush();
                Thread.sleep(1000);
                i++;
            }

        } catch (Exception e) {
            System.out.println(e);
        } finally {

        }
    }

}

QueueImplementation.class

package threadPool.poc;

import java.io.UnsupportedEncodingException;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

public class QueueImplementation {

    public Queue<byte[]> threadQueue = new ConcurrentLinkedQueue<>();

    public void insertInQueue(byte[] value) {
        boolean isInserted = threadQueue.add(value);
        if (isInserted) {
            System.out.println("Inserted in a queue");
        } else {
            System.out.println("Cannot insert");
        }

    }

    public String retriveFromQueue() {
        try {
            byte[] data = threadQueue.poll();
            if (data == null) {
                System.out.println("Doesn't have any data for threadNumber ");
                return null;
            }
            System.out.println("Data in the form of byte array for threadNumber :" + data);
            String s = new String(data, "UTF-8");
            System.out.println(s);
            return s;
        } catch (UnsupportedEncodingException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        return null;
    }

}

Client.class

package threadPool.poc;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.Socket;

public class Client implements Runnable {

    public Client() {
        super();
    }

    static QueueImplementation queueImplementation = new QueueImplementation();

    private String threadType;

    private String message;

    private int i;

    private static BufferedReader bufferedReader = null;

    public static void main(String[] args) {

        System.out.println("Starting");
        Client client = new Client("connect");
        client.run();
    }

    public Client(String threadType) {
        super();
        this.threadType = threadType;
    }

    @Override
    public void run() {
        try {
            switch (threadType) {
            case ("connect"):
                connectServer();
                break;
            case ("insertPool"):
                insertIntoQueue();
                break;
            case ("ReadPool"):
                readMessage();
                break;
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private void insertIntoQueue() throws InterruptedException, IOException {
        int i = 1;
        BufferedReader reader = bufferedReader;
        System.out.println("IS buffered reader empty : " + reader);
        while (reader != null) {
            String message = reader.readLine();
            System.out.println("Reading message from server : " + message);
            if (message != null) {
                queueImplementation.insertInQueue(message.getBytes());
                System.out.println("Inserted in a Queue");
            } else {
                System.out.println("Null message read from server ");
            }
            i++;
        }

    }

    private void connectServer() {
        try {
            Socket s = new Socket("localhost", 8080);
            InputStreamReader inputStreamReader = new InputStreamReader(s.getInputStream());
            bufferedReader = new BufferedReader(inputStreamReader);
            System.out.println("Client started");
            Runnable client = new Client("insertPool");
            Runnable client2 = new Client("ReadPool", i);
            /*
             * ExecutorService executorService = Executors.newCachedThreadPool();
             * executorService.execute(client2); executorService.execute(client);
             */
            client2.run();
            client.run();



        } catch (Exception e) {
            System.out.println(e);
        } finally {
            System.out.println("Entered in finally");
        }
    }


    public Client(String threadType, String message) {
        super();
        this.threadType = threadType;
        this.message = message;
    }

    public Client(String threadType, int i) {
        super();
        this.threadType = threadType;
        this.i = i;
    }

    private void readMessage() throws InterruptedException {

        while (true) {

            System.out.println("reading data from queue for thread : " + i);
            queueImplementation.retriveFromQueue();
            Thread.sleep(1000);
        }

    }
}

Выходные данные показывают поток клиента 1 или клиента 2, застрявший в цикле. Это не вызывает другой поток, и он продолжает ждать.

Это с консоли сервера.

Server Started
Writing :String1
Writing :String2
Writing :String3
Writing :String4
Writing :String5
Writing :String6
Writing :String7
Writing :String8
Writing :String9
Writing :String10
Writing :String11
Writing :String12
Writing :String13

Это из клиентской консоли:

Starting
Client started
reading data from queue for thread : 0
Doesn't have any data for threadNumber 
reading data from queue for thread : 0
Doesn't have any data for threadNumber 
reading data from queue for thread : 0
Doesn't have any data for threadNumber 
reading data from queue for thread : 0
Doesn't have any data for threadNumber 
reading data from queue for thread : 0
Doesn't have any data for threadNumber 
reading data from queue for thread : 0

См. Клиентский результат, который он продолжает работать, пока я не остановлюсь. Проблема здесь в том, что поток вставки не вызывает.

Мой ожидаемый результат должен быть таким:

должен быть также поток чтения из очереди / вызова потока, а также между методом / потоком вставки.

Ответы [ 2 ]

1 голос
/ 12 июня 2019

Проблема в том, что ваш клиент просто работоспособен.Для выполнения своего кода требуется вспомогательный поток.

Когда вы вызываете

client2.run();

, текущий поток входит и выполняет этот исполняемый файл.Другой клиент ожидает завершения предыдущего, чего не будет, поскольку он находится в цикле.

Что вам нужно сделать, это обернуть этот запускаемый объект в новый поток и отправить его как

new Thread(client2).start();
0 голосов
/ 12 июня 2019

Вы не можете определить, какой поток должен делать то, что ... Но вы можете поместить один поток, который делает что-то, а остальные удерживают ... Проверьте синхронизированное ключевое слово

https://www.baeldung.com/java-synchronized

...