Integração Spring - TCP confiável para aplicações de alto volume
Estou usando o Spring Integration para servidor TCP, que mantém conexões com alguns milhares de clientes. Eu preciso do servidor para estrangular os clientes em caso de carga excessiva e não perder mensagens.
Minha configuração de servidor:
<task:executor id="myTaskExecutor"
pool-size="4-8"
queue-capacity="0"
rejection-policy="CALLER_RUNS" />
<int-ip:tcp-connection-factory id="serverTcpConFact"
type="server"
port="60000"
using-nio="true"
single-use="false"
so-timeout="300000"
task-executor="myTaskExecutor" />
<int-ip:tcp-inbound-channel-adapter id="tcpInboundAdapter"
channel="tcpInbound"
connection-factory="serverTcpConFact" />
<channel id="tcpInbound" />
<service-activator input-channel="tcpInbound"
ref="myService"
method="test" />
<beans:bean id="myService" class="org.test.tcpserver.MyService" />
Como o executor de tarefas padrão para o connection factory é ilimitado, eu uso um executor de tarefas em pool para evitar erros de falta de memória.
Um cliente simples para teste de carga:
public class TCPClientTest {
static Socket socket;
static List<Socket> sl = new ArrayList<>();
static DataOutputStream out;
public static void main(String[] args) throws Exception {
for (int i = 0; i < 10000; i++) {
socket = new Socket("localhost", 60000);
sl.add(socket);
out = new DataOutputStream(socket.getOutputStream());
out.writeBytes("connection " + i + "\r\n");
System.out.println("Using connection #" + i);
}
System.in.read();
}
}
Quando eu o executo, o servidor recebe apenas de 10 a 20 mensagens e o cliente recebe a exceção "Conexão recusada: conectar". Depois disso, o servidor não pode mais aceitar novas conexões, mesmo após o tempo limite da conexão. Aumentar o tamanho do pool apenas ajuda a receber um pouco mais de mensagens.
EDITAR
Estou usando o Spring Integration 3.0.2.RELEASE. Para produção, estou usando de 8 a 40 threads, mas só faz esse teste falhar mais tarde, depois de várias centenas de conexões.
MyService.test () não faz muito ...
public class MyService {
public void test(byte[] input) {
System.out.println("Received: " + new String(input));
}
}