Integração Spring - TCP confiável para aplicações de alto volume

Estou usando o Spring Integration para servidor TCP, que mantém conexões com alguns milhares de clientes. Eu preciso do servidor para estrangular os clientes em caso de carga excessiva e não perder mensagens.

Minha configuração de servidor:

<task:executor id="myTaskExecutor"
    pool-size="4-8"
    queue-capacity="0"
    rejection-policy="CALLER_RUNS" />

<int-ip:tcp-connection-factory id="serverTcpConFact"
    type="server"
    port="60000"
    using-nio="true"
    single-use="false"
    so-timeout="300000"
    task-executor="myTaskExecutor" />

<int-ip:tcp-inbound-channel-adapter id="tcpInboundAdapter"
    channel="tcpInbound"
    connection-factory="serverTcpConFact" />

<channel id="tcpInbound" />

<service-activator input-channel="tcpInbound"
    ref="myService"
    method="test" />

<beans:bean id="myService" class="org.test.tcpserver.MyService" />

Como o executor de tarefas padrão para o connection factory é ilimitado, eu uso um executor de tarefas em pool para evitar erros de falta de memória.

Um cliente simples para teste de carga:

public class TCPClientTest {
    static Socket socket;
    static List<Socket> sl = new ArrayList<>();
    static DataOutputStream out;

    public static void main(String[] args) throws Exception {
        for (int i = 0; i < 10000; i++) {
            socket = new Socket("localhost", 60000);
            sl.add(socket);
            out = new DataOutputStream(socket.getOutputStream());
            out.writeBytes("connection " + i + "\r\n");
            System.out.println("Using connection #" + i);
        }
        System.in.read();
    }
}

Quando eu o executo, o servidor recebe apenas de 10 a 20 mensagens e o cliente recebe a exceção "Conexão recusada: conectar". Depois disso, o servidor não pode mais aceitar novas conexões, mesmo após o tempo limite da conexão. Aumentar o tamanho do pool apenas ajuda a receber um pouco mais de mensagens.

EDITAR

Estou usando o Spring Integration 3.0.2.RELEASE. Para produção, estou usando de 8 a 40 threads, mas só faz esse teste falhar mais tarde, depois de várias centenas de conexões.

MyService.test () não faz muito ...

public class MyService {
    public void test(byte[] input) {
        System.out.println("Received: " + new String(input));
    }
}

Aqui está o log com o registro no nível de rastreio.

Fontes

questionAnswers(2)

yourAnswerToTheQuestion