Один поток останавливается слишком рано независимо от CyclicBarrier - PullRequest
2 голосов
/ 11 февраля 2011

Мне известно о том, что следующий код может показаться вульгарным, но я новичок в этих вещах и просто попробовал все, чтобы заставить его работать ..

Проблема: Даже если я использую(возможно, неправильно) CyclicBarrier, один - и, кажется, всегда один и тот же - поток останавливается слишком рано и печатает свой вектор, оставляя 1 из 11 сообщений "Входящее соединение" отсутствующим.Возможно, что-то ужасно не так с последней итерацией моего цикла, но я не могу найти, что именно .. Теперь программа просто зацикливается, ожидая обработки последнего соединения.

public class VectorClockClient implements Runnable {
/*
 * Attributes
 */

/*
 * The client number is store to provide fast
 * array access when, for example, a thread's own
 * clock simply needs to be incremented.
 */
private int clientNumber;
private File configFile, inputFile;
int[] vectorClock;

/*
 * Constructor
 * @param
 * - File config
 * - int line
 * - File input
 * - int clients
 */
public VectorClockClient(File config, int line, File input, int clients) {
    /*
     * Make sure that File handles aren't null and that
     * the line number is valid.
     */
    if (config != null && line >= 0 && input != null) {
        configFile = config;
        inputFile = input;
        clientNumber = line;
        /*
         * Set the array size to the number of lines found in the
         * config file and initialize with zero values.
         */
        vectorClock = new int[clients];
        for (int i = 0; i < vectorClock.length; i++) {
            vectorClock[i] = 0;
        }
    }
}

private int parsePort() {
    int returnable = 0;
    try {
        FileInputStream fstream = new FileInputStream(configFile.getName());
        DataInputStream in = new DataInputStream(fstream);
        BufferedReader br = new BufferedReader(new InputStreamReader(in));
        String strLine = "";
        for (int i = 0; i < clientNumber + 1; i++) {
            strLine = br.readLine();
        }
        String[] tokens = strLine.split(" ");
        returnable = Integer.parseInt(tokens[1]);
    }
    catch (Exception e) {
        e.printStackTrace();
    }
    System.out.println("[" + clientNumber + "] returned with " + returnable + ".");
    return returnable;
}

private int parsePort(int client) {
    int returnable = 0;
    try {
        FileInputStream fstream = new FileInputStream(configFile.getName());
        DataInputStream in = new DataInputStream(fstream);
        BufferedReader br = new BufferedReader(new InputStreamReader(in));
        String strLine = "";
        for (int i = 0; i < client; i++) {
            strLine = br.readLine();
        }
        String[] tokens = strLine.split(" ");
        returnable = Integer.parseInt(tokens[1]);
    }
    catch (Exception e) {
        e.printStackTrace();
    }
    return returnable;
}

private int parseAction(String s) {
    int returnable = -1;
    try {
        FileInputStream fstream = new FileInputStream(configFile.getName());
        DataInputStream in = new DataInputStream(fstream);
        BufferedReader br = new BufferedReader(new InputStreamReader(in));
        String[] tokens = s.split(" ");
        if (!(Integer.parseInt(tokens[0]) == this.clientNumber + 1)) {
            return -1;
        }
        else {
            if (tokens[1].equals("L")) {
                vectorClock[clientNumber] += Integer.parseInt(tokens[2]);
            }
            else {
                returnable = Integer.parseInt(tokens[2]);
            }
        }
    }
    catch (Exception e) {
        e.printStackTrace();
    }
    return returnable;
}

/*
 * Do the actual work.
 */
public void run() {
    try {
        InitClients.barrier.await();
    }
    catch (Exception e) {
        System.out.println(e);
    }
    int port = parsePort();
    String hostname = "localhost";
    String strLine;
    ServerSocketChannel ssc;
    SocketChannel sc;
    FileInputStream fstream;
    DataInputStream in;
    BufferedReader br;
    boolean eof = false;
    try {
        ssc = ServerSocketChannel.open();
        ssc.socket().bind(new InetSocketAddress(hostname, port));
        ssc.configureBlocking(false);
        fstream = new FileInputStream("input_vector.txt");
        in = new DataInputStream(fstream);
        br = new BufferedReader(new InputStreamReader(in));

        try {
            InitClients.barrier.await();
        }
        catch (Exception e) {
            System.out.println(e);
        }

        while (true && (eof == false)) {
            sc = ssc.accept();

            if (sc == null) {
                if ((strLine = br.readLine()) != null) {
                    int result = parseAction(strLine);
                    if (result >= 0) {
                        //System.out.println("[" + (clientNumber + 1)
                        //+ "] Send a message to " + result + ".");
                        try {
                            SocketChannel client = SocketChannel.open();
                            client.configureBlocking(true);
                            client.connect(
                                    new InetSocketAddress("localhost",
                                    parsePort(result)));
                            //ByteBuffer buf = ByteBuffer.allocateDirect(32);
                            //buf.put((byte)0xFF);
                            //buf.flip();
                            //vectorClock[clientNumber] += 1;
                            //int numBytesWritten = client.write(buf);
                            String obj = Integer.toString(clientNumber+1);
                            ObjectOutputStream oos = new 
                                    ObjectOutputStream(
                                    client.socket().getOutputStream());
                            oos.writeObject(obj);
                            oos.close();
                        }
                        catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                }
                else {
                    eof = true;
                }
            }
            else {
                ObjectInputStream ois = new 
                        ObjectInputStream(sc.socket().getInputStream());
                String clientNumberString = (String)ois.readObject();
                System.out.println("At {Client[" + (clientNumber + 1)
                        + "]}Incoming connection from: "
                        + sc.socket().getRemoteSocketAddress()
                        + " from {Client[" + clientNumberString + "]}");
                sc.close();
            }
            try {
                InitClients.barrier.await();
            }
            catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    catch (Exception e) {
        e.printStackTrace();
    }
    printVector();
}

private void printVector() {
    System.out.print("{Client[" + (clientNumber + 1) + "]}{");
    for (int i = 0; i < vectorClock.length; i++) {
        System.out.print(vectorClock[i] + "\t");
    }
    System.out.println("}");
}

}Для пояснения, вот форматы используемых файлов.Config содержит имена хостов и порты, используемые клиентами, которые являются потоками, и строки входного файла означают либо «этот клиент отправляет сообщение этому клиенту», либо «этот клиент увеличивает свои логические часы на некоторое постоянное значение».

1 M 2 (M означает отправку сообщения)2 м 33 М 42 L 7 (L означает увеличение тактовой частоты)2 М 1...127.0.0.1 9000127.0.0.1 9001127.0.0.1 9002127.0.0.1 9003...

1 Ответ

0 голосов
/ 11 февраля 2011

Я бы посмотрел на логику, связанную с тем, когда вы ожидаете входящего соединения сокета.От вашего вопроса похоже, что вы ожидаете определенного количества входящих соединений сокетов (возможно, входящее соединение после каждого исходящего сообщения?).Так как вы используете неблокирующий ввод / вывод на входящем сокете, всегда возможно, что ваш оператор while зацикливается до того, как будет установлен входящий сокет.В результате поток сможет продолжить и читать следующую строку из файла без получения соединения.Поскольку ваше конечное состояние достигается после достижения конца файла, возможно, вы можете пропустить входящее сокетное соединение.

Я бы добавил несколько простых распечаток, которые отображаются при чтении из файла, при отправке сообщения и при получении входящего соединения.Это должно быстро сообщить вам, отсутствует ли в определенном потоке ожидаемое входящее соединение.Если выясняется, что проблема связана с неблокирующим вводом / выводом, то вам может потребоваться отключить неблокирующий ввод / вывод, когда вы ожидаете входящий сокет, или реализовать элемент управления, который отслеживает ожидаемое количество входящих сокетов.и продолжается до тех пор, пока эта цель не будет достигнута.

Надеюсь, это поможет.

...