Apache Camel: Antwort für unbekannte Korrelations-ID erhalten
Da ist einmiddleware
dazwischen zwei weitere Softwarekomponenten. In demmiddleware
Ich richte eine Route einApache ActiveMQ
Nachrichten vonApache Camel
.
so funktioniert es:
1stComponent
Verwendetmiddleware
um eine Nachricht an die zu senden3rdComponent
3rdComponent
Beantwortet die Nachricht und sendet sie an die1st
(mitmiddleware
).
1stComponent <<=>> Middleware <<=>> 3rdComponent
Problem:
Ich benutzeConcurrentConsumers
in der Middleware.
Während plötzlich viele Nachrichten nacheinander gesendet werdenmiddleware
stoppt den ganzen Prozess! Es gibt keine Ausnahmen oder Meldungen! Beispielsweise wurden die ersten 100 von 500 Nachrichten verarbeitet, und die verbleibenden Nachrichten verbleiben als ausstehende Nachrichten in der Warteschlange.
Diese Warnung wird manchmal während des Vorgangs protokolliert:
[WARN ] TemporaryQueueReplyManager(Camel (camel-1) thread #11 - TemporaryQueueReplyManager[Q.MyQ]):91 - Reply received for unknown correlationID [c551c7aa061f501c]. The message will be ignored: ActiveMQMapMessage {commandId = 2161, responseRequired = true, messageId = ID:xxxxxxx, originalDestination = null, originalTransactionId = null, producerId = ID:xxxxxxx, destination = temp-queue://ID:localhost.localdomain-40961-1389890357282-3:1:1, transactionId = null, expiration = 0, timestamp = 1389890272360, arrival = 0, brokerInTime = 1389890272360, brokerOutTime = 1389890272360, correlationId = c551c7aa061f501c, replyTo = temp-queue://ID:localhost.localdomain-40961-1389890357282-3:1:1, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = org.apache.activemq.util.ByteSequence@19e19da, dataStructure = null, redeliveryCounter = 0, size = 0, properties = {breadcrumbId=ID:xxxxxxxxxxxxxx, Title=300, CamelJmsDeliveryMode=1}, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false} ActiveMQMapMessage{ theTable = {} }
Dies ist dasmiddlewares
Code:
private static class MyRouteBuilder extends RouteBuilder {
@Override
public void configure() throws Exception {
from("activemq:queue:Q.Middleware?concurrentConsumers=1&maxConcurrentConsumers=10")
.threads(1, 100)
.process(new Processor() {
public void process(Exchange exchange) {
//some code
}
})
.inOut("activemq2:queue:Q.3RD")
;
}
}
und das3rdComponent
:
private static class MyRouteBuilder extends RouteBuilder {
@Override
public void configure() {
from("activemq:queue:Q.3RD")
.threads(1, 100)
.process(new Processor() {
public void process(Exchange exchange) {
//some code
}
})
;
}
}