java.nio Selectores y SocketChannel para la transmisión continua

Actualmente estoy usando java.nio.channel.Selectors & SocketChannels para una aplicación que abrirá conexiones de uno a muchos para continuar la transmisión a un servidor. Tengo tres subprocesos para mi aplicación: StreamWriteWorker - realiza la operación de escritura en el SocketChannel, StreamReadWorker - lee bytes desde el contenido del búfer y el análisis, y StreamTaskDispatcher - realiza la selección del Selector para readyOps y envía nuevos runnables para los subprocesos del trabajador.

Problema: la invocación en el método de selección del Selector solo devuelve un valor> 0 (readyOps válidos) en la primera invocación; Puedo realizar una escritura y enviar datos en todos los canales listos una vez, pero todas las siguientes invocaciones del método de selección del Selector devuelven 0.

Pregunta: ¿Debo invocar de cerca en el SocketChannel después de cada lectura / escritura (espero que no)? De no ser así, ¿cuál podría ser la causa de que los SocketChannels no estén disponibles para operaciones de lectura / escritura?

Lo siento, no puedo publicar el código, pero espero haber explicado el problema con la suficiente claridad para que alguien pueda ayudar. He buscado respuestas y veo que no puede reutilizar una conexión SocketChannel después de que se cierre, pero mi canal no debe estar cerca, el servidor nunca recibe el resultado de la transmisión EOF.

Hice algunos progresos y descubrí que la operación de escritura no se estaba produciendo en la aplicación del servidor debido a un error de análisis de json. Así que ahora mi SocketChannel en el código de la aplicación cliente se prepara para otra operación de escritura después de procesar una operación de lectura. Supongo que esta es la naturaleza TCP de SocketChannels. Sin embargo, el SocketChannel no está disponible para otra operación de lectura en el lado de la aplicación del servidor. ¿Es este el comportamiento normal de SocketChannels? ¿Debo cerrar la conexión en el lado del cliente después de la operación de lectura y establecer una nueva conexión?

Aquí hay un ejemplo de código de lo que estoy tratando de hacer:

package org.stream.socket;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CodingErrorAction;
import java.util.HashMap;
import java.util.Iterator;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.commons.lang3.RandomStringUtils;

import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.google.gson.JsonPrimitive;
import com.google.gson.stream.JsonToken;

public class ClientServerTest {

    private LinkedBlockingQueue<byte[]> dataQueue = new LinkedBlockingQueue<byte[]>();
    private ExecutorService executor = Executors.newFixedThreadPool(1);
    private HashMap<String, Integer> uuidToSize = new HashMap<String, Integer>();

    private class StreamWriteTask implements Runnable {
        private ByteBuffer buffer;
        private SelectionKey key;
        private Selector selector;

        private StreamWriteTask(ByteBuffer buffer, SelectionKey key, Selector selector) {
            this.buffer = buffer;
            this.key = key;
            this.selector = selector;
        }

        @Override
        public void run() {
            SocketChannel sc = (SocketChannel) key.channel();
            byte[] data = (byte[]) key.attachment();
            buffer.clear();
            buffer.put(data);
            buffer.flip();
            int results = 0;
            while (buffer.hasRemaining()) {
                try {
                    results = sc.write(buffer);
                } catch (IOException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }

                if (results == 0) {
                    buffer.compact();
                    buffer.flip();
                    data = new byte[buffer.remaining()];
                    buffer.get(data);
                    key.interestOps(SelectionKey.OP_WRITE);
                    key.attach(data);
                    selector.wakeup();
                    return;
                }
            }

            key.interestOps(SelectionKey.OP_READ);
            key.attach(null);
            selector.wakeup();
        }

    }

    private class StreamReadTask implements Runnable {
        private ByteBuffer buffer;
        private SelectionKey key;
        private Selector selector;

        private StreamReadTask(ByteBuffer buffer, SelectionKey key, Selector selector) {
            this.buffer = buffer;
            this.key = key;
            this.selector = selector;
        }

        private boolean checkUUID(byte[] data) {
            return uuidToSize.containsKey(new String(data));
        }

        @Override
        public void run() {
            SocketChannel sc = (SocketChannel) key.channel();
            buffer.clear();
            byte[] data = (byte[]) key.attachment();
            if (data != null) {
                buffer.put(data);
            }
            int count = 0;
            int readAttempts = 0;
            try {
                while ((count = sc.read(buffer)) > 0) {
                    readAttempts++;
                }
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }

            if (count == 0) {
                buffer.flip();
                data = new byte[buffer.limit()];
                buffer.get(data);
                if (checkUUID(data)) {
                    key.interestOps(SelectionKey.OP_READ);
                    key.attach(data);
                } else {
                    System.out.println("Clinet Read - uuid ~~~~ " + new String(data));
                    key.interestOps(SelectionKey.OP_WRITE);
                    key.attach(null);
                }
            }

            if (count == -1) {
                try {
                    sc.close();
                } catch (IOException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }

            selector.wakeup();
        }

    }

    private class ClientWorker implements Runnable {

        @Override
        public void run() {
            try {
                Selector selector = Selector.open();
                SocketChannel sc = SocketChannel.open();
                sc.configureBlocking(false);
                sc.connect(new InetSocketAddress("127.0.0.1", 9001));
                sc.register(selector, SelectionKey.OP_CONNECT);
                ByteBuffer buffer = ByteBuffer.allocateDirect(65535);

                while (selector.isOpen()) {
                    int count = selector.select(10);

                    if (count == 0) {
                        continue;
                    }

                    Iterator<SelectionKey> it = selector.selectedKeys().iterator();

                    while (it.hasNext()) {
                        final SelectionKey key = it.next();
                        it.remove();
                        if (!key.isValid()) {
                            continue;
                        }

                        if (key.isConnectable()) {
                            sc = (SocketChannel) key.channel();
                            if (!sc.finishConnect()) {
                                continue;
                            }
                            sc.register(selector, SelectionKey.OP_WRITE);
                        }

                        if (key.isReadable()) {
                            key.interestOps(0);
                            executor.execute(new StreamReadTask(buffer, key, selector));
                        }
                        if (key.isWritable()) {
                            key.interestOps(0);
                            if(key.attachment() == null){
                                key.attach(dataQueue.take());
                            }
                            executor.execute(new StreamWriteTask(buffer, key, selector));
                        }
                    }
                }
            } catch (IOException ex) {
                // Handle Exception
            }catch(InterruptedException ex){

            }

        }
    }

    private class ServerWorker implements Runnable {
        @Override
        public void run() {
            try {
                Selector selector = Selector.open();
                ServerSocketChannel ssc = ServerSocketChannel.open();
                ServerSocket socket = ssc.socket();
                socket.bind(new InetSocketAddress(9001));
                ssc.configureBlocking(false);
                ssc.register(selector, SelectionKey.OP_ACCEPT);
                ByteBuffer buffer = ByteBuffer.allocateDirect(65535);
                DataHandler handler = new DataHandler();

                while (selector.isOpen()) {
                    int count = selector.select(10);

                    if (count == 0) {
                        continue;
                    }

                    Iterator<SelectionKey> it = selector.selectedKeys().iterator();

                    while (it.hasNext()) {
                        final SelectionKey key = it.next();
                        it.remove();
                        if (!key.isValid()) {
                            continue;
                        }

                        if (key.isAcceptable()) {
                            ssc = (ServerSocketChannel) key.channel();
                            SocketChannel sc = ssc.accept();
                            sc.configureBlocking(false);
                            sc.register(selector, SelectionKey.OP_READ);
                        }
                        if (key.isReadable()) {
                            handler.readSocket(buffer, key);
                        }
                        if (key.isWritable()) {
                            handler.writeToSocket(buffer, key);
                        }
                    }
                }

            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }

    }

    private class DataHandler {

        private JsonObject parseData(StringBuilder builder) {
            if (!builder.toString().endsWith("}")) {
                return null;
            }

            JsonParser parser = new JsonParser();
            JsonObject obj = (JsonObject) parser.parse(builder.toString());
            return obj;
        }

        private void readSocket(ByteBuffer buffer, SelectionKey key)
                throws IOException {
            SocketChannel sc = (SocketChannel) key.channel();
            buffer.clear();
            int count = Integer.MAX_VALUE;
            int readAttempts = 0;
            try {
                while ((count = sc.read(buffer)) > 0) {
                    readAttempts++;
                }
            } catch (IOException e) {
                e.printStackTrace();
            }

            if (count == 0) {
                buffer.flip();
                StringBuilder builder = key.attachment() instanceof StringBuilder ? (StringBuilder) key
                        .attachment() : new StringBuilder();
                Charset charset = Charset.forName("UTF-8");
                CharsetDecoder decoder = charset.newDecoder();
                decoder.onMalformedInput(CodingErrorAction.IGNORE);
                System.out.println(buffer);
                CharBuffer charBuffer = decoder.decode(buffer);
                String content = charBuffer.toString();
                charBuffer = null;
                builder.append(content);    
                System.out.println(content);
                JsonObject obj = parseData(builder);
                if (obj == null) {
                    key.attach(builder);
                    key.interestOps(SelectionKey.OP_READ);
                } else {
                    System.out.println("data ~~~~~~~ " + builder.toString());
                    JsonPrimitive uuid = obj.get("uuid").getAsJsonPrimitive();
                    key.attach(uuid.toString().getBytes());
                    key.interestOps(SelectionKey.OP_WRITE);
                }
            }

            if (count == -1) {
                key.attach(null);
                sc.close();
            }
        }

        private void writeToSocket(ByteBuffer buffer, SelectionKey key)
                throws IOException {
            SocketChannel sc = (SocketChannel) key.channel();
            byte[] data = (byte[]) key.attachment();
            buffer.clear();
            buffer.put(data);
            buffer.flip();
            int writeAttempts = 0;
            while (buffer.hasRemaining()) {
                int results = sc.write(buffer);
                writeAttempts++;
                System.out.println("Write Attempt #" + writeAttempts);
                if (results == 0) {
                    buffer.compact();
                    buffer.flip();
                    data = new byte[buffer.remaining()];
                    buffer.get(data);
                    key.attach(data);
                    key.interestOps(SelectionKey.OP_WRITE);
                    break;
                }
            }

            key.interestOps(SelectionKey.OP_READ);
            key.attach(null);
        }
    }

    public ClientServerTest() {
        for (int index = 0; index < 1000; index++) {
            JsonObject obj = new JsonObject();
            String uuid = UUID.randomUUID().toString();
            uuidToSize.put(uuid, uuid.length());
            obj.addProperty("uuid", uuid);
            String data = RandomStringUtils.randomAlphanumeric(10000);
            obj.addProperty("event", data);
            dataQueue.add(obj.toString().getBytes());
        }

        Thread serverWorker = new Thread(new ServerWorker());
        serverWorker.start();

        Thread clientWorker = new Thread(new ClientWorker());
        clientWorker.start();

    }

    /**
     * @param args
     */
    public static void main(String[] args) {
        ClientServerTest test = new ClientServerTest();
        for(;;){

        }
    }

}

Respuestas a la pregunta(1)

Su respuesta a la pregunta