У меня есть три процесса, которые называются LWA1, LWA2 и LWA3. У каждого есть сервер с портом 55555 для LWA1, 55556 для LWA2 и 55557 для LWA3. Кроме того, у каждого процесса есть клиент для подключения к другим процессам.
Каждый процесс должен иметь возможность записи и чтения для других процессов. Итак:
- LWA1 должен записывать и читать в / из LWA2, а LWA3
- LWA2 должен записывать и читать в / из LWA1, а LWA3
- LWA3 должен записывать и читать в / из LWA1 и LWA2
В настоящий момент каждый процесс выполняет две записи, но получает только одно сообщение. Выходные данные для каждого процесса выглядят следующим образом (отпечатки с вкладками принадлежат клиенту, а неотмеченные - серверу).
LWA1:
Setting up server with port: 55555
Server configured.
Opening sockets to port 55556 and port 55557
Sending lamport request: LamportRequest{clock=0, process='LWA1', id=1}
Key accepted
Reading data from server
I read: LamportRequest{clock=0, process='LWA3', id=3}
Key accepted
LWA2:
Setting up server with port: 55556
Server configured.
Opening sockets to port 55557 and port 55555
Key accepted
Reading data from server
I read: LamportRequest{clock=0, process='LWA1', id=1}
Key accepted
Sending lamport request: LamportRequest{clock=0, process='LWA2', id=2}
LWA3:
Setting up server with port: 55557
Server configured.
Opening sockets to port 55555 and port 55556
Key accepted
Key accepted
Sending lamport request: LamportRequest{clock=0, process='LWA3', id=3}
Reading data from server
I read: LamportRequest{clock=0, process='LWA2', id=2}
Как вы можете видеть, каждый клиент записывает два других запроса LamportRequest, но два других получают только одно сообщение. Почему другое сообщение не проходит?
Я подозреваю, что это может быть что-то, связанное с ключами на сервере, но не знаю, что может быть. Кроме того, я не до конца их понимаю. Поправьте меня, если я ошибаюсь:
Каждое соединение с селектором представлено другим (SelectableChannel) ключом, поэтому итератор на сервере LWA1 (например) должен иметь только (и, следовательно, только слушать события) две клавиши, одна для LWA2 и другая для LWA3, верно? Я попытался присоединить целое число к каждому ключу в методе keyAccept, чтобы различать их guish, что работало нормально, но при печати присоединенного целого числа в методе keyRead показывало ноль. Ключ в этом методе новый? Третий ключ только что появился из ниоткуда?
Дополнительный вопрос: я должен реализовать эту структуру в одном потоке. В настоящее время я использую два, один для сервера и один для клиента. Как только это заработает, какие-нибудь советы, как их объединить?
------------------ КОД -----------------
Сервер (упрощенно для чтения) выглядит следующим образом:
public TalkToBrotherSocket(int clock, int port) {
this.port = port;
this.clock = clock;
try {
setServer();
System.out.println("Server configured.\n");
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void run() {
while (true) {
try {
// Wait for an event one of the registered channels
selector.select();
// Iterate over the set of keys for which events are available
Iterator selectedKeys = selector.selectedKeys().iterator();
while (selectedKeys.hasNext()) {
SelectionKey key = (SelectionKey) selectedKeys.next();
selectedKeys.remove();
if (!key.isValid()) {
continue;
}
// Check if they key is ready to accept a new socket connection
if (key.isAcceptable()) {
keyAccept(key);
System.out.println("Key accepted");
} else if (key.isReadable()){
System.out.println("Reading data from server");
keyRead(key);
} else if (key.isWritable()){
System.out.println("Writting data from server");
keyWrite(key); //unused at the moment
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
private void keyRead(SelectionKey key) throws IOException {
// Create a SocketChannel to read the request
SocketChannel socketChannel = (SocketChannel) key.channel();
// Clear out our read buffer so it's ready for new data
buffer.clear();
// Attempt to read off the channel
int numRead;
try {
numRead = socketChannel.read(buffer);
} catch (IOException e) {
System.out.println("Closing socket");
// The remote forcibly closed the connection, cancel
// the selection key and close the channel.
key.cancel();
socketChannel.close();
return;
}
if (numRead == -1) {
System.out.println("Shutting down socket");
// Remote entity shut the socket down cleanly. Do the
// same from our end and cancel the channel.
key.channel().close();
key.cancel();
return;
}
System.out.println("I read: " + new String(buffer.array()).trim());
}
private void keyAccept(SelectionKey key) throws IOException {
// For an accept to be pending the channel must be a server socket channel.
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
// Accept the connection and make it non-blocking
SocketChannel socketChannel = serverSocketChannel.accept();
//Socket socket = socketChannel.socket();
socketChannel.configureBlocking(false);
// Register the new SocketChannel with our Selector, indicating
// we'd like to be notified when there's data waiting to be read
socketChannel.register(selector, SelectionKey.OP_READ);
}
private void setServer() throws IOException {
// Create a new selector
selector = Selector.open();
// Create a new non-blocking server socket channel
serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);
// Bind the server socket to the specified address and port
serverChannel.bind(new InetSocketAddress("localhost", port));
// Register the server socket channel, indicating an interest in
// accepting new connections
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
}
Клиент (упрощенно для чтения) работает следующим образом:
public NIOClient(int clock, int firstPort, int secondPort, int id, String process) {
this.process = process;
this.clock = clock;
this.id = id;
try {
System.out.println("\tOpening sockets to port " + firstPort + " and port " + secondPort);
firstClient = SocketChannel.open(new InetSocketAddress("localhost", firstPort));
secondClient = SocketChannel.open(new InetSocketAddress("localhost", secondPort));
firstBuffer = ByteBuffer.allocate(1024);
secondBuffer = ByteBuffer.allocate(1024);
sendRequests();
} catch (IOException e) {
e.printStackTrace();
}
}
private void sendRequests() {
LamportRequest lamportRequest = new LamportRequest(clock, process, id);
firstBuffer = ByteBuffer.wrap(lamportRequest.toString().getBytes());
secondBuffer = ByteBuffer.wrap(lamportRequest.toString().getBytes());
String converted = new String(firstBuffer.array(), StandardCharsets.UTF_8);
System.out.println("\tSending lamport request: " + converted);
try {
firstClient.write(firstBuffer);
secondClient.write(secondBuffer);
firstBuffer.clear();
}
Инициализируется следующим образом:
System.out.println("Setting up server with port: " + myPort);
TalkToBrotherSocket talkToBrotherSocket = new TalkToBrotherSocket(clock, myPort);
talkToBrotherSocket.start();
new NIOClient(clock, firstPort, secondPort, id, process);