Java-неблокирующий селектор ввода-вывода вызывает блокировку регистра канала - PullRequest
4 голосов
/ 06 июля 2010

У меня есть два потока, которые я имею дело с Java NIO для неблокирующих сокетов. Вот что делают потоки:

Тема 1: Цикл, который вызывает метод select () селектора. Если какие-либо ключи доступны, они обрабатываются соответствующим образом.

Тема 2: Иногда регистрирует SocketChannel для селектора, вызывая register ().

Проблема в том, что, если время ожидания для select () не очень мало (около 100 мс), вызов register () будет блокироваться бесконечно. Несмотря на то, что канал настроен как неблокирующий, а в javadocs говорится, что объект Selector является поточно-ориентированным (но я знаю, что его ключи выбора не являются).

Значит, у кого-нибудь есть идеи по поводу этой проблемы? Приложение работает отлично, если я положу все в один поток. Тогда проблем не возникает, но я бы очень хотел иметь отдельные темы. Любая помощь приветствуется. Я разместил мой пример кода ниже:

Измените выбор (1000), чтобы выбрать (100), и он будет работать. Оставьте его как select () или select (1000), и он не будет.


import java.io.IOException;
import java.net.DatagramSocket;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;</p>

<p>public class UDPSocket 
{
 private DatagramChannel clientChannel;
 private String dstHost;
 private int dstPort;
 private static Selector recvSelector;
 private static volatile boolean initialized;
 private static ExecutorService eventQueue = Executors.newSingleThreadExecutor();</p>

<p>public static void init()
 {
  initialized = true;</p>

<p>try 
  {
   recvSelector = Selector.open();
  } 
  catch (IOException e) 
  {
   System.err.println(e);
  }</p>

<p>Thread t = new Thread(new Runnable()
  {
   @Override
   public void run() 
   {
    while(initialized)
    {
     readData();
     Thread.yield();
    }
   }<br>
  });
  t.start();
 }</p>

<p>public static void shutdown()
 {
  initialized = false;
 }</p>

<p>private static void readData()
 {
  try
  {
   int numKeys = recvSelector.select(1000);</p>

<p>if (numKeys > 0)
   {
    Iterator i = recvSelector.selectedKeys().iterator();</p>

<pre><code>while(i.hasNext())
{
 SelectionKey key = i.next();
 i.remove();

 if (key.isValid() && key.isReadable())
 {
  DatagramChannel channel = (DatagramChannel) key.channel();

  // allocate every time we receive so that it's a copy that won't get erased
  final ByteBuffer buffer = ByteBuffer.allocate(Short.MAX_VALUE);
  channel.receive(buffer);
  buffer.flip();
  final SocketSubscriber subscriber = (SocketSubscriber) key.attachment();

  // let user handle event on a dedicated thread
  eventQueue.execute(new Runnable()
  {
   @Override
   public void run() 
   {
    subscriber.onData(buffer);
   }       
  });
 }
}

} } поймать (IOException e) { System.err.println (е); }
}

публичный UDPSocket (String dstHost, int dstPort) { пытаться { this.dstHost = dstHost; this.dstPort = dstPort; clientChannel = DatagramChannel.open (); clientChannel.configureBlocking (ложь); } поймать (IOException e) { System.err.println (е); } }

public void addListener (подписчик SocketSubscriber) { пытаться { DatagramChannel serverChannel = DatagramChannel.open (); serverChannel.configureBlocking (ложь); DatagramSocket socket = serverChannel.socket (); socket.bind (новый InetSocketAddress (dstPort)); SelectionKey key = serverChannel.register (recvSelector, SelectionKey.OP_READ); key.attach (абонент); } поймать (IOException e) { System.err.println (е); } }

public void send (буфер ByteBuffer) { пытаться { clientChannel.send (буфер, новый InetSocketAddress (dstHost, dstPort)); } поймать (IOException e) { System.err.println (е); } }

public void close () { пытаться { clientChannel.close (); } поймать (IOException e) { System.err.println (е); } } }

<code>
import java.nio.ByteBuffer;</p>

<p>public interface SocketSubscriber 
{
 public void onData(ByteBuffer data);
}

Пример использования:

<code>
public class Test implements SocketSubscriber
{
 public static void main(String[] args) throws Exception
 {
  UDPSocket.init();
  UDPSocket test = new UDPSocket("localhost", 1234);
  test.addListener(new Test());
  UDPSocket test2 = new UDPSocket("localhost", 4321);
  test2.addListener(new Test());
  System.out.println("Listening...");
  ByteBuffer buffer = ByteBuffer.allocate(500);
  test.send(buffer);
  buffer.rewind();
  test2.send(buffer);
  System.out.println("Data sent...");
  Thread.sleep(5000);
  UDPSocket.shutdown();
 }</p>

<p>@Override
 public void onData(ByteBuffer data) 
 {
  System.out.println("Received " + data.limit() + " bytes of data.");
 }
}

Ответы [ 2 ]

3 голосов
/ 22 июля 2015

Я столкнулся с той же проблемой сегодня (то есть "wakeupAndRegister" недоступен). Я надеюсь, что мое решение может быть полезным:

Создать объект синхронизации:

Object registeringSync = new Object();

Зарегистрируйте канал, выполнив:

synchronized (registeringSync) {
  selector.wakeup();  // Wakes up a CURRENT or (important) NEXT select
  // !!! Might run into a deadlock "between" these lines if not using the lock !!!
  // To force it, insert Thread.sleep(1000); here
  channel.register(selector, ...);
}

Тема должна сделать следующее:

public void run() {    
  while (initialized) {
    if (selector.select() != 0) {  // Blocks until "wakeup"
      // Iterate through selected keys
    }
    synchronized (registeringSync) { }  // Cannot continue until "register" is complete
  }
}
3 голосов
/ 07 июля 2010

Селектор имеет несколько задокументированных уровней внутренней синхронизации, и вы сталкиваетесь с ними всеми.Вызовите wakeup() на селекторе, прежде чем вызывать register().. Убедитесь, что цикл select() работает правильно, если нет выбранных клавиш, что и происходит на wakeup().

...