Spring AMQP v1.4.2 - Problema de reconexión de conejo en falla de red

Estoy probando el siguiente escenario en Spring AMQP v1.4.2 y no se puede volver a conectar después de una interrupción de la red:

Inicie la aplicación Spring que consume mensajes de forma asíncrona usando rabbit: listener-container y rabbit: connection-factory (a continuación se detalla la configuración).El registro muestra que la aplicación está recibiendo mensajes con éxito.Haga que RabbitMQ sea invisible para la aplicación colocando el tráfico de red entrante en el servidor de conejo:sudo iptables -A INPUT -p tcp --destination-port 5672 -j DROPEspere al menos 3 minutos (para que se agote el tiempo de espera de las conexiones de red).Arregle la conexión con:sudo iptables -D INPUT -p tcp --destination-port 5672 -j DROPEspere un tiempo (incluso lo intentó durante más de una hora) y no se produce la reconexión.Reinicie la aplicación y comienza a recibir mensajes nuevamente, lo que significa que la red volvió a la normalidad.

También probé el mismo escenario con desconexiones de adaptador de red VM en lugar de caída de iptables y sucede lo mismo, es decir, no hay reconexión automática. Curiosamente, cuando pruebo iptablesRECHAZAR, en lugar de DROP, funciona como se esperaba y la aplicación se reinicia tan pronto como elimino la regla de rechazo, pero creo que rechazar es más como una falla del servidor que una falla de la red.

De acuerdo con ladocumento de referencia:

Si un MessageListener falla debido a una excepción comercial, el contenedor de escucha de mensajes maneja la excepción y luego vuelve a escuchar otro mensaje. Si la falla es causada por una conexión caída (no es una excepción comercial), entonces el consumidor que está recolectando mensajes para el oyente debe ser cancelado y reiniciado.SimpleMessageListenerContainer maneja esto sin problemas, y deja un registro para decir que el escucha se está reiniciando. De hecho, se repite sin parar tratando de reiniciar al consumidor, y solo si el consumidor se comporta muy mal, se rendirá. Un efecto secundario es que si el corredor está inactivo cuando se inicia el contenedor, seguirá intentando hasta que se pueda establecer una conexión.

Este es el registro que obtengo aproximadamente un minuto después de la desconexión:

    2015-01-16 14:00:42,433 WARN  [SimpleAsyncTaskExecutor-5] org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer Consumer raised exception, processing can restart if the connection factory supports it
com.rabbitmq.client.ShutdownSignalException: connection error
    at com.rabbitmq.client.impl.AMQConnection.startShutdown(AMQConnection.java:717) ~[amqp-client-3.4.2.jar:na]
    at com.rabbitmq.client.impl.AMQConnection.shutdown(AMQConnection.java:707) ~[amqp-client-3.4.2.jar:na]
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:565) ~[amqp-client-3.4.2.jar:na]
    at java.lang.Thread.run(Thread.java:745) [na:1.7.0_55]
Caused by: java.io.EOFException: null
    at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:290) ~[na:1.7.0_55]
    at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:95) ~[amqp-client-3.4.2.jar:na]
    at com.rabbitmq.client.impl.SocketFrameHandler.readFrame(SocketFrameHandler.java:139) ~[amqp-client-3.4.2.jar:na]
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:534) ~[amqp-client-3.4.2.jar:na]
    ... 1 common frames omitted

Y recibo este mensaje de registro unos segundos después de la reconexión:

2015-01-16 14:18:14,551 WARN  [SimpleAsyncTaskExecutor-2] org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer Consumer raised exception, processing can restart if the connection factory supports it. Exception summary: org.springframework.amqp.AmqpConnectException: java.net.ConnectException: Connection timed out

ACTUALIZAR: Muy extraño, cuando habilito el inicio de sesión de DEPURACIÓN en el paquete org.springframework.amqp, la reconexión se realiza con éxito y ya no puedo reproducir el problema.

Sin el registro de depuración habilitado, intenté depurar el código AMQP de primavera. Observé que poco después de eliminar la caída de iptables,SimpleMessageListenerContainer.doStop() Se llama al método que activa las llamadas shutdown () y cancela todos los canales. También recibí este mensaje de registro cuando puse un punto de interrupción en doStop () que parece estar relacionado con la causa:

2015-01-20 15:28:44,200 ERROR [pool-1-thread-16] org.springframework.amqp.rabbit.connection.CachingConnectionFactory Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=405, reply-text=RESOURCE_LOCKED - cannot obtain exclusive access to locked queue 'e4288669-2422-40e6-a2ee-b99542509273' in vhost '/', class-id=50, method-id=10)
2015-01-20 15:28:44,243 WARN  [SimpleAsyncTaskExecutor-3] org.springframework.amqp.rabbit.listener.BlockingQueueConsumer Failed to declare queue:e4288669-2422-40e6-a2ee-b99542509273
2015-01-20 15:28:44,243 WARN  [SimpleAsyncTaskExecutor-3] org.springframework.amqp.rabbit.listener.BlockingQueueConsumer Queue declaration failed; retries left=0
org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$DeclarationException: Failed to declare queue(s):[e4288669-2422-40e6-a2ee-b99542509273]
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:486) ~[spring-rabbit-1.4.2.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:401) ~[spring-rabbit-1.4.2.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1022) [spring-rabbit-1.4.2.RELEASE.jar:na]
    at java.lang.Thread.run(Thread.java:745) [na:1.7.0_55]
2015-01-20 15:28:49,245 ERROR [pool-1-thread-16] org.springframework.amqp.rabbit.connection.CachingConnectionFactory Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=405, reply-text=RESOURCE_LOCKED - cannot obtain exclusive access to locked queue 'e4288669-2422-40e6-a2ee-b99542509273' in vhost '/', class-id=50, method-id=10)
2015-01-20 15:28:49,283 WARN  [SimpleAsyncTaskExecutor-3] org.springframework.amqp.rabbit.listener.BlockingQueueConsumer Failed to declare queue:e4288669-2422-40e6-a2ee-b99542509273
2015-01-20 15:28:49,300 ERROR [SimpleAsyncTaskExecutor-3] org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer Consumer received fatal exception on startup
org.springframework.amqp.rabbit.listener.QueuesNotAvailableException: Cannot prepare queue for listener. Either the queue doesn't exist or the broker will not allow us to use it.
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:429) ~[spring-rabbit-1.4.2.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1022) ~[spring-rabbit-1.4.2.RELEASE.jar:na]
    at java.lang.Thread.run(Thread.java:745) [na:1.7.0_55]
Caused by: org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$DeclarationException: Failed to declare queue(s):[e4288669-2422-40e6-a2ee-b99542509273]
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:486) ~[spring-rabbit-1.4.2.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:401) ~[spring-rabbit-1.4.2.RELEASE.jar:na]
    ... 2 common frames omitted
2015-01-20 15:28:49,301 ERROR [SimpleAsyncTaskExecutor-3] org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer Stopping container from aborted consumer

ACTUALIZACIÓN 2: Después de configurarrequested-heartbeat a 30 segundos, como se sugiere en una respuesta, la reconexión funcionó la mayoría de las veces y logró redefinir la cola temporal exclusiva, vinculada a un intercambio de fanout, pero aún no se puede reconectar ocasionalmente.

En los raros casos en que falló, monitoreé la consola de administración RabbitMQ durante la prueba y observé que se estableció una nueva conexión (después de que la conexión anterior se eliminó por tiempo de espera) pero la cola temporal exclusiva no se redefinió después de la reconexión. Además, el cliente no estaba recibiendo ningún mensaje. Ahora es realmente difícil reproducir el problema de manera confiable, ya que ocurre con menos frecuencia. He proporcionado la configuración completa a continuación, que ahora contiene las declaraciones de la cola.

ACTUALIZACIÓN 3: Incluso después de reemplazar la cola temporal exclusiva con una cola con nombre de eliminación automática, el mismo comportamiento ocurre ocasionalmente; es decir, la cola con nombre de eliminación automática no se redefine después de la reconexión y no se reciben mensajes hasta que se reinicia la aplicación.

Realmente agradecería mucho si alguien me puede ayudar en esto.

Aquí está la configuración de Spring AMQP en la que estoy confiando:

<!-- Create a temporary exclusive queue to subscribe to the control exchange -->
<rabbit:queue id="control-queue"/>

<!-- Bind the temporary queue to the control exchange -->
<rabbit:fanout-exchange name="control">
    <rabbit:bindings>
        <rabbit:binding queue="control-queue"/>
    </rabbit:bindings>
</rabbit:fanout-exchange>

<!-- Subscribe to the temporary queue -->
<rabbit:listener-container connection-factory="connection-factory"
                           acknowledge="none"
                           concurrency="1"
                           prefetch="1">
    <rabbit:listener queues="control-queue" ref="controlQueueConsumer"/>

</rabbit:listener-container>

<rabbit:connection-factory id="connection-factory"
                           username="${rabbit.username}"
                           password="${rabbit.password}"
                           host="${rabbit.host}"
                           virtual-host="${rabbit.virtualhost}"
                           publisher-confirms="true" 
                           channel-cache-size="100"
                           requested-heartbeat="30" />

<rabbit:admin id="admin" connection-factory="connection-factory"/>

<rabbit:queue id="qu0-id" name="qu0">
    <rabbit:queue-arguments>
        <entry key="x-dead-letter-exchange" value="dead-letter"/>
    </rabbit:queue-arguments>
</rabbit:queue>

<rabbit:topic-exchange id="default-exchange" name="default-ex" declared-by="admin">
    <rabbit:bindings>
        <rabbit:binding queue="qu0" pattern="p.0"/>
    </rabbit:bindings>
</rabbit:topic-exchange>

<rabbit:listener-container connection-factory="connection-factory"
                           acknowledge="manual"
                           concurrency="4"
                           prefetch="30">
    <rabbit:listener queues="qu0" ref="queueConsumerComponent"/>
</rabbit:listener-container>

Respuestas a la pregunta(3)

Su respuesta a la pregunta