Java nicht blockierender E / A-Selektor, der bewirkt, dass das Kanalregister @ blockie

Ich habe zwei Threads, die ich mit Java NIO für nicht blockierende Sockets beschäftige. Das machen die Threads:

Thread 1: Eine Schleife, die die select () -Methode eines Selektors aufruft. Wenn Schlüssel verfügbar sind, werden sie entsprechend verarbeitet.

Thread 2: Registriert gelegentlich einen SocketChannel beim Selektor, indem register () aufgerufen wird.

Das Problem ist, dass der Aufruf von register () auf unbestimmte Zeit blockiert wird, es sei denn, die Zeitüberschreitung für select () ist sehr gering (etwa 100 ms). Obwohl der Kanal so konfiguriert ist, dass er nicht blockiert, und die Javadocs angeben, dass das Selector-Objekt threadsicher ist (die Auswahlschlüssel sind es jedoch nicht, wie ich weiß).

Also hat jemand eine Idee woran das liegen könnte? Die Anwendung funktioniert einwandfrei, wenn ich alles in einen Thread stecke. Dann treten keine Probleme auf, aber ich hätte wirklich gerne separate Threads. Jede Hilfe wird geschätzt. Ich habe meinen Beispielcode unten gepostet:

Ändern Sie die Auswahl (1000), um (100) auszuwählen, und es wird funktionieren. Belassen Sie es als select () oder select (1000) und es wird nicht.


import java.io.IOException;
import java.net.DatagramSocket;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;<p></p>

<p>public class UDPSocket 
{
 private DatagramChannel clientChannel;
 private String dstHost;
 private int dstPort;
 private static Selector recvSelector;
 private static volatile boolean initialized;
 private static ExecutorService eventQueue = Executors.newSingleThreadExecutor();</p>

<p>public static void init()
 {
  initialized = true;</p>

<p>try 
  {
   recvSelector = Selector.open();
  } 
  catch (IOException e) 
  {
   System.err.println(e);
  }</p>

<p>Thread t = new Thread(new Runnable()
  {
   @Override
   public void run() 
   {
    while(initialized)
    {
     readData();
     Thread.yield();
    }
   }<br>
  });
  t.start();
 }</p>

<p>public static void shutdown()
 {
  initialized = false;
 }</p>

<p>private static void readData()
 {
  try
  {
   int numKeys = recvSelector.select(1000);</p>

<p>if (numKeys > 0)
   {
    Iterator i = recvSelector.selectedKeys().iterator();</p>

<pre><code>while(i.hasNext())
{
 SelectionKey key = i.next();
 i.remove();

 if (key.isValid() && key.isReadable())
 {
  DatagramChannel channel = (DatagramChannel) key.channel();

  // allocate every time we receive so that it's a copy that won't get erased
  final ByteBuffer buffer = ByteBuffer.allocate(Short.MAX_VALUE);
  channel.receive(buffer);
  buffer.flip();
  final SocketSubscriber subscriber = (SocketSubscriber) key.attachment();

  // let user handle event on a dedicated thread
  eventQueue.execute(new Runnable()
  {
   @Override
   public void run() 
   {
    subscriber.onData(buffer);
   }       
  });
 }
}
</code></pre>

<p>}
  }
  catch (IOException e)
  {
   System.err.println(e);
  }<br>
 }</p>

<p>public UDPSocket(String dstHost, int dstPort)
 {
  try
  {
   this.dstHost = dstHost;
   this.dstPort = dstPort;
   clientChannel = DatagramChannel.open();
   clientChannel.configureBlocking(false);
  }
  catch (IOException e)
  {
   System.err.println(e);
  }
 }</p>

<p>public void addListener(SocketSubscriber subscriber)
 {
  try 
  {
   DatagramChannel serverChannel = DatagramChannel.open();
   serverChannel.configureBlocking(false);
   DatagramSocket socket = serverChannel.socket();
   socket.bind(new InetSocketAddress(dstPort));
   SelectionKey key = serverChannel.register(recvSelector, SelectionKey.OP_READ);
   key.attach(subscriber);
  } 
  catch (IOException e) 
  {
   System.err.println(e);
  }
 }</p>

<p>public void send(ByteBuffer buffer)
 {
  try 
  {
   clientChannel.send(buffer, new InetSocketAddress(dstHost, dstPort));
  } 
  catch (IOException e) 
  {
   System.err.println(e);
  }
 }</p>

public void close() { try { clientChannel.close(); } catch (IOException e) { System.err.println(e); } } }


import java.nio.ByteBuffer;<p></p>

public interface SocketSubscriber { public void onData(ByteBuffer data); }

Example usage:


public class Test implements SocketSubscriber
{
 public static void main(String[] args) throws Exception
 {
  UDPSocket.init();
  UDPSocket test = new UDPSocket("localhost", 1234);
  test.addListener(new Test());
  UDPSocket test2 = new UDPSocket("localhost", 4321);
  test2.addListener(new Test());
  System.out.println("Listening...");
  ByteBuffer buffer = ByteBuffer.allocate(500);
  test.send(buffer);
  buffer.rewind();
  test2.send(buffer);
  System.out.println("Data sent...");
  Thread.sleep(5000);
  UDPSocket.shutdown();
 }<p></p>

@Override public void onData(ByteBuffer data) { System.out.println("Received " + data.limit() + " bytes of data."); } }

Antworten auf die Frage(4)

Ihre Antwort auf die Frage