Java-неблокирующий селектор ввода-вывода вызывает блокировку регистра канала
У меня есть два потока, которые я имею дело с Java NIO для неблокирующих сокетов. Вот что делают потоки:
Поток 1: цикл, который вызывает метод select () селектора. Если какие-либо ключи доступны, они обрабатываются соответствующим образом.
Поток 2: Иногда регистрирует SocketChannel в селекторе, вызывая register ().
Проблема в том, что, если время ожидания для select () не очень мало (например, около 100 мс), вызов register () будет блокироваться бесконечно. Несмотря на то, что канал настроен как неблокирующий, а в javadocs говорится, что объект Selector является поточно-ориентированным (но я знаю, что его клавиши выбора не безопасны).
Таким образом, у кого-нибудь есть какие-либо идеи о том, что может быть проблема? Приложение работает отлично, если я положу все в один поток. Тогда проблем не возникает, но я бы очень хотел иметь отдельные темы. Любая помощь приветствуется. Я разместил мой пример кода ниже:
Измените выбор (1000), чтобы выбрать (100), и это будет работать. Оставьте его как select () или select (1000), и он не будет.
import java.io.IOException; import java.net.DatagramSocket; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.DatagramChannel; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.util.Iterator; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors;<p></p> <p>public class UDPSocket { private DatagramChannel clientChannel; private String dstHost; private int dstPort; private static Selector recvSelector; private static volatile boolean initialized; private static ExecutorService eventQueue = Executors.newSingleThreadExecutor();</p> <p>public static void init() { initialized = true;</p> <p>try { recvSelector = Selector.open(); } catch (IOException e) { System.err.println(e); }</p> <p>Thread t = new Thread(new Runnable() { @Override public void run() { while(initialized) { readData(); Thread.yield(); } }<br> }); t.start(); }</p> <p>public static void shutdown() { initialized = false; }</p> <p>private static void readData() { try { int numKeys = recvSelector.select(1000);</p> <p>if (numKeys > 0) { Iterator i = recvSelector.selectedKeys().iterator();</p> <pre><code>while(i.hasNext()) { SelectionKey key = i.next(); i.remove(); if (key.isValid() && key.isReadable()) { DatagramChannel channel = (DatagramChannel) key.channel(); // allocate every time we receive so that it's a copy that won't get erased final ByteBuffer buffer = ByteBuffer.allocate(Short.MAX_VALUE); channel.receive(buffer); buffer.flip(); final SocketSubscriber subscriber = (SocketSubscriber) key.attachment(); // let user handle event on a dedicated thread eventQueue.execute(new Runnable() { @Override public void run() { subscriber.onData(buffer); } }); } } </code></pre> <p>} } catch (IOException e) { System.err.println(e); }<br> }</p> <p>public UDPSocket(String dstHost, int dstPort) { try { this.dstHost = dstHost; this.dstPort = dstPort; clientChannel = DatagramChannel.open(); clientChannel.configureBlocking(false); } catch (IOException e) { System.err.println(e); } }</p> <p>public void addListener(SocketSubscriber subscriber) { try { DatagramChannel serverChannel = DatagramChannel.open(); serverChannel.configureBlocking(false); DatagramSocket socket = serverChannel.socket(); socket.bind(new InetSocketAddress(dstPort)); SelectionKey key = serverChannel.register(recvSelector, SelectionKey.OP_READ); key.attach(subscriber); } catch (IOException e) { System.err.println(e); } }</p> <p>public void send(ByteBuffer buffer) { try { clientChannel.send(buffer, new InetSocketAddress(dstHost, dstPort)); } catch (IOException e) { System.err.println(e); } }</p>
public void close() { try { clientChannel.close(); } catch (IOException e) { System.err.println(e); } } }
import java.nio.ByteBuffer;<p></p>
public interface SocketSubscriber { public void onData(ByteBuffer data); }
Example usage:
public class Test implements SocketSubscriber { public static void main(String[] args) throws Exception { UDPSocket.init(); UDPSocket test = new UDPSocket("localhost", 1234); test.addListener(new Test()); UDPSocket test2 = new UDPSocket("localhost", 4321); test2.addListener(new Test()); System.out.println("Listening..."); ByteBuffer buffer = ByteBuffer.allocate(500); test.send(buffer); buffer.rewind(); test2.send(buffer); System.out.println("Data sent..."); Thread.sleep(5000); UDPSocket.shutdown(); }<p></p>
@Override public void onData(ByteBuffer data) { System.out.println("Received " + data.limit() + " bytes of data."); } }