Мне пришлось подключить несколько IP-адресов серверов: ПОРТЫ и сделать обмен запросами-ответами. После реализации традиционного ввода-вывода с несколькими потоками и сторожевым таймером блокированные сокеты сдавались. Я реализовал NIO, и это мое тестовое приложение для дальнейшего использования.
Я могу открыть N соединений с тайм-аутом, прочитать ответ с тайм-аутом, написать команду с тайм-аутом все в простом однопоточном «игровом цикле». Если бы мне был нужен параллелизм, я мог бы создавать рабочие потоки, но это не обязательно, если логике приложения это не нужно.
Сервер - это пользовательское приложение telnet, клиенты пишут команды и читают текстовые строки, пока не будет найден запрос строки терминатора. Терминатор отмечает end_of_response_packet.
import java.util.*;
import java.io.*;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
public class ClientSocketNIO {
private String host;
private int port;
private String charset;
private ByteArrayOutputStream inBuffer;
private ByteBuffer buf;
private Selector selector;
private SocketChannel channel;
public ClientSocketNIO(String host, int port, String charset) {
this.charset = charset==null || charset.equals("") ? "UTF-8" : charset;
this.host = host;
this.port = port;
}
public void open(long timeout) throws IOException {
selector = Selector.open();
channel = SocketChannel.open();
channel.configureBlocking(false);
channel.register(selector, SelectionKey.OP_CONNECT);
channel.connect(new InetSocketAddress(host, port));
inBuffer = new ByteArrayOutputStream(1024);
buf = ByteBuffer.allocate(1*1024);
long sleep = Math.min(timeout, 1000);
while(timeout > 0) {
if (selector.select(sleep) < 1) {
timeout-=sleep;
continue;
}
Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
while(keys.hasNext()) {
SelectionKey key = keys.next();
keys.remove();
if (!key.isValid() || !key.isConnectable()) continue;
SocketChannel channel = (SocketChannel)key.channel();
if (channel.isConnectionPending()) {
channel.finishConnect();
channel.configureBlocking(false);
return; // we are ready to receive bytes
}
}
}
throw new IOException("Connection timed out");
}
public void close() {
try { channel.close(); } catch(Exception ex) { }
try { selector.close(); } catch(Exception ex) { }
inBuffer=null;
buf=null;
}
public List<String> readUntil(String terminator, long timeout, boolean trimLines) throws IOException {
return readUntil(new String[]{terminator}, timeout, trimLines);
}
public List<String> readUntil(String[] terminators, long timeout, boolean trimLines) throws IOException {
List<String> lines = new ArrayList<String>(12);
inBuffer.reset();
// End of packet terminator strings, line startsWith "aabbcc" string.
byte[][] arrTerminators = new byte[terminators.length][];
int[] idxTerminators = new int[terminators.length];
for(int idx=0; idx < terminators.length; idx++) {
arrTerminators[idx] = terminators[idx].getBytes(charset);
idxTerminators[idx] = 0;
}
int idxLineByte=-1;
channel.register(selector, SelectionKey.OP_READ);
long sleep = Math.min(timeout, 1000);
while(timeout>0) {
if (selector.select(sleep) < 1) {
timeout-=sleep;
continue;
}
Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
while(keys.hasNext()) {
SelectionKey key = keys.next();
keys.remove();
if (!key.isValid() || !key.isReadable()) continue;
SocketChannel channel = (SocketChannel)key.channel();
buf.clear();
int len = channel.read(buf);
System.out.println("read " + len);
if (len == -1) throw new IOException("Socket disconnected");
buf.flip();
for(int idx=0; idx<len; idx++) {
byte cb = buf.get(idx);
if (cb!='\n') {
idxLineByte++;
inBuffer.write(cb);
for(int idxter=0; idxter < arrTerminators.length; idxter++) {
byte[] arrTerminator = arrTerminators[idxter];
if (idxLineByte==idxTerminators[idxter]
&& arrTerminator[ idxTerminators[idxter] ]==cb) {
idxTerminators[idxter]++;
if (idxTerminators[idxter]==arrTerminator.length)
return lines;
} else idxTerminators[idxter]=0;
}
} else {
String line = inBuffer.toString(charset);
lines.add(trimLines ? line.trim() : line);
inBuffer.reset();
idxLineByte=-1;
for(int idxter=0; idxter<arrTerminators.length; idxter++)
idxTerminators[idxter]=0;
}
}
}
}
throw new IOException("Read timed out");
}
public void write(String data, long timeout) throws IOException {
ByteBuffer outBuffer = ByteBuffer.wrap(data.getBytes(charset));
channel.register(selector, SelectionKey.OP_WRITE);
long sleep = Math.min(timeout, 1000);
while(timeout > 0) {
if (selector.select(sleep) < 1) {
timeout-=sleep;
continue;
}
Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
while(keys.hasNext()) {
SelectionKey key = keys.next();
keys.remove();
if (!key.isValid() || !key.isWritable()) continue;
SocketChannel channel = (SocketChannel)key.channel();
int len = channel.write(outBuffer);
System.out.println("write " + len);
if (outBuffer.remaining()<1)
return;
}
}
throw new IOException("Write timed out");
}
public static void main(String[] args) throws Exception {
ClientSocketNIO client = new ClientSocketNIO("11.22.33.44", 1234, "UTF-8");
try {
client.open(15000);
// read prompting for username
List<String> reply = client.readUntil("User: ", 15000, true);
for(int idx=0; idx<reply.size(); idx++)
System.out.println("|"+reply.get(idx)+"|");
// write username and read a success or failed prompt(asks username once again),
// this one may return two different terminator prompts so listen for both
client.write("myloginname\n", 15000);
reply = client.readUntil(new String[]{"> ", "User: "}, 15000, true);
for(int idx=0; idx<reply.size(); idx++)
System.out.println("|"+reply.get(idx)+"|");
if (!reply.get(reply.size()-1).startsWith("Welcome ")) return; // Access denied
System.out.println("-----");
client.write("help\n", 15000);
reply = client.readUntil("> ", 15000, true);
for(int idx=0; idx<reply.size(); idx++)
System.out.println("|"+reply.get(idx)+"|");
System.out.println("-----");
client.write("get status\n", 15000);
reply = client.readUntil("> ", 15000, true);
for(int idx=0; idx<reply.size(); idx++)
System.out.println("|"+reply.get(idx)+"|");
System.out.println("-----");
client.write("get list\n", 15000);
reply = client.readUntil("> ", 15000, true);
for(int idx=0; idx<reply.size(); idx++)
System.out.println("|"+reply.get(idx)+"|");
client.write("quit\n", 15000);
} finally {
client.close();
}
}
}