весна xd теряет сообщения при обработке огромного объема
Я использую Spring XD Мой поток выглядит как показано ниже и запускаю тесты на 3-х узловом контейнере с 1 админ-узлом с кроликом в качестве транспорта
aws-s3|processor1|http-client|processor2>queue:readyQueue
Я создал ниже нажмите.
tap1 aws-s3>s3Queue
tap2 processor1>processorQueue1
tap3 http-client>httpQueue
Я запускаю ниже сценарии в моих тестах:
Scenario1
: 5 файлов по 200k = 1 миллион записей одновременно http-client = 70 и processor2 = 30
Я вижу 900к сообщение s3Queue
Я вижу 889k обработчик сообщенийQueue1
Я вижу сообщение 886k httpQueue
Я вижу 883k обработчик сообщенийQueue2 Сообщения теряются везде и случайно
Scenario2:
5 файлов по 200 тыс. = 1 миллион записей и весь параллелизм модулей = 1
Я вижу 998800 сообщение s3Queue
Я вижу 998760 обработчик сообщенийQueue1
Я вижу 997540 сообщение httpQueue
Я вижу 997530 обработчик сообщенийQueue2
Даже это число является случайным и не соответствует
Scenario3
Я изменил поток, как показано ниже, и параллелизм = 1 и 5 файлов по 200k = 1 миллион записей
aws-s3 >testQueue
Я получаю все свои сообщения, которые я запускаю 3 раза, и никаких проблем. Я получаю все свои 1 миллион сообщений
scenario4
Я изменил поток, как показано ниже, и параллелизм = 1 5 файлов по 200k = 1 миллион записей
aws-s3 |processor1 >testQueue2
Я получаю все свои сообщения, которые я запускаю 3 раза, и никаких проблем. Я получаю все свои 1 миллион сообщений
В сценарии 4 и сценарии 3 загрузка данных происходит быстрее, и обработка на 5 миллионов быстрее занимает 5 минут, а загрузка в кроличьей очереди переноса была быстрее, чем 5 000 мсг / сек.
В сценарии 1 загрузка данных была медленнее, даже если модуль s3 выполнял передачу данных очень медленно, например, от 300 до 1000 мсг в секунду.
В сценарии 2 s3 извлекал данные быстрее, но http-клиент работал медленно, как 100 мсг в секунду, а aws-s3 извлекал данные быстро, как 3-4k мсг / с
Я думаю, что поток xd вызывает проблемы, и я теряю сообщения. Пожалуйста, вы можете помочь мне, как решить эту проблему.
Обновить
Scenario 5
Я изменилсяreply-timeout
в-1
в клиенте http и тогда я потерял только 37 msgs
Теперь я снова запускаю 2-ю итерацию. Я потерял 25000 сообщений. Я вижу журнал контейнеров, когда это произошло.
2016-03-04T03:42:04-0500 1.2.1.RELEASE ERROR task-scheduler-7 handler.LoggingHandler - org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint@b6700b1]; nested exception is org.springframework.amqp.AmqpIOException: java.io.IOException
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:84)
at org.springframework.xd.dirt.integration.rabbit.RabbitMessageBus$SendingHandler.handleMessageInternal(RabbitMessageBus.java:891)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:101)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:97)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:287)
at org.springframework.integration.channel.interceptor.WireTap.preSend(WireTap.java:129)
at org.springframework.integration.channel.AbstractMessageChannel$ChannelInterceptorList.preSend(AbstractMessageChannel.java:392)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:282)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:245)
at sun.reflect.GeneratedMethodAccessor204.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:317)
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:190)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157)
at org.springframework.integration.monitor.DirectChannelMetrics.monitorSend(DirectChannelMetrics.java:114)
at org.springframework.integration.monitor.DirectChannelMetrics.doInvoke(DirectChannelMetrics.java:98)
at org.springframework.integration.monitor.DirectChannelMetrics.invoke(DirectChannelMetrics.java:92)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:207)
at com.sun.proxy.$Proxy1537.send(Unknown Source)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:95)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:231)
at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:154)
at org.springframework.integration.splitter.AbstractMessageSplitter.produceOutput(AbstractMessageSplitter.java:157)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:102)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:105)
Caused by: org.springframework.amqp.AmqpIOException: java.io.IOException
at org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java:63)
at org.springframework.amqp.rabbit.connection.SimpleConnection.createChannel(SimpleConnection.java:51)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$ChannelCachingConnectionProxy.createBareChannel(CachingConnectionFactory.java:758)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$ChannelCachingConnectionProxy.access$300(CachingConnectionFactory.java:747)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.doCreateBareChannel(CachingConnectionFactory.java:419)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.createBareChannel(CachingConnectionFactory.java:395)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.getCachedChannelProxy(CachingConnectionFactory.java:364)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.getChannel(CachingConnectionFactory.java:357)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.access$1100(CachingConnectionFactory.java:75)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$ChannelCachingConnectionProxy.createChannel(CachingConnectionFactory.java:763)
at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils$1.createChannel(ConnectionFactoryUtils.java:85)
at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.doGetTransactionalResourceHolder(ConnectionFactoryUtils.java:134)
at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.getTransactionalResourceHolder(ConnectionFactoryUtils.java:67)
at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:1035)
at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1028)
at org.springframework.amqp.rabbit.core.RabbitTemplate.send(RabbitTemplate.java:540)
at org.springframework.amqp.rabbit.core.RabbitTemplate.convertAndSend(RabbitTemplate.java:635)
at org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint.send(AmqpOutboundEndpoint.java:331)
at org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint.handleRequestMessage(AmqpOutboundEndpoint.java:323)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:99)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
... 93 more
Caused by: java.io.IOException
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
at com.rabbitmq.client.impl.ChannelN.open(ChannelN.java:125)
at com.rabbitmq.client.impl.ChannelManager.createChannel(ChannelManager.java:134)
at com.rabbitmq.client.impl.AMQConnection.createChannel(AMQConnection.java:499)
at org.springframework.amqp.rabbit.connection.SimpleConnection.createChannel(SimpleConnection.java:44)
... 112 more
Caused by: com.rabbitmq.client.ShutdownSignalException: connection error
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:348)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:221)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
... 116 more
Caused by: com.rabbitmq.client.impl.UnknownChannelException: Unknown channel number 23364
at com.rabbitmq.client.impl.ChannelManager.getChannel(ChannelManager.java:80)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:552)
... 1 more
2016-03-04T03:42:05-0500 1.2.1.RELEASE ERROR AMQP Connection xxx:5672 connection.CachingConnectionFactory - Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'xdbus.tap-s3.tap:stream:stream.batch-aws-s3-source.0' in vhost '/', class-id=50, method-id=20)
2016-03-04T03:53:13-0500 1.2.1.RELEASE ERROR AMQP Connection xxx:5672 connection.CachingConnectionFactory - Channel shutdown: connection error
2016-03-04T03:53:13-0500 1.2.1.RELEASE ERROR AMQP Connection xxx:5672 connection.CachingConnectionFactory - Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'xdbus.tap-s3.tap:stream:stream.batch-aws-s3-source.0' in vhost '/', class-id=50, method-id=20)
~
2016-03-04T02:57:54-0500 1.2.1.RELEASE ERROR AMQP Connection xxx:8080 connection.CachingConnectionFactory - Channel shutdown: connection error
2016-03-04T02:57:55-0500 1.2.1.RELEASE ERROR AMQP Connection xxx:8080 connection.CachingConnectionFactory - Channel shutdown: connection error
2016-03-04T03:42:04-0500 1.2.1.RELEASE ERROR AMQP Connection yyy:5672 connection.CachingConnectionFactory - Channel shutdown: connection error
обновленный
Я обнаружил, что проблема с сообщениями теряется, когда возникает это исключение, я вижу много потерянных сообщений. Этот шаблон, который я тестировал несколько раз. Каждый раз, когда возникает это исключение, я вижу потерянное сообщение. Также повышение параллелизма делает эту проблему частой.
2016-03-05T13:59:41-0500 1.2.1.RELEASE ERROR AMQP Connection host1:5672 connection.CachingConnectionFactory - Channel shutdown: connection error
конфигурация кролика
spring:
rabbitmq:
addresses: host1:5672,host2:5672,host3:5672
adminAddresses: http://host1:15672,http://host2:15672,http://host3:15672
nodes: [email protected],[email protected],[email protected]
username: test
password: test
virtual_host: /
useSSL: false
sslProperties:
обновляется с увеличением размера кеша до 200
Я добавил xml, предоставленный вами, и увеличил размер кэша до 200. Так происходит при обработке 1 миллиона и 80 тыс. Сообщений. Только мой http-клиент с параллелизмом равен 100, все остальные - 1. Медленно остановленная обработка msg все еще существует до того, как http-client Очередь и то же количество. Но количество сообщений в моем названном канале медленно увеличивается, как 10 сообщений в минуту, но это очень медленный s3-poller | процессор | http-клиент> очередь: batchCacheQueue
Сообщение не получает снижения в очереди до http 186174. Но медленно сообщения приходят в batchCacheQueue
Тестовый пример для симуляции:
1) Я использовал пружинную интеграцию источника aws-s3 с разделителем в составном модуле | процессор, такой как xml-анализ | http-клиент с параллелизмом 100> именованный канал.
2) Я думаю, что источник файла также может работать. Создайте один файл из миллионов записей и попытайтесь извлечь это из файла.
3) После 4-5 пробега мы видим, что происходит это исключение