spring xd verliert Nachrichten bei der Verarbeitung eines großen Volumens
Ich verwende spring xd Mein Stream sieht wie folgt aus und führt Tests für 3-Knoten-Container mit 1 Admin-Knoten mit Kaninchen als Transport durch.
aws-s3|processor1|http-client|processor2>queue:readyQueue
Ich habe unter Tippen erstellt.
tap1 aws-s3>s3Queue
tap2 processor1>processorQueue1
tap3 http-client>httpQueue
Ich habe in meinen Tests folgende Szenarien durchlaufen:
Scenario1
: 5 Dateien mit 200.000 Einträgen = 1 Million Einträge gleichzeitig mit http-client = 70 und processor2 = 30
Ich sehe 900k Nachricht s3Queue
Ich sehe 889k message processorQueue1
Ich sehe 886k Nachricht httpQueue
Ich sehe 883k Message ProcessorQueue2 Nachrichten sind überall verloren und ihre zufällige
Scenario2:
5 Dateien mit 200.000 = 1 Million Datensätzen und allen Modul-Parallelitäten = 1
Ich sehe 998800 Nachricht s3Queue
Ich sehe 998760 Nachricht processorQueue1
Ich sehe 997540 Nachricht httpQueue
Ich sehe 997530 Nachricht processorQueue2
Auch diese Zahl ist zufällig und nicht konsistent
Scenario3
Ich habe den Stream wie folgt geändert und Concurrency = 1 und 5 Dateien mit 200.000 = 1 Million Datensätzen.
aws-s3 >testQueue
Ich erhalte alle meine Nachrichten, die ich dreimal und ohne Probleme ausführe. Ich erhalte alle meine 1 Million Nachrichten
scenario4
Ich habe den Stream wie folgt geändert und Concurrency = 1 5 Dateien mit 200.000 = 1 Million Datensätzen
aws-s3 |processor1 >testQueue2
Ich erhalte alle meine Nachrichten, die ich dreimal und ohne Probleme ausführe. Ich erhalte alle meine 1 Million Nachrichten
In Szenario 4 und Szenario 3 ist die Datenaufnahme schneller und es dauerte 5 Minuten, um 5 Millionen schneller zu verarbeiten, und die Aufnahme war in der Kaninchen-Transportwarteschlange schneller, wie z. B. 5 KBit / s.
In Szenario 1 war die Datenaufnahme langsamer, obwohl das S3-Modul die Daten sehr langsam abrief, wie z. B. 300 bis 1000 msg pro Sekunde.
In Szenario 2 hat s3 Daten schneller abgerufen, aber der http-Client war langsam wie 100 msg pro Sekunde, aber aws-s3 hat Daten schnell wie 3-4k msg pro Sekunde abgerufen.
Ich denke, dass das XD-Threading Probleme verursacht und ich Nachrichten verliere. Bitte helfen Sie mir, dieses Problem zu lösen.
aktualisiere
Scenario 5
Ich habe mich verändertreply-timeout
zu-1
in http client und dann habe ich nur 37 msgs @ verlor
Jetzt wieder starte ich die 2. Iteration. Ich habe 25000 Nachrichten verloren. Ich sehe das brüllende Containerprotokoll, als das passierte.
2016-03-04T03:42:04-0500 1.2.1.RELEASE ERROR task-scheduler-7 handler.LoggingHandler - org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint@b6700b1]; nested exception is org.springframework.amqp.AmqpIOException: java.io.IOException
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:84)
at org.springframework.xd.dirt.integration.rabbit.RabbitMessageBus$SendingHandler.handleMessageInternal(RabbitMessageBus.java:891)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:101)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:97)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:287)
at org.springframework.integration.channel.interceptor.WireTap.preSend(WireTap.java:129)
at org.springframework.integration.channel.AbstractMessageChannel$ChannelInterceptorList.preSend(AbstractMessageChannel.java:392)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:282)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:245)
at sun.reflect.GeneratedMethodAccessor204.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:317)
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:190)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157)
at org.springframework.integration.monitor.DirectChannelMetrics.monitorSend(DirectChannelMetrics.java:114)
at org.springframework.integration.monitor.DirectChannelMetrics.doInvoke(DirectChannelMetrics.java:98)
at org.springframework.integration.monitor.DirectChannelMetrics.invoke(DirectChannelMetrics.java:92)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:207)
at com.sun.proxy.$Proxy1537.send(Unknown Source)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:95)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:231)
at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:154)
at org.springframework.integration.splitter.AbstractMessageSplitter.produceOutput(AbstractMessageSplitter.java:157)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:102)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:105)
Caused by: org.springframework.amqp.AmqpIOException: java.io.IOException
at org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java:63)
at org.springframework.amqp.rabbit.connection.SimpleConnection.createChannel(SimpleConnection.java:51)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$ChannelCachingConnectionProxy.createBareChannel(CachingConnectionFactory.java:758)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$ChannelCachingConnectionProxy.access$300(CachingConnectionFactory.java:747)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.doCreateBareChannel(CachingConnectionFactory.java:419)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.createBareChannel(CachingConnectionFactory.java:395)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.getCachedChannelProxy(CachingConnectionFactory.java:364)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.getChannel(CachingConnectionFactory.java:357)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.access$1100(CachingConnectionFactory.java:75)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$ChannelCachingConnectionProxy.createChannel(CachingConnectionFactory.java:763)
at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils$1.createChannel(ConnectionFactoryUtils.java:85)
at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.doGetTransactionalResourceHolder(ConnectionFactoryUtils.java:134)
at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.getTransactionalResourceHolder(ConnectionFactoryUtils.java:67)
at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:1035)
at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1028)
at org.springframework.amqp.rabbit.core.RabbitTemplate.send(RabbitTemplate.java:540)
at org.springframework.amqp.rabbit.core.RabbitTemplate.convertAndSend(RabbitTemplate.java:635)
at org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint.send(AmqpOutboundEndpoint.java:331)
at org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint.handleRequestMessage(AmqpOutboundEndpoint.java:323)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:99)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
... 93 more
Caused by: java.io.IOException
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
at com.rabbitmq.client.impl.ChannelN.open(ChannelN.java:125)
at com.rabbitmq.client.impl.ChannelManager.createChannel(ChannelManager.java:134)
at com.rabbitmq.client.impl.AMQConnection.createChannel(AMQConnection.java:499)
at org.springframework.amqp.rabbit.connection.SimpleConnection.createChannel(SimpleConnection.java:44)
... 112 more
Caused by: com.rabbitmq.client.ShutdownSignalException: connection error
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:348)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:221)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
... 116 more
Caused by: com.rabbitmq.client.impl.UnknownChannelException: Unknown channel number 23364
at com.rabbitmq.client.impl.ChannelManager.getChannel(ChannelManager.java:80)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:552)
... 1 more
2016-03-04T03:42:05-0500 1.2.1.RELEASE ERROR AMQP Connection xxx:5672 connection.CachingConnectionFactory - Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'xdbus.tap-s3.tap:stream:stream.batch-aws-s3-source.0' in vhost '/', class-id=50, method-id=20)
2016-03-04T03:53:13-0500 1.2.1.RELEASE ERROR AMQP Connection xxx:5672 connection.CachingConnectionFactory - Channel shutdown: connection error
2016-03-04T03:53:13-0500 1.2.1.RELEASE ERROR AMQP Connection xxx:5672 connection.CachingConnectionFactory - Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'xdbus.tap-s3.tap:stream:stream.batch-aws-s3-source.0' in vhost '/', class-id=50, method-id=20)
~
2016-03-04T02:57:54-0500 1.2.1.RELEASE ERROR AMQP Connection xxx:8080 connection.CachingConnectionFactory - Channel shutdown: connection error
2016-03-04T02:57:55-0500 1.2.1.RELEASE ERROR AMQP Connection xxx:8080 connection.CachingConnectionFactory - Channel shutdown: connection error
2016-03-04T03:42:04-0500 1.2.1.RELEASE ERROR AMQP Connection yyy:5672 connection.CachingConnectionFactory - Channel shutdown: connection error
Aktualisier
Ich habe festgestellt, dass das Problem bei Nachrichtenverlusten auftritt, wenn diese Ausnahme auftritt. Ich sehe viele verlorene Nachrichten. Dieses Muster habe ich mehrmals getestet. Jedes Mal, wenn diese Ausnahme auftritt, sehe ich, dass Nachrichten verloren gehe
2016-03-05T13:59:41-0500 1.2.1.RELEASE ERROR AMQP Connection host1:5672 connection.CachingConnectionFactory - Channel shutdown: connection error
rabbit configuration
spring:
rabbitmq:
addresses: host1:5672,host2:5672,host3:5672
adminAddresses: http://host1:15672,http://host2:15672,http://host3:15672
nodes: [email protected],[email protected],[email protected]
username: test
password: test
virtual_host: /
useSSL: false
sslProperties:
aktualisiert mit zunehmender Cache-Größe auf 200
Ich habe die von Ihnen bereitgestellte XML-Datei hinzugefügt und die Cachegröße auf 200 erhöht. Dies geschieht beim Verarbeiten von 1 Million und 80.000 Nachrichten. Nur meine HTTP-Client-Nebenläufigkeit ist 100, alle anderen sind 1. Client-Warteschlange und gleiche Anzahl. Aber die Anzahl der Nachrichten in meinem benannten Kanal steigt langsam wie 10 Nachrichten pro Minute, aber die sehr langsame s3-poller | Prozessor | http-Client> Warteschlange: batchCacheQueue
Msg wird vor http 186174 nicht in die Warteschlange eingereih
u simulierender Testfall:
1) Ich habe die Federintegration aws-s3 mit einem Splitter im Composite-Modul | verwendet Prozessor wie XML-Parsing | http-Client mit Parallelität 100> benanntem Kanal.
2) Ich denke, die Dateiquelle könnte auch funktionieren. Erstellen Sie eine einzelne Datei mit Millionen Datensätzen und versuchen Sie, diese aus der Datei zu ziehen.
3) Nach 4 bis 5 Durchläufen tritt diese Ausnahme auf