Пружинная интеграция - Queue / Poller, кажется, исчерпывает пул потоков без каких-либо действий

У меня есть приложение интеграции Spring, прикрепленное к брокеру AMQP.

Я хочу получать сообщения из очереди amqp и обновлять записи в БД.

Чтобы повысить производительность, у меня есть пул работников, позволяющих выполнять несколько обновлений одновременно.

У меня есть следующая конфигурация:




    



    



Если я начну этот запуск, без входящих сообщений для обработки на канале AMQP, я быстро увижу, что thredpool исчерпан, и начну отклонять.

Вот'логи:

[Thu Apr 2013 23:41:51.153] DEBUG [] (org.springframework.amqp.rabbit.listener.BlockingQueueConsumer:185) - Retrieving delivery for Consumer: tag=[amq.ctag-w4qPp60jVEQOIEovR4cERv], channel=Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,1), acknowledgeMode=AUTO local queue size=0
[Thu Apr 2013 23:41:51.160] DEBUG [] (org.springframework.amqp.rabbit.listener.BlockingQueueConsumer:185) - Retrieving delivery for Consumer: tag=[amq.ctag-Q3Lq4R9g9E8WBNVLYzaFmq], channel=Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,2), acknowledgeMode=AUTO local queue size=0
[Thu Apr 2013 23:41:51.166] DEBUG [] (org.springframework.amqp.rabbit.listener.BlockingQueueConsumer:185) - Retrieving delivery for Consumer: tag=[amq.ctag-w8bg7ltEV2mot8QXDPCmfK], channel=Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,3), acknowledgeMode=AUTO local queue size=0
[Thu Apr 2013 23:41:51.170] DEBUG [] (org.springframework.amqp.rabbit.listener.BlockingQueueConsumer:185) - Retrieving delivery for Consumer: tag=[amq.ctag-A-0KdqhFjpc-Hvjmv7aZAc], channel=Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,4), acknowledgeMode=AUTO local queue size=0
[Thu Apr 2013 23:41:51.180] DEBUG [] (org.springframework.integration.endpoint.PollingConsumer:71) - Received no Message during the poll, returning 'false'
[Thu Apr 2013 23:41:51.180] DEBUG [] (org.springframework.integration.endpoint.PollingConsumer:71) - Received no Message during the poll, returning 'false'
[Thu Apr 2013 23:41:51.199] DEBUG [] (org.springframework.integration.endpoint.PollingConsumer:71) - Received no Message during the poll, returning 'false'
[Thu Apr 2013 23:41:51.200] DEBUG [] (org.springframework.integration.endpoint.PollingConsumer:71) - Received no Message during the poll, returning 'false'
[Thu Apr 2013 23:41:51.220] DEBUG [] (org.springframework.integration.endpoint.PollingConsumer:71) - Received no Message during the poll, returning 'false'

Довольно быстро пул потоков начинает отклонять выполнение:

[Thu Apr 2013 23:47:15.363] ERROR [] (org.springframework.integration.handler.LoggingHandler:126) - org.springframework.core.task.TaskRejectedException: Executor [java.util.concurrent.ThreadPoolExecutor@6ff3cb0e] did not accept task: org.springframework.integration.util.ErrorHandlingTaskExecutor$1@78615c8b
    at org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.execute(ThreadPoolTaskExecutor.java:244)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:49)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:231)
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:53)
    at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:439)
    at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
    at java.util.concurrent.FutureTask.run(FutureTask.java:138)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:98)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:206)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
    at java.lang.Thread.run(Thread.java:680)
Caused by: java.util.concurrent.RejectedExecutionException
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:1768)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:767)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:658)
    at org.springframework.sched

uling.concurrent.ThreadPoolTaskExecutor.execute (ThreadPoolTaskExecutor.java:241) ... еще 12

Я подозреваю, что виновник лежит здесь:BlockingQueueConsumer - указание, что каждый опрос для сообщения блокирует поток до тех пор, пока не прибудет сообщение ... что приводит к быстрому исчерпанию пула потоков.

Какие'правильный способ настроить это?

Ответы на вопрос(2)

Ваш ответ на вопрос