В настоящее время у меня есть несколько серверов, подключенных друг к другу (в следующем коде они называются Сервером), и я хотел бы отправить сообщение каждому из подключенных серверов, когда Клиент отправляет запрос на указанный сервер c. .
Часть, в которой клиент отправил сообщение на сервер, выполнена и работает. Однако, когда я зацикливаю список подключенных «серверов», чтобы отправить им необходимое сообщение, само сообщение не отправляется.
Если кто-то может сообщить мне, что я делаю неправильно, или дать мне подсказку, где я должен изменится, это было бы очень признательно, так как я действительно застрял на данный момент.
Вот где я сейчас нахожусь:
Основной метод сервера:
// Server Details contain the Server Name, the Server IP and Server Port
serverDetails.forEach((serverDetail) -> {
try {
ByteBuffer buffer = ByteBuffer.wrap("Test Message".getBytes());
String serverAddress = serverDetail.getNodeAddress();
serverNameAvail = serverDetail.getServerName();
int serverPort = serverDetail.getServerPort();
// Tries to connect to other Server
InetSocketAddress hostAddress = new InetSocketAddress(serverAddress, serverPort);
SocketChannel client = SocketChannel.open(hostAddress);
// send message to other server if successfully connected
client.write(buffer);
buffer.clear();
} catch (IOException ex) {
System.err.println(ex);
}
});
ServerSocket serverSocket = new ServerSocket(12111, "127.0.0.1", "Server");
serverSocket.run();
ServerSocket Class
publi c класс ServerSocket реализует Runnable {
private final InetAddress hostAddress;
private final int port;
private final String serverName;
private ServerSocketChannel serverSocketChannel;
private final Selector selector;
private final boolean runServer = true;
private ConcurrentHashMap<SocketChannel, Long> servers = new ConcurrentHashMap<>();
public ServerSocket(InetAddress hostAddress, int port, String serverName) throws IOException {
this.hostAddress = hostAddress;
this.port = port;
this.serverName = serverName;
this.selector = initSelector();
}
private Selector initSelector() throws IOException {
Selector socketSelector = SelectorProvider.provider().openSelector();
this.serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
InetSocketAddress isa = new InetSocketAddress(this.hostAddress, this.port);
serverSocketChannel.socket().setReuseAddress(true);
serverSocketChannel.socket().bind(isa);
serverSocketChannel.register(socketSelector, SelectionKey.OP_ACCEPT);
return socketSelector;
}
@Override
public void run() {
while (runServer) {
try {
this.selector.select();
Iterator<SelectionKey> selectedKeys = this.selector.selectedKeys().iterator();
while (selectedKeys.hasNext()) {
SelectionKey key = (SelectionKey) selectedKeys.next();
selectedKeys.remove();
try {
if (!key.isValid()) {
continue;
}
if (key.isAcceptable()) {
this.accept(key);
} else if (key.isReadable()) {
this.read(key);
}
} catch (CancelledKeyException e) {// key has been canceled
}
}
} catch (IOException e) {
System.err.println(e);
try {
serverSocketChannel.close();
selector.close();
} catch (IOException e1) {
System.err.println(e1);
}
break;
}
}
System.out.println("[Server thread closed normally]");
/* Clean up the resources */
this.release();
}
private void accept(SelectionKey key) {
ServerSocketChannel serverSocketKeyChannel = (ServerSocketChannel) key.channel();
try {
SocketChannel socketChannel = serverSocketKeyChannel.accept();
if (socketChannel != null) {
/* Set the KeepAlive flag to avoid continuous open of files */
socketChannel.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
socketChannel.configureBlocking(false);
/* Register the client connected with our interested Option Read */
socketChannel.register(this.selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
servers.put(socketChannel, System.currentTimeMillis());
System.out.println("New Server connected from " + socketChannel.getRemoteAddress());
System.out.println("Total connected : " + servers.size());
} else {
key.cancel();
}
} catch (IOException e) {
key.cancel();
System.err.println(e);
}
}
private void read(SelectionKey key) {
SocketChannel socketChannel = (SocketChannel) key.channel();
synchronized (socketChannel) {
if (socketChannel.isOpen()) {
try {
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
readBuffer.clear();
int numRead;
try {
/* ".read" is nonblocking */
numRead = socketChannel.read(readBuffer);
} catch (IOException e) {
System.err.println("[run] Connection abruptly terminated from client" + e);
key.channel().close();
servers.remove(socketChannel);
return;
}
if (numRead == -1) {// socket closed cleanly
key.channel().close();
servers.remove(socketChannel);
return;
}
String dataReceivedPacked = new String(readBuffer.array(), Charset.forName("ASCII"));
System.out.println(dataReceivedPacked);
if (dataReceivedPacked.equals("User")) {
System.out.println("User entered Here");
Iterator<Map.Entry<SocketChannel, Long>> iter = servers.entrySet().iterator();
while (iter.hasNext()) {
try {
Map.Entry<SocketChannel, Long> entry = iter.next();
// Message Not being sent
SocketChannel server = entry.getKey();
System.out.println("client = " + server.isOpen()); // True
System.out.println("client connected = " + server.isConnected()); // True
ByteBuffer buffer = ByteBuffer.allocate(1024);
buffer = ByteBuffer.wrap("Server".getBytes());
server.write(buffer);
} catch (Exception e) {
System.err.println(e);
}
}
} else if (dataReceivedPacked.equals("Server")) {
// Never gets here
System.out.println("Received From Server");
}
} catch (IOException e) {
System.err.println("[run] " + e);
return;
}
} else {
// socketChannel is closed
try {
key.channel().close();// Sanitary close operation
servers.remove(key);
return;
} catch (IOException e) {
}
}
}
}
}
Код пользователя (часть)
//<editor-fold defaultstate="collapsed" desc="Connect Wallet To Node">
hostAddress = new InetSocketAddress("127.0.0.1", 12111);
client = SocketChannel.open(hostAddress);
//</editor-fold>
buffer = ByteBuffer.wrap("User".getBytes());
client.write(buffer);