Сокеты NIO - распределенная система - PullRequest
0 голосов
/ 22 апреля 2020

У меня есть три процесса, которые называются LWA1, LWA2 и LWA3. У каждого есть сервер с портом 55555 для LWA1, 55556 для LWA2 и 55557 для LWA3. Кроме того, у каждого процесса есть клиент для подключения к другим процессам.

Каждый процесс должен иметь возможность записи и чтения для других процессов. Итак:

  • LWA1 должен записывать и читать в / из LWA2, а LWA3
  • LWA2 должен записывать и читать в / из LWA1, а LWA3
  • LWA3 должен записывать и читать в / из LWA1 и LWA2

В настоящий момент каждый процесс выполняет две записи, но получает только одно сообщение. Выходные данные для каждого процесса выглядят следующим образом (отпечатки с вкладками принадлежат клиенту, а неотмеченные - серверу).

LWA1:

Setting up server with port: 55555
Server configured.

    Opening sockets to port 55556 and port 55557
    Sending lamport request: LamportRequest{clock=0, process='LWA1', id=1}
Key accepted
Reading data from server
I read: LamportRequest{clock=0, process='LWA3', id=3}
Key accepted

LWA2:

Setting up server with port: 55556
Server configured.

    Opening sockets to port 55557 and port 55555
Key accepted
Reading data from server
I read: LamportRequest{clock=0, process='LWA1', id=1}
Key accepted
    Sending lamport request: LamportRequest{clock=0, process='LWA2', id=2}

LWA3:

Setting up server with port: 55557
Server configured.

    Opening sockets to port 55555 and port 55556
Key accepted
Key accepted
    Sending lamport request: LamportRequest{clock=0, process='LWA3', id=3}
Reading data from server
I read: LamportRequest{clock=0, process='LWA2', id=2}

Как вы можете видеть, каждый клиент записывает два других запроса LamportRequest, но два других получают только одно сообщение. Почему другое сообщение не проходит?

Я подозреваю, что это может быть что-то, связанное с ключами на сервере, но не знаю, что может быть. Кроме того, я не до конца их понимаю. Поправьте меня, если я ошибаюсь:

Каждое соединение с селектором представлено другим (SelectableChannel) ключом, поэтому итератор на сервере LWA1 (например) должен иметь только (и, следовательно, только слушать события) две клавиши, одна для LWA2 и другая для LWA3, верно? Я попытался присоединить целое число к каждому ключу в методе keyAccept, чтобы различать их guish, что работало нормально, но при печати присоединенного целого числа в методе keyRead показывало ноль. Ключ в этом методе новый? Третий ключ только что появился из ниоткуда?

Дополнительный вопрос: я должен реализовать эту структуру в одном потоке. В настоящее время я использую два, один для сервера и один для клиента. Как только это заработает, какие-нибудь советы, как их объединить?

------------------ КОД -----------------

Сервер (упрощенно для чтения) выглядит следующим образом:

public TalkToBrotherSocket(int clock, int port) {
    this.port = port;
    this.clock = clock;

    try {
        setServer();
        System.out.println("Server configured.\n");
    } catch (IOException e) {
        e.printStackTrace();
    }
}

@Override
public void run() {
    while (true) {
        try {
            // Wait for an event one of the registered channels
            selector.select();

            // Iterate over the set of keys for which events are available
            Iterator selectedKeys = selector.selectedKeys().iterator();
            while (selectedKeys.hasNext()) {
                SelectionKey key = (SelectionKey) selectedKeys.next();
                selectedKeys.remove();

                if (!key.isValid()) {
                    continue;
                }

                // Check if they key is ready to accept a new socket connection
                if (key.isAcceptable()) {
                    keyAccept(key);
                    System.out.println("Key accepted");
                } else if (key.isReadable()){
                    System.out.println("Reading data from server");
                    keyRead(key);
                } else if (key.isWritable()){
                    System.out.println("Writting data from server");
                    keyWrite(key); //unused at the moment
                }

            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}


private void keyRead(SelectionKey key) throws IOException {
    // Create a SocketChannel to read the request
    SocketChannel socketChannel = (SocketChannel) key.channel();

    // Clear out our read buffer so it's ready for new data
    buffer.clear();

    // Attempt to read off the channel
    int numRead;
    try {
        numRead = socketChannel.read(buffer);
    } catch (IOException e) {
        System.out.println("Closing socket");
        // The remote forcibly closed the connection, cancel
        // the selection key and close the channel.
        key.cancel();
        socketChannel.close();
        return;
    }

    if (numRead == -1) {
        System.out.println("Shutting down socket");
        // Remote entity shut the socket down cleanly. Do the
        // same from our end and cancel the channel.
        key.channel().close();
        key.cancel();
        return;
    }

    System.out.println("I read: " + new String(buffer.array()).trim());
}

private void keyAccept(SelectionKey key) throws IOException {
    // For an accept to be pending the channel must be a server socket channel.
    ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();

    // Accept the connection and make it non-blocking
    SocketChannel socketChannel = serverSocketChannel.accept();
    //Socket socket = socketChannel.socket();
    socketChannel.configureBlocking(false);

    // Register the new SocketChannel with our Selector, indicating
    // we'd like to be notified when there's data waiting to be read
    socketChannel.register(selector, SelectionKey.OP_READ);
}

private void setServer() throws IOException {
    // Create a new selector
    selector = Selector.open();

    // Create a new non-blocking server socket channel
    serverChannel = ServerSocketChannel.open();
    serverChannel.configureBlocking(false);

    // Bind the server socket to the specified address and port
    serverChannel.bind(new InetSocketAddress("localhost", port));

    // Register the server socket channel, indicating an interest in
    // accepting new connections
    serverChannel.register(selector, SelectionKey.OP_ACCEPT);
}

Клиент (упрощенно для чтения) работает следующим образом:

public NIOClient(int clock, int firstPort, int secondPort, int id, String process) {
    this.process = process;
    this.clock = clock;
    this.id = id;

    try {
        System.out.println("\tOpening sockets to port " + firstPort + " and port " + secondPort);
        firstClient = SocketChannel.open(new InetSocketAddress("localhost", firstPort));
        secondClient = SocketChannel.open(new InetSocketAddress("localhost", secondPort));
        firstBuffer = ByteBuffer.allocate(1024);
        secondBuffer = ByteBuffer.allocate(1024);
        sendRequests();
    } catch (IOException e) {
        e.printStackTrace();
    }
}

private void sendRequests() {
    LamportRequest lamportRequest = new LamportRequest(clock, process, id);
    firstBuffer = ByteBuffer.wrap(lamportRequest.toString().getBytes());
    secondBuffer = ByteBuffer.wrap(lamportRequest.toString().getBytes());
    String converted = new String(firstBuffer.array(), StandardCharsets.UTF_8);
    System.out.println("\tSending lamport request: " + converted);
    try {
        firstClient.write(firstBuffer);
        secondClient.write(secondBuffer);
        firstBuffer.clear();
}

Инициализируется следующим образом:

System.out.println("Setting up server with port: " + myPort);
TalkToBrotherSocket talkToBrotherSocket = new TalkToBrotherSocket(clock, myPort);
talkToBrotherSocket.start();

new NIOClient(clock, firstPort, secondPort, id, process);

1 Ответ

0 голосов
/ 23 апреля 2020

Следуя комментариям @ user207421, я добавил новый второй буфер. Изменения в методе sendRequests были отредактированы в исходном сообщении.

...