spring xd verliert Nachrichten bei der Verarbeitung eines großen Volumens

Ich verwende spring xd Mein Stream sieht wie folgt aus und führt Tests für 3-Knoten-Container mit 1 Admin-Knoten mit Kaninchen als Transport durch.

aws-s3|processor1|http-client|processor2>queue:readyQueue

Ich habe unter Tippen erstellt.

tap1  aws-s3>s3Queue


tap2  processor1>processorQueue1

tap3  http-client>httpQueue

Ich habe in meinen Tests folgende Szenarien durchlaufen:

Scenario1: 5 Dateien mit 200.000 Einträgen = 1 Million Einträge gleichzeitig mit http-client = 70 und processor2 = 30

Ich sehe 900k Nachricht s3Queue

Ich sehe 889k message processorQueue1

Ich sehe 886k Nachricht httpQueue

Ich sehe 883k Message ProcessorQueue2 Nachrichten sind überall verloren und ihre zufällige

Scenario2:

5 Dateien mit 200.000 = 1 Million Datensätzen und allen Modul-Parallelitäten = 1

Ich sehe 998800 Nachricht s3Queue

Ich sehe 998760 Nachricht processorQueue1

Ich sehe 997540 Nachricht httpQueue

Ich sehe 997530 Nachricht processorQueue2

Auch diese Zahl ist zufällig und nicht konsistent

Scenario3

Ich habe den Stream wie folgt geändert und Concurrency = 1 und 5 Dateien mit 200.000 = 1 Million Datensätzen.

aws-s3 >testQueue

Ich erhalte alle meine Nachrichten, die ich dreimal und ohne Probleme ausführe. Ich erhalte alle meine 1 Million Nachrichten

scenario4

Ich habe den Stream wie folgt geändert und Concurrency = 1 5 Dateien mit 200.000 = 1 Million Datensätzen

aws-s3 |processor1 >testQueue2

Ich erhalte alle meine Nachrichten, die ich dreimal und ohne Probleme ausführe. Ich erhalte alle meine 1 Million Nachrichten

In Szenario 4 und Szenario 3 ist die Datenaufnahme schneller und es dauerte 5 Minuten, um 5 Millionen schneller zu verarbeiten, und die Aufnahme war in der Kaninchen-Transportwarteschlange schneller, wie z. B. 5 KBit / s.

In Szenario 1 war die Datenaufnahme langsamer, obwohl das S3-Modul die Daten sehr langsam abrief, wie z. B. 300 bis 1000 msg pro Sekunde.

In Szenario 2 hat s3 Daten schneller abgerufen, aber der http-Client war langsam wie 100 msg pro Sekunde, aber aws-s3 hat Daten schnell wie 3-4k msg pro Sekunde abgerufen.

Ich denke, dass das XD-Threading Probleme verursacht und ich Nachrichten verliere. Bitte helfen Sie mir, dieses Problem zu lösen.

aktualisiere

Scenario 5 

Ich habe mich verändertreply-timeout zu-1 in http client und dann habe ich nur 37 msgs @ verlor

Jetzt wieder starte ich die 2. Iteration. Ich habe 25000 Nachrichten verloren. Ich sehe das brüllende Containerprotokoll, als das passierte.

2016-03-04T03:42:04-0500 1.2.1.RELEASE ERROR task-scheduler-7 handler.LoggingHandler - org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint@b6700b1]; nested exception is org.springframework.amqp.AmqpIOException: java.io.IOException
        at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:84)
        at org.springframework.xd.dirt.integration.rabbit.RabbitMessageBus$SendingHandler.handleMessageInternal(RabbitMessageBus.java:891)
        at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
        at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
        at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:101)
        at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:97)
        at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:287)
        at org.springframework.integration.channel.interceptor.WireTap.preSend(WireTap.java:129)
        at org.springframework.integration.channel.AbstractMessageChannel$ChannelInterceptorList.preSend(AbstractMessageChannel.java:392)
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:282)
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:245)
        at sun.reflect.GeneratedMethodAccessor204.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:317)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:190)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157)
        at org.springframework.integration.monitor.DirectChannelMetrics.monitorSend(DirectChannelMetrics.java:114)
        at org.springframework.integration.monitor.DirectChannelMetrics.doInvoke(DirectChannelMetrics.java:98)
        at org.springframework.integration.monitor.DirectChannelMetrics.invoke(DirectChannelMetrics.java:92)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)
        at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:207)
        at com.sun.proxy.$Proxy1537.send(Unknown Source)
        at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115)
        at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45)
        at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:95)
        at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:231)
        at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:154)
        at org.springframework.integration.splitter.AbstractMessageSplitter.produceOutput(AbstractMessageSplitter.java:157)
        at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:102)
        at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:105)

Caused by: org.springframework.amqp.AmqpIOException: java.io.IOException
        at org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java:63)
        at org.springframework.amqp.rabbit.connection.SimpleConnection.createChannel(SimpleConnection.java:51)
        at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$ChannelCachingConnectionProxy.createBareChannel(CachingConnectionFactory.java:758)
        at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$ChannelCachingConnectionProxy.access$300(CachingConnectionFactory.java:747)
        at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.doCreateBareChannel(CachingConnectionFactory.java:419)
        at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.createBareChannel(CachingConnectionFactory.java:395)
        at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.getCachedChannelProxy(CachingConnectionFactory.java:364)
        at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.getChannel(CachingConnectionFactory.java:357)
        at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.access$1100(CachingConnectionFactory.java:75)
        at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$ChannelCachingConnectionProxy.createChannel(CachingConnectionFactory.java:763)
        at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils$1.createChannel(ConnectionFactoryUtils.java:85)
        at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.doGetTransactionalResourceHolder(ConnectionFactoryUtils.java:134)
        at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.getTransactionalResourceHolder(ConnectionFactoryUtils.java:67)
        at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:1035)
        at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1028)
        at org.springframework.amqp.rabbit.core.RabbitTemplate.send(RabbitTemplate.java:540)
        at org.springframework.amqp.rabbit.core.RabbitTemplate.convertAndSend(RabbitTemplate.java:635)
        at org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint.send(AmqpOutboundEndpoint.java:331)
        at org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint.handleRequestMessage(AmqpOutboundEndpoint.java:323)
        at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:99)
        at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
        ... 93 more
Caused by: java.io.IOException
        at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
        at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
        at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
        at com.rabbitmq.client.impl.ChannelN.open(ChannelN.java:125)
        at com.rabbitmq.client.impl.ChannelManager.createChannel(ChannelManager.java:134)
        at com.rabbitmq.client.impl.AMQConnection.createChannel(AMQConnection.java:499)
        at org.springframework.amqp.rabbit.connection.SimpleConnection.createChannel(SimpleConnection.java:44)
        ... 112 more
Caused by: com.rabbitmq.client.ShutdownSignalException: connection error
        at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
        at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
        at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:348)
        at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:221)
        at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
        ... 116 more
Caused by: com.rabbitmq.client.impl.UnknownChannelException: Unknown channel number 23364
        at com.rabbitmq.client.impl.ChannelManager.getChannel(ChannelManager.java:80)
        at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:552)
        ... 1 more

2016-03-04T03:42:05-0500 1.2.1.RELEASE ERROR AMQP Connection xxx:5672 connection.CachingConnectionFactory - Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'xdbus.tap-s3.tap:stream:stream.batch-aws-s3-source.0' in vhost '/', class-id=50, method-id=20)


2016-03-04T03:53:13-0500 1.2.1.RELEASE ERROR AMQP Connection xxx:5672 connection.CachingConnectionFactory - Channel shutdown: connection error
2016-03-04T03:53:13-0500 1.2.1.RELEASE ERROR AMQP Connection xxx:5672 connection.CachingConnectionFactory - Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'xdbus.tap-s3.tap:stream:stream.batch-aws-s3-source.0' in vhost '/', class-id=50, method-id=20)
~                                                                                                                                                                                 


2016-03-04T02:57:54-0500 1.2.1.RELEASE ERROR AMQP Connection xxx:8080 connection.CachingConnectionFactory - Channel shutdown: connection error
2016-03-04T02:57:55-0500 1.2.1.RELEASE ERROR AMQP Connection xxx:8080 connection.CachingConnectionFactory - Channel shutdown: connection error
2016-03-04T03:42:04-0500 1.2.1.RELEASE ERROR AMQP Connection yyy:5672 connection.CachingConnectionFactory - Channel shutdown: connection error

Aktualisier

Ich habe festgestellt, dass das Problem bei Nachrichtenverlusten auftritt, wenn diese Ausnahme auftritt. Ich sehe viele verlorene Nachrichten. Dieses Muster habe ich mehrmals getestet. Jedes Mal, wenn diese Ausnahme auftritt, sehe ich, dass Nachrichten verloren gehe

2016-03-05T13:59:41-0500 1.2.1.RELEASE ERROR AMQP Connection host1:5672 connection.CachingConnectionFactory - Channel shutdown: connection error

rabbit configuration

spring:
  rabbitmq:
   addresses: host1:5672,host2:5672,host3:5672
   adminAddresses: http://host1:15672,http://host2:15672,http://host3:15672
   nodes: [email protected],[email protected],[email protected]
   username: test
   password: test
   virtual_host: /
   useSSL: false
   sslProperties:

aktualisiert mit zunehmender Cache-Größe auf 200

Ich habe die von Ihnen bereitgestellte XML-Datei hinzugefügt und die Cachegröße auf 200 erhöht. Dies geschieht beim Verarbeiten von 1 Million und 80.000 Nachrichten. Nur meine HTTP-Client-Nebenläufigkeit ist 100, alle anderen sind 1. Client-Warteschlange und gleiche Anzahl. Aber die Anzahl der Nachrichten in meinem benannten Kanal steigt langsam wie 10 Nachrichten pro Minute, aber die sehr langsame s3-poller | Prozessor | http-Client> Warteschlange: batchCacheQueue

Msg wird vor http 186174 nicht in die Warteschlange eingereih

u simulierender Testfall:

1) Ich habe die Federintegration aws-s3 mit einem Splitter im Composite-Modul | verwendet Prozessor wie XML-Parsing | http-Client mit Parallelität 100> benanntem Kanal.

2) Ich denke, die Dateiquelle könnte auch funktionieren. Erstellen Sie eine einzelne Datei mit Millionen Datensätzen und versuchen Sie, diese aus der Datei zu ziehen.

3) Nach 4 bis 5 Durchläufen tritt diese Ausnahme auf

Antworten auf die Frage(2)

Ihre Antwort auf die Frage