spring xd perdendo mensagens ao processar um volume enorme

Estou usando o spring xd Meu stream é como abaixo e executando testes no contêiner de 3 nós com 1 nó de administrador com coelho como transporte

aws-s3|processor1|http-client|processor2>queue:readyQueue

Eu criei abaixo da torneira.

tap1  aws-s3>s3Queue


tap2  processor1>processorQueue1

tap3  http-client>httpQueue

Eu corro abaixo dos cenários em meus testes:

Scenario1: 5 arquivos de 200k = 1 milhão de registros simultâneos de http-client = 70 e processador2 = 30

Eu vejo a mensagem de 900k s3Queue

Eu vejo o processador de mensagens de 889kQueue1

Eu vejo a mensagem 886k httpQueue

Eu vejo o processador de mensagens 883kQueue2 As mensagens são perdidas em todos os lugares e são aleatórias

Scenario2:

5 arquivos de 200k = 1 milhão de registros e todos os módulos simultâneos = 1

Eu vejo a mensagem 998800 s3Queue

Eu vejo o processador de mensagens 998760Queue1

Eu vejo a mensagem 997540 httpQueue

Eu vejo o processador de mensagens 997530Queue2

Mesmo esse número é aleatório e não consistente

Scenario3

Alterei o fluxo como abaixo e simultaneidade = 1 e 5 arquivos de 200k = 1 milhão de registros

aws-s3 >testQueue

Recebo todas as minhas mensagens, corro 3 vezes e sem problemas. Recebo todas as minhas 1 milhão de mensagens

scenario4

Alterei o fluxo como abaixo e simultaneidade = 1 5 arquivos de 200k = 1 milhão de registros

aws-s3 |processor1 >testQueue2

Recebo todas as minhas mensagens, corro 3 vezes e sem problemas. Recebo todas as minhas 1 milhão de mensagens

No cenário 4 e no cenário 3, a ingestão de dados é mais rápida e foram necessários 5 minutos para processar 5 milhões mais rapidamente e a ingestão foi mais rápida na fila de transporte de coelhos, como 5k msg por segundo

No cenário 1, a ingestão de dados era mais lenta, mesmo o módulo s3 estava puxando os dados muito lentamente, como 300 a 1000 msg por segundo

No cenário 2, o s3 extraiu os dados mais rapidamente, mas o cliente http era lento como 100 msg por segundo, mas o aws-s3 extraiu os dados rapidamente como 3-4k msg por segundo.

Estou pensando que ver o xd threading está causando problemas e estou perdendo messages.Please você pode me ajudar a resolver este problema.

atualizar

Scenario 5 

eu mudeireply-timeout para-1 no cliente http e então eu perdi apenas 37 msgs

Agora, novamente, eu executo a segunda iteração. Perdi 25000 msgs. Eu vejo o seguinte recipiente de log quando isso aconteceu

2016-03-04T03:42:04-0500 1.2.1.RELEASE ERROR task-scheduler-7 handler.LoggingHandler - org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint@b6700b1]; nested exception is org.springframework.amqp.AmqpIOException: java.io.IOException
        at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:84)
        at org.springframework.xd.dirt.integration.rabbit.RabbitMessageBus$SendingHandler.handleMessageInternal(RabbitMessageBus.java:891)
        at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
        at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
        at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:101)
        at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:97)
        at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:287)
        at org.springframework.integration.channel.interceptor.WireTap.preSend(WireTap.java:129)
        at org.springframework.integration.channel.AbstractMessageChannel$ChannelInterceptorList.preSend(AbstractMessageChannel.java:392)
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:282)
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:245)
        at sun.reflect.GeneratedMethodAccessor204.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:317)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:190)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157)
        at org.springframework.integration.monitor.DirectChannelMetrics.monitorSend(DirectChannelMetrics.java:114)
        at org.springframework.integration.monitor.DirectChannelMetrics.doInvoke(DirectChannelMetrics.java:98)
        at org.springframework.integration.monitor.DirectChannelMetrics.invoke(DirectChannelMetrics.java:92)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)
        at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:207)
        at com.sun.proxy.$Proxy1537.send(Unknown Source)
        at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115)
        at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45)
        at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:95)
        at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:231)
        at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:154)
        at org.springframework.integration.splitter.AbstractMessageSplitter.produceOutput(AbstractMessageSplitter.java:157)
        at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:102)
        at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:105)

Caused by: org.springframework.amqp.AmqpIOException: java.io.IOException
        at org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java:63)
        at org.springframework.amqp.rabbit.connection.SimpleConnection.createChannel(SimpleConnection.java:51)
        at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$ChannelCachingConnectionProxy.createBareChannel(CachingConnectionFactory.java:758)
        at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$ChannelCachingConnectionProxy.access$300(CachingConnectionFactory.java:747)
        at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.doCreateBareChannel(CachingConnectionFactory.java:419)
        at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.createBareChannel(CachingConnectionFactory.java:395)
        at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.getCachedChannelProxy(CachingConnectionFactory.java:364)
        at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.getChannel(CachingConnectionFactory.java:357)
        at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.access$1100(CachingConnectionFactory.java:75)
        at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$ChannelCachingConnectionProxy.createChannel(CachingConnectionFactory.java:763)
        at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils$1.createChannel(ConnectionFactoryUtils.java:85)
        at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.doGetTransactionalResourceHolder(ConnectionFactoryUtils.java:134)
        at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.getTransactionalResourceHolder(ConnectionFactoryUtils.java:67)
        at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:1035)
        at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1028)
        at org.springframework.amqp.rabbit.core.RabbitTemplate.send(RabbitTemplate.java:540)
        at org.springframework.amqp.rabbit.core.RabbitTemplate.convertAndSend(RabbitTemplate.java:635)
        at org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint.send(AmqpOutboundEndpoint.java:331)
        at org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint.handleRequestMessage(AmqpOutboundEndpoint.java:323)
        at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:99)
        at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
        ... 93 more
Caused by: java.io.IOException
        at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
        at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
        at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
        at com.rabbitmq.client.impl.ChannelN.open(ChannelN.java:125)
        at com.rabbitmq.client.impl.ChannelManager.createChannel(ChannelManager.java:134)
        at com.rabbitmq.client.impl.AMQConnection.createChannel(AMQConnection.java:499)
        at org.springframework.amqp.rabbit.connection.SimpleConnection.createChannel(SimpleConnection.java:44)
        ... 112 more
Caused by: com.rabbitmq.client.ShutdownSignalException: connection error
        at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
        at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
        at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:348)
        at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:221)
        at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
        ... 116 more
Caused by: com.rabbitmq.client.impl.UnknownChannelException: Unknown channel number 23364
        at com.rabbitmq.client.impl.ChannelManager.getChannel(ChannelManager.java:80)
        at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:552)
        ... 1 more

2016-03-04T03:42:05-0500 1.2.1.RELEASE ERROR AMQP Connection xxx:5672 connection.CachingConnectionFactory - Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'xdbus.tap-s3.tap:stream:stream.batch-aws-s3-source.0' in vhost '/', class-id=50, method-id=20)


2016-03-04T03:53:13-0500 1.2.1.RELEASE ERROR AMQP Connection xxx:5672 connection.CachingConnectionFactory - Channel shutdown: connection error
2016-03-04T03:53:13-0500 1.2.1.RELEASE ERROR AMQP Connection xxx:5672 connection.CachingConnectionFactory - Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'xdbus.tap-s3.tap:stream:stream.batch-aws-s3-source.0' in vhost '/', class-id=50, method-id=20)
~                                                                                                                                                                                 


2016-03-04T02:57:54-0500 1.2.1.RELEASE ERROR AMQP Connection xxx:8080 connection.CachingConnectionFactory - Channel shutdown: connection error
2016-03-04T02:57:55-0500 1.2.1.RELEASE ERROR AMQP Connection xxx:8080 connection.CachingConnectionFactory - Channel shutdown: connection error
2016-03-04T03:42:04-0500 1.2.1.RELEASE ERROR AMQP Connection yyy:5672 connection.CachingConnectionFactory - Channel shutdown: connection error

Atualizada

Se a resposta ajudou de alguma forma, por favor, marque como resposta, caso a sua dúvida não tenha sido solucionada, por favor, poste novamente.

2016-03-05T13:59:41-0500 1.2.1.RELEASE ERROR AMQP Connection host1:5672 connection.CachingConnectionFactory - Channel shutdown: connection error

configuração de coelho

spring:
  rabbitmq:
   addresses: host1:5672,host2:5672,host3:5672
   adminAddresses: http://host1:15672,http://host2:15672,http://host3:15672
   nodes: [email protected],[email protected],[email protected]
   username: test
   password: test
   virtual_host: /
   useSSL: false
   sslProperties:

atualizado com o aumento do tamanho do cache para 200

Eu adicionei o xml fornecido por você e aumentou o tamanho do cache para 200. Isso é o que acontece ao processar mensagens de 1 milhão e 80 k. Somente a simultaneidade do cliente http é 100 e todas as outras são 1. mas a contagem de msg no meu canal nomeado aumenta lentamente como 10 msg por minuto, mas seu processador s3 muito lento | processador | http | client> fila: batchCacheQueue

A mensagem não está diminuindo na fila antes de http 186174. Mas lentamente a mensagem está chegando ao batchCacheQueue

Caso de teste para simular:

1) Eu estava usando a fonte aws-s3 de integração de mola com um divisor no módulo composto | processador como análise xml | cliente http com simultaneidade 100> canal nomeado.

2) Acho que a fonte do arquivo também pode funcionar. Crie um único arquivo de um milhão de registros e tente extrair isso do arquivo.

3) Após algumas rodadas de 4 a 5, vemos essa exceção acontecendo

questionAnswers(1)

yourAnswerToTheQuestion