Сервер не прочитал сообщение от последнего клиента - PullRequest
0 голосов
/ 22 ноября 2018

Мне нужно смоделировать распределенную систему.

Есть контроллер и n рабочих компьютеров.

Контроллер сообщает компьютерам, когда запускать, и компьютеры начинают подключаться к другим компьютерам с помощью сокетов.Контроллер подключается к компьютерам с помощью потоков.Компьютер также будет подключаться к другим компьютерам с помощью потоков.

Как только они соединятся друг с другом, они будут отправлять события друг другу, пока они не сгенерируют x событий.Как только они достигают событий x , компьютер отправит контроллеру сообщение «Завершить» о том, что он завершил генерацию событий, но продолжит чтение событий с других компьютеров.

Моя проблема: Компьютеры успешно отправили сообщение Готово контроллеру, кроме последнего компьютера в системе.Согласно журналам, последний компьютер отправил сообщение Finish контроллеру, но контроллер не получил его.Другие компьютеры успешно отправили сообщение о завершении на контроллер.

Если вам нужна дополнительная информация, я был бы рад предоставить.Я работал над этим часами и понятия не имею.

Computer.java

package timetableexchange;

import java.io.IOException;
import java.io.BufferedReader;
import java.io.PrintWriter;
import java.io.InputStreamReader;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Random;
import java.util.Vector;

public class Computer {

    // Constant system capacity
    static final int MAX_SYSTEMS = 4;
    // Computer's time-stamp vector
    static Vector<Integer> timestamp = new Vector<Integer>();
    // Computer's ID
    static int identifier;
    // Computer's Event Count
    static int eventCount = 0;
    // Computer's isAlive check
    static boolean isAlive = true;

    // Socket to Controller
    Socket socketToController;
    PrintWriter outputToController;
    BufferedReader inputFromController;
    String textFromController;

    // Server Socket
    ServerSocket serverSocket;

    // Input and Output Clients
    static ArrayList<ClientSocket> outputClients = new ArrayList<ClientSocket>();
    static ArrayList<ClientConnection> inputClients = new ArrayList<ClientConnection>();

    // Log
    Log log;

    public static void main(String[] args) throws IOException {
        new Computer("127.0.0.1", 8000);
    }

    public Computer(String hostname, int port) throws IOException {
        // Initialize time-stamp
        for (int i = 0; i < MAX_SYSTEMS; ++i) {
            timestamp.add(0);
        }

        // Connect to Controller
        try {
            socketToController = new Socket(hostname, port);
            inputFromController = new BufferedReader(new InputStreamReader(socketToController.getInputStream()));
            outputToController = new PrintWriter(socketToController.getOutputStream(), true);
        } catch (UnknownHostException e1) {
            e1.printStackTrace();
        } catch (IOException e1) {
            e1.printStackTrace();
        }

        // Get Computer ID from Controller
        while (true) {
            try {
                if (inputFromController.ready()) {
                    textFromController = inputFromController.readLine();
                    identifier =  Integer.parseInt(textFromController);
                    break;
                }
            } catch (NumberFormatException e) {
                e.printStackTrace();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        log = new Log("client" + identifier + ".txt");

        // Read start message
        while (true) {
            try {
                if (inputFromController.ready()) {
                    textFromController = inputFromController.readLine();
                    if (textFromController.equals("Start")) {
                        log.write("Computer is starting!");
                        break;
                    } 
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        // Instantiate server socket
        int socketPort = port + identifier + 1;
        // System.out.println(socketPort);
        serverSocket = new ServerSocket(socketPort);

        log.write("Server Socket Instantiated");

        // Instantiate sockets for other server sockets (computers) to send
        for (int i = 0; i < MAX_SYSTEMS; ++i) {
            if (i != identifier) {
                Socket acceptedSocket = new Socket(hostname, port + i + 1);
                ClientSocket socketToComputer = new ClientSocket (acceptedSocket);
                outputClients.add(socketToComputer);
            }
        }

        log.write("Client Sockets Instantiated\n");

        // Accept sockets from server socket and add them into a list
        for (int i = 0; i < MAX_SYSTEMS - 1; ++i) {
            ClientConnection computerConn = new ClientConnection(serverSocket.accept());
            computerConn.start();
            inputClients.add(computerConn);
        }

        log.write("Server connected to clients");

        Random rand = new Random();

        // Generating events
        int temp;
        while (eventCount < 50) {
            log.write("Generating Event");
            int choice = rand.nextInt(5);
            if (choice == 0) {
                temp = timestamp.get(identifier);
                ++temp;
                timestamp.set(identifier, temp);
            } else {
                int randC = rand.nextInt(outputClients.size());
                ClientSocket cc = outputClients.get(randC);
                cc.out.writeObject(new Event(identifier, timestamp));
            }
            log.write(timestamp.toString());
            log.write("Done Generating Event");
            eventCount++;
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        log.write("Finished writing. Continue reading...");

        /**
         * ========THE ISSUE IS BELOW.===============
         */
        synchronized (outputToController) {
            outputToController.println("Finish");
            outputToController.flush();
        }

        log.write("Sent Finish Message " + identifier);

        // Wait for Tear Down Message
        while (true) {
            try {
                if (inputFromController.ready()) {
                    textFromController = inputFromController.readLine();
                    if (textFromController.equals("Tear Down")) {
                        log.write("Tearing down....");
                        isAlive = false;
                        break;
                    } 
                }
            } catch (IOException e) {
                e.printStackTrace();
            }

            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        log.write("Computer shutting off....");

    }

    // client socket class (organizing)
    public class ClientSocket {
        Socket socket;
        ObjectOutputStream out;
        ObjectInputStream in;

        public ClientSocket(Socket s) {
            try {
                this.socket = s;
                this.out = new ObjectOutputStream(socket.getOutputStream());
            } catch (IOException e) {
                e.printStackTrace();
            }
            log.write("Client Socket Created");
        }
    }

    // send event thread
    public class ClientConnection extends Thread {
        Socket socket;
        ObjectOutputStream out;
        ObjectInputStream in;
        Random rand = new Random();
        public ClientConnection(Socket s) {
            this.socket = s;
            try {
                out = new ObjectOutputStream (socket.getOutputStream());
                in = new ObjectInputStream (socket.getInputStream());
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        @Override
        public void run () {
            while (isAlive) {
                log.write("Reading events");
                try {
                    Event event = (Event) in.readObject();
                    executeEvent(event.getFromID(), event.getTimestamp());
                } catch (ClassNotFoundException e) {

                } catch (IOException e) {
                    e.printStackTrace();
                }
                System.out.println(timestamp);
            }
            log.write("Finished Reading");
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        // execute the event
        private void executeEvent(int from, Vector<Integer> x) {
            int temp;
            synchronized (timestamp) {
                for (int i = 0; i < timestamp.size(); ++i) {
                    if (x.get(i) > timestamp.get(i)) {
                        timestamp.set(i, x.get(i));
                    }
                }
                temp = timestamp.get(from);
                ++temp;
                timestamp.set(from, temp);
            }
        }
    }   
}

Controller.java

package timetableexchange;

import java.io.IOException;
import java.io.BufferedReader;
import java.io.PrintWriter;
import java.io.InputStreamReader;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;

public class Controller {

    // Mutex Lock
    public final static Object lock = new Object();

    // Constant system capacity
    static final int MAX_SYSTEMS = 4;

    // Server connection threads to computers
    static ArrayList<ServerConnection> conns = new ArrayList<ServerConnection>();

    // Finished computers
    static int finishedCount = 0;

    // Server Socket
    ServerSocket ss;

    // Log Instance
    Log log;

    public static void main(String[] args) throws IOException {
        new Controller(8000);
    }

    public Controller(int port) {
        // Instantiate Log
        log = new Log("server.txt");
        // Instantiate Server Socket and Listen for Incoming Sockets
        try {
            ss = new ServerSocket(port);
            log.write("Listening...");
            // Accept computers until capacity
            for (int i = 0; i < MAX_SYSTEMS; i++) {
                Socket s = ss.accept();
                log.write("Socket connected");
                // Add to list
                ServerConnection conn = new ServerConnection(i, s);
                conns.add(conn);
                conn.start();
            }
            // Notify all waiting threads to start
            synchronized (lock) {
                try {
                    Thread.sleep(1000); 
                    lock.notifyAll();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private class ServerConnection extends Thread {

        // Client Socket
        Socket socket;

        // Output stream
        PrintWriter out;

        // Input stream
        BufferedReader in;

        // ID for connected computer
        int identifier;

        public ServerConnection(int i, Socket s) {
            // Instantiate properties
            this.identifier = i;
            this.socket = s;
            try {
                this.in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
                this.out = new PrintWriter(socket.getOutputStream(), true);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        @Override
        public void run() {
            log.write("Controller is connected to computer#" + identifier);
            // Send ID to computer
            out.println(identifier);
            // Wait until notified
            synchronized (lock) {
                try {
                    lock.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

            // Send Start Message to All Computers
            sendAll("Start");

            waitForFinish();

            log.write("Computer#" + identifier + " is waiting for tear down.");

            // If all computers sent the Finish message, send a Tear Down
            while (true) {
                if (finishedCount == conns.size()) {
                    log.write("Sending tear down to all computers");
                    sendAll("Tear Down");
                }

                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }

        /**
         * ==== RECEIVE FINISH MESSAGE FROM COMPUTERS ======
         */
        private void waitForFinish() {
            String clientInput;
            while (true) {
                try {
                    if (in.ready()) {
                        clientInput = in.readLine();
                        log.write(clientInput);
                        if (clientInput.equals("Finish")) {
                            finishedCount += 1;
                            log.write("Computer " + identifier + " is finished");
                            break;
                        }
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }


        // Send all "text" to all computers in the thread pool
        private void sendAll(String text) {
            for (int i = 0; i < conns.size(); ++i) {
                ServerConnection conn = conns.get(i);
                conn.out.println(text);
            }
        }
    }

}

Журнал контроллера: (обратите внимание, чтоВ журнале не указано, что компьютер 3 завершен)

Listening... (Listening for Computer)
Socket connected
Controller is connected to computer#0
Socket connected
Controller is connected to computer#1
Socket connected
Controller is connected to computer#2
Socket connected
Controller is connected to computer#3
Computer 2 is finished
Computer 1 is finished
Computer#2 is waiting for tear down.
Computer 0 is finished
Computer#1 is waiting for tear down.
Computer#0 is waiting for tear down.

Компьютер # 0 Журнал: (Я сократил журнал, потому что было много операций чтения и записи журнала событий)

Computer is starting!
Server Socket Instantiated
Client Socket Created
Client Socket Created
Client Socket Created
Client Sockets Instantiated

Server connected to clients
// A bunch of reading and writing events
Finished writing. Continue reading...
Sent Finish Message 0

Компьютер # 1 Журнал:

Computer is starting!
Server Socket Instantiated
Client Socket Created
Client Socket Created
Client Socket Created
Client Sockets Instantiated

Server connected to clients
Finished writing. Continue reading...
Sent Finish Message 1

Компьютер # 2

Computer is starting!
Server Socket Instantiated
Client Socket Created
Client Socket Created
Client Socket Created
Client Sockets Instantiated

Reading events
Server connected to clients
Finished writing. Continue reading...
Sent Finish Message 2

Компьютер # 3: Согласно журналу, это отправило готовое сообщение контроллеру

Computer is starting!
Server Socket Instantiated
Client Socket Created
Client Socket Created
Client Socket Created
Client Sockets Instantiated

Reading events
Server connected to clients
Finished writing. Continue reading...
Sent Finish Message 3
...