Spring AMQP v1.4.2 - Problem mit der Wiederverbindung von Kaninchen bei Netzwerkausfall

Ich teste das folgende Szenario in Spring AMQP v1.4.2 und es kann nach einer Netzwerkstörung keine erneute Verbindung hergestellt werden:

Starten Sie die Frühlingsanwendung, die Nachrichten asynchron mit rabbit: listener-container und rabbit: connection-factory verarbeitet (detaillierte Konfiguration folgt). Das Protokoll zeigt, dass die Anwendung erfolgreich Nachrichten empfängt. Machen Sie RabbitMQ für die App unsichtbar, indem Sie eingehenden Netzwerkverkehr auf dem Rabbit-Server löschen:sudo iptables -A INPUT -p tcp --destination-port 5672 -j DROPWarten Sie mindestens 3 Minuten (für Netzwerkverbindungen bis zum Timeout).Fix die Verbindung mit:sudo iptables -D INPUT -p tcp --destination-port 5672 -j DROPWarten Sie einige Zeit (es wurde sogar mehr als eine Stunde lang versucht), und es findet keine erneute Verbindung statt.Starten Sie die Anwendung neu und sie beginnt erneut, Nachrichten zu empfangen, was bedeutet, dass das Netzwerk wieder normal war.

Ich habe dasselbe Szenario auch mit VM-Netzwerkadapter-Verbindungsabbrüchen anstelle von iptables drop getestet, und es passiert dasselbe, d. H. Keine automatische Wiederverbindung. Interessanterweise, wenn ich iptables @ versucABLEHNE, anstelle von DROP, funktioniert es wie erwartet und die App startet neu, sobald ich die Ablehnungsregel entferne, aber ich denke, Ablehnung ist eher ein Serverausfall als ein Netzwerkausfall.

Laut demReferenzdokumen:

Wenn ein MessageListener aufgrund einer Geschäftsausnahme fehlschlägt, wird die Ausnahme vom Nachrichten-Listener-Container behandelt und wartet dann wieder auf eine andere Nachricht. Wenn der Fehler durch eine unterbrochene Verbindung verursacht wird (keine Geschäftsausnahme), muss der Consumer, der Nachrichten für den Listener sammelt, abgebrochen und neu gestartet werden.Der SimpleMessageListenerContainer verarbeitet dies nahtlos und hinterlässt ein Protokoll mit der Meldung, dass der Listener neu gestartet wird. Tatsächlich wird endlos versucht, den Verbraucher neu zu starten, und nur wenn sich der Verbraucher sehr schlecht benimmt, wird er aufgeben. Ein Nebeneffekt ist, dass der Broker, wenn er beim Starten des Containers inaktiv ist, es nur so lange versucht, bis eine Verbindung hergestellt werden kann.

Dies ist das Protokoll, das ich ungefähr eine Minute nach dem Trennen der Verbindung erhalte:

    2015-01-16 14:00:42,433 WARN  [SimpleAsyncTaskExecutor-5] org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer Consumer raised exception, processing can restart if the connection factory supports it
com.rabbitmq.client.ShutdownSignalException: connection error
    at com.rabbitmq.client.impl.AMQConnection.startShutdown(AMQConnection.java:717) ~[amqp-client-3.4.2.jar:na]
    at com.rabbitmq.client.impl.AMQConnection.shutdown(AMQConnection.java:707) ~[amqp-client-3.4.2.jar:na]
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:565) ~[amqp-client-3.4.2.jar:na]
    at java.lang.Thread.run(Thread.java:745) [na:1.7.0_55]
Caused by: java.io.EOFException: null
    at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:290) ~[na:1.7.0_55]
    at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:95) ~[amqp-client-3.4.2.jar:na]
    at com.rabbitmq.client.impl.SocketFrameHandler.readFrame(SocketFrameHandler.java:139) ~[amqp-client-3.4.2.jar:na]
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:534) ~[amqp-client-3.4.2.jar:na]
    ... 1 common frames omitted

Und ich erhalte diese Protokollmeldung einige Sekunden nach dem erneuten Herstellen der Verbindung:

2015-01-16 14:18:14,551 WARN  [SimpleAsyncTaskExecutor-2] org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer Consumer raised exception, processing can restart if the connection factory supports it. Exception summary: org.springframework.amqp.AmqpConnectException: java.net.ConnectException: Connection timed out

AKTUALISIEREN Seltsamerweise kann ich das Problem nicht mehr reproduzieren, wenn ich die DEBUG-Protokollierung im org.springframework.amqp-Paket aktiviere.

Ohne aktivierte Debug-Protokollierung habe ich versucht, den AMQP-Quellcode zu debuggen. Ich beobachtete, dass kurz nachdem Iptables Drop entfernt wurde,SimpleMessageListenerContainer.doStop()s wird die @ -Methode aufgerufen, die wiederum shutdown () aufruft und alle Kanäle abbricht. Ich habe diese Protokollmeldung auch erhalten, als ich einen Haltepunkt auf doStop () gesetzt habe, der mit der Ursache in Zusammenhang zu stehen scheint:

2015-01-20 15:28:44,200 ERROR [pool-1-thread-16] org.springframework.amqp.rabbit.connection.CachingConnectionFactory Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=405, reply-text=RESOURCE_LOCKED - cannot obtain exclusive access to locked queue 'e4288669-2422-40e6-a2ee-b99542509273' in vhost '/', class-id=50, method-id=10)
2015-01-20 15:28:44,243 WARN  [SimpleAsyncTaskExecutor-3] org.springframework.amqp.rabbit.listener.BlockingQueueConsumer Failed to declare queue:e4288669-2422-40e6-a2ee-b99542509273
2015-01-20 15:28:44,243 WARN  [SimpleAsyncTaskExecutor-3] org.springframework.amqp.rabbit.listener.BlockingQueueConsumer Queue declaration failed; retries left=0
org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$DeclarationException: Failed to declare queue(s):[e4288669-2422-40e6-a2ee-b99542509273]
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:486) ~[spring-rabbit-1.4.2.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:401) ~[spring-rabbit-1.4.2.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1022) [spring-rabbit-1.4.2.RELEASE.jar:na]
    at java.lang.Thread.run(Thread.java:745) [na:1.7.0_55]
2015-01-20 15:28:49,245 ERROR [pool-1-thread-16] org.springframework.amqp.rabbit.connection.CachingConnectionFactory Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=405, reply-text=RESOURCE_LOCKED - cannot obtain exclusive access to locked queue 'e4288669-2422-40e6-a2ee-b99542509273' in vhost '/', class-id=50, method-id=10)
2015-01-20 15:28:49,283 WARN  [SimpleAsyncTaskExecutor-3] org.springframework.amqp.rabbit.listener.BlockingQueueConsumer Failed to declare queue:e4288669-2422-40e6-a2ee-b99542509273
2015-01-20 15:28:49,300 ERROR [SimpleAsyncTaskExecutor-3] org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer Consumer received fatal exception on startup
org.springframework.amqp.rabbit.listener.QueuesNotAvailableException: Cannot prepare queue for listener. Either the queue doesn't exist or the broker will not allow us to use it.
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:429) ~[spring-rabbit-1.4.2.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1022) ~[spring-rabbit-1.4.2.RELEASE.jar:na]
    at java.lang.Thread.run(Thread.java:745) [na:1.7.0_55]
Caused by: org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$DeclarationException: Failed to declare queue(s):[e4288669-2422-40e6-a2ee-b99542509273]
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:486) ~[spring-rabbit-1.4.2.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:401) ~[spring-rabbit-1.4.2.RELEASE.jar:na]
    ... 2 common frames omitted
2015-01-20 15:28:49,301 ERROR [SimpleAsyncTaskExecutor-3] org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer Stopping container from aborted consumer

UPDATE 2: Nach dem Einstellen vonrequested-heartbeat bis 30 Sekunden, wie in einer Antwort vorgeschlagen, funktionierte die erneute Verbindung die meiste Zeit und es gelang, die exklusive temporäre Warteschlange, die an einen Fanout-Austausch gebunden ist, neu zu definieren, aber sie kann gelegentlich immer noch nicht wiederhergestellt werden.

In den seltenen Fällen, in denen dies fehlschlug, überwachte ich die RabbitMQ-Verwaltungskonsole während des Tests und stellte fest, dass eine neue Verbindung hergestellt wurde (nachdem die alte Verbindung durch Zeitüberschreitung entfernt wurde), aber die exklusive temporäre Warteschlange nach dem erneuten Herstellen der Verbindung nicht neu definiert wurde. Auch der Client erhielt keine Nachrichten. Es ist jetzt wirklich schwierig, das Problem zuverlässig zu reproduzieren, da es seltener vorkommt. Ich habe die vollständige Konfiguration unten angegeben, die jetzt die Warteschlangendeklarationen enthält.

UPDATE 3: Auch nach dem Ersetzen der exklusiven temporären Warteschlange durch eine benannte Warteschlange mit automatischer Löschung tritt gelegentlich dasselbe Verhalten auf. d.h. die benannte Warteschlange zum automatischen Löschen wird nach dem erneuten Herstellen der Verbindung nicht neu definiert, und es werden keine Nachrichten empfangen, bis die Anwendung neu gestartet wird.

Ich würde mich sehr freuen, wenn mir jemand dabei helfen kann.

Hier ist die AMQP-Konfiguration für den Frühling, auf die ich mich verlasse:

<!-- Create a temporary exclusive queue to subscribe to the control exchange -->
<rabbit:queue id="control-queue"/>

<!-- Bind the temporary queue to the control exchange -->
<rabbit:fanout-exchange name="control">
    <rabbit:bindings>
        <rabbit:binding queue="control-queue"/>
    </rabbit:bindings>
</rabbit:fanout-exchange>

<!-- Subscribe to the temporary queue -->
<rabbit:listener-container connection-factory="connection-factory"
                           acknowledge="none"
                           concurrency="1"
                           prefetch="1">
    <rabbit:listener queues="control-queue" ref="controlQueueConsumer"/>

</rabbit:listener-container>

<rabbit:connection-factory id="connection-factory"
                           username="${rabbit.username}"
                           password="${rabbit.password}"
                           host="${rabbit.host}"
                           virtual-host="${rabbit.virtualhost}"
                           publisher-confirms="true" 
                           channel-cache-size="100"
                           requested-heartbeat="30" />

<rabbit:admin id="admin" connection-factory="connection-factory"/>

<rabbit:queue id="qu0-id" name="qu0">
    <rabbit:queue-arguments>
        <entry key="x-dead-letter-exchange" value="dead-letter"/>
    </rabbit:queue-arguments>
</rabbit:queue>

<rabbit:topic-exchange id="default-exchange" name="default-ex" declared-by="admin">
    <rabbit:bindings>
        <rabbit:binding queue="qu0" pattern="p.0"/>
    </rabbit:bindings>
</rabbit:topic-exchange>

<rabbit:listener-container connection-factory="connection-factory"
                           acknowledge="manual"
                           concurrency="4"
                           prefetch="30">
    <rabbit:listener queues="qu0" ref="queueConsumerComponent"/>
</rabbit:listener-container>

Antworten auf die Frage(6)

Ihre Antwort auf die Frage