Сеть Java: вечерний сокет / InputStream - PullRequest
6 голосов
/ 11 мая 2011

Я реализую ориентированный на события слой поверх сокетов Java, и мне было интересно, есть ли способ определить, есть ли данные, ожидающие чтения.

Мой обычный подход состоит в том, чтобы читать из сокета в буфер и вызывать предоставленные обратные вызовы, когда буфер заполнен заданным количеством байтов (которое может быть 0, если обратный вызов должен запускаться каждый раз, когда что-либо поступает). ), но я подозреваю, что Java уже выполняет буферизацию для меня.

Надежен ли для этого метод available() InputStream? Должен ли я просто read() и сделать свою собственную буферизацию поверх сокета? Или есть другой способ?

Ответы [ 2 ]

8 голосов
/ 12 мая 2011

Коротко говоря, нет.available() не надежно (по крайней мере, не для меня).Я рекомендую использовать java.nio.channels.SocketChannel, связанный с Selector и SelectionKey.Это решение несколько основано на событиях, но сложнее, чем просто сокеты.

Для клиентов:

  1. Построить канал сокета (socket), открыть селектор (selector = Selector.open();).
  2. Использовать неблокирующую socket.configureBlocking(false);
  3. Селектор регистра для подключений socket.register(selector, SelectionKey.OP_CONNECT);
  4. Соединение socket.connect(new InetSocketAddress(host, port));
  5. Проверьте, есть ли что-то новое selector.select();
  6. Если «новое» относится к успешному соединению, зарегистрируйте селектор для OP_READ;если «новый» относится к доступным данным, просто прочитайте из сокета.

Однако для того, чтобы он был асинхронным, вам необходимо создать отдельный поток (несмотря на то, что сокет создается как не-блокирована, поток будет заблокирован в любом случае), который проверяет, прибыло ли что-то или нет.

Для серверов есть ServerSocketChannel, и вы используете OP_ACCEPT для него.

Для справки,это мой код (клиент), должен дать вам подсказку:

 private Thread readingThread = new ListeningThread();

 /**
  * Listening thread - reads messages in a separate thread so the application does not get blocked.
  */
 private class ListeningThread extends Thread {
  public void run() {
   running = true;
   try {
    while(!close) listen();
    messenger.close();
   }
   catch(ConnectException ce) {
    doNotifyConnectionFailed(ce);
   }
   catch(Exception e) {
//    e.printStackTrace();
    messenger.close();
   }
   running = false;
  }
 }

 /**
  * Connects to host and port.
  * @param host Host to connect to.
  * @param port Port of the host machine to connect to.
  */
 public void connect(String host, int port) {
  try {
   SocketChannel socket = SocketChannel.open();
   socket.configureBlocking(false);
   socket.register(this.selector, SelectionKey.OP_CONNECT);
   socket.connect(new InetSocketAddress(host, port));
  }
  catch(IOException e) {
   this.doNotifyConnectionFailed(e);
  }
 }

 /**
  * Waits for an event to happen, processes it and then returns.
  * @throws IOException when something goes wrong.
  */
 protected void listen() throws IOException {
  // see if there are any new things going on
  this.selector.select();
  // process events
  Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
  while(iter.hasNext()) {
   SelectionKey key = iter.next();
   iter.remove();
   // check validity
   if(key.isValid()) {
    // if connectable...
    if(key.isConnectable()) {
     // ...establish connection, make messenger, and notify everyone
     SocketChannel client = (SocketChannel)key.channel();
     // now this is tricky, registering for OP_READ earlier causes the selector not to wait for incoming bytes, which results in 100% cpu usage very, very fast
     if(client!=null && client.finishConnect()) {
      client.register(this.selector, SelectionKey.OP_READ);
     }
    }
    // if readable, tell messenger to read bytes
    else if(key.isReadable() && (SocketChannel)key.channel()==this.messenger.getSocket()) {
     // read message here
    }
   }
  }
 }

 /**
  * Starts the client.
  */
 public void start() {
  // start a reading thread
  if(!this.running) {
   this.readingThread = new ListeningThread();
   this.readingThread.start();
  }
 }

 /**
  * Tells the client to close at nearest possible moment.
  */
 public void close() {
  this.close = true;
 }

А для сервера:

 /**
  * Constructs a server.
  * @param port Port to listen to.
  * @param protocol Protocol of messages.
  * @throws IOException when something goes wrong.
  */
 public ChannelMessageServer(int port) throws IOException {
  this.server = ServerSocketChannel.open();
  this.server.configureBlocking(false);
  this.server.socket().bind(new InetSocketAddress(port));
  this.server.register(this.selector, SelectionKey.OP_ACCEPT);
 }

 /**
  * Waits for event, then exits.
  * @throws IOException when something goes wrong.
  */
 protected void listen() throws IOException {
  // see if there are any new things going on
  this.selector.select();
  // process events
  Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
  while(iter.hasNext()) {
   SelectionKey key = iter.next();
   // do something with the connected socket
   iter.remove();
   if(key.isValid()) this.process(key);
  }
 }

 /**
  * Processes a selection key.
  * @param key SelectionKey.
  * @throws IOException when something is wrong.
  */
 protected void process(SelectionKey key) throws IOException {
  // if incoming connection
  if(key.isAcceptable()) {
   // get client
   SocketChannel client = (((ServerSocketChannel)key.channel()).accept());
    try {
     client.configureBlocking(false);
     client.register(this.selector, SelectionKey.OP_READ);
    }
    catch(Exception e) {
     // catch
    }
  }
  // if readable, tell messenger to read
  else if(key.isReadable()) {
  // read
  }
 }

Надеюсь, это поможет.

0 голосов
/ 11 мая 2011

available () сообщит вам только, можете ли вы читать данные, не заходя в ОС.Здесь это не очень полезно.

Вы можете делать блокирующее или неблокирующее чтение по своему усмотрению.Неблокирующее чтение просто возвращается, когда нет данных для чтения, так что это может быть тем, что вы хотите.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...