У меня есть два потока, которые я имею дело с Java NIO для неблокирующих сокетов. Вот что делают потоки:
Тема 1:
Цикл, который вызывает метод select () селектора. Если какие-либо ключи доступны, они обрабатываются соответствующим образом.
Тема 2:
Иногда регистрирует SocketChannel для селектора, вызывая register ().
Проблема в том, что, если время ожидания для select () не очень мало (около 100 мс), вызов register () будет блокироваться бесконечно. Несмотря на то, что канал настроен как неблокирующий, а в javadocs говорится, что объект Selector является поточно-ориентированным (но я знаю, что его ключи выбора не являются).
Значит, у кого-нибудь есть идеи по поводу этой проблемы? Приложение работает отлично, если я положу все в один поток. Тогда проблем не возникает, но я бы очень хотел иметь отдельные темы. Любая помощь приветствуется. Я разместил мой пример кода ниже:
Измените выбор (1000), чтобы выбрать (100), и он будет работать. Оставьте его как select () или select (1000), и он не будет.
import java.io.IOException;
import java.net.DatagramSocket;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;</p>
<p>public class UDPSocket
{
private DatagramChannel clientChannel;
private String dstHost;
private int dstPort;
private static Selector recvSelector;
private static volatile boolean initialized;
private static ExecutorService eventQueue = Executors.newSingleThreadExecutor();</p>
<p>public static void init()
{
initialized = true;</p>
<p>try
{
recvSelector = Selector.open();
}
catch (IOException e)
{
System.err.println(e);
}</p>
<p>Thread t = new Thread(new Runnable()
{
@Override
public void run()
{
while(initialized)
{
readData();
Thread.yield();
}
}<br>
});
t.start();
}</p>
<p>public static void shutdown()
{
initialized = false;
}</p>
<p>private static void readData()
{
try
{
int numKeys = recvSelector.select(1000);</p>
<p>if (numKeys > 0)
{
Iterator i = recvSelector.selectedKeys().iterator();</p>
<pre><code>while(i.hasNext())
{
SelectionKey key = i.next();
i.remove();
if (key.isValid() && key.isReadable())
{
DatagramChannel channel = (DatagramChannel) key.channel();
// allocate every time we receive so that it's a copy that won't get erased
final ByteBuffer buffer = ByteBuffer.allocate(Short.MAX_VALUE);
channel.receive(buffer);
buffer.flip();
final SocketSubscriber subscriber = (SocketSubscriber) key.attachment();
// let user handle event on a dedicated thread
eventQueue.execute(new Runnable()
{
@Override
public void run()
{
subscriber.onData(buffer);
}
});
}
}
}
}
поймать (IOException e)
{
System.err.println (е);
}
}
публичный UDPSocket (String dstHost, int dstPort)
{
пытаться
{
this.dstHost = dstHost;
this.dstPort = dstPort;
clientChannel = DatagramChannel.open ();
clientChannel.configureBlocking (ложь);
}
поймать (IOException e)
{
System.err.println (е);
}
}
public void addListener (подписчик SocketSubscriber)
{
пытаться
{
DatagramChannel serverChannel = DatagramChannel.open ();
serverChannel.configureBlocking (ложь);
DatagramSocket socket = serverChannel.socket ();
socket.bind (новый InetSocketAddress (dstPort));
SelectionKey key = serverChannel.register (recvSelector, SelectionKey.OP_READ);
key.attach (абонент);
}
поймать (IOException e)
{
System.err.println (е);
}
}
public void send (буфер ByteBuffer)
{
пытаться
{
clientChannel.send (буфер, новый InetSocketAddress (dstHost, dstPort));
}
поймать (IOException e)
{
System.err.println (е);
}
}
public void close ()
{
пытаться
{
clientChannel.close ();
}
поймать (IOException e)
{
System.err.println (е);
}
}
}
<code>
import java.nio.ByteBuffer;</p>
<p>public interface SocketSubscriber
{
public void onData(ByteBuffer data);
}
Пример использования:
<code>
public class Test implements SocketSubscriber
{
public static void main(String[] args) throws Exception
{
UDPSocket.init();
UDPSocket test = new UDPSocket("localhost", 1234);
test.addListener(new Test());
UDPSocket test2 = new UDPSocket("localhost", 4321);
test2.addListener(new Test());
System.out.println("Listening...");
ByteBuffer buffer = ByteBuffer.allocate(500);
test.send(buffer);
buffer.rewind();
test2.send(buffer);
System.out.println("Data sent...");
Thread.sleep(5000);
UDPSocket.shutdown();
}</p>
<p>@Override
public void onData(ByteBuffer data)
{
System.out.println("Received " + data.limit() + " bytes of data.");
}
}