Apache Camel: Antwort für unbekannte Korrelations-ID erhalten

Da ist einmiddleware dazwischen zwei weitere Softwarekomponenten. In demmiddleware Ich richte eine Route einApache ActiveMQ Nachrichten vonApache Camel.

so funktioniert es:

1stComponent Verwendetmiddleware um eine Nachricht an die zu senden3rdComponent

3rdComponent Beantwortet die Nachricht und sendet sie an die1st(mitmiddleware).

           1stComponent <<=>> Middleware <<=>> 3rdComponent

Problem:

Ich benutzeConcurrentConsumers in der Middleware.

Während plötzlich viele Nachrichten nacheinander gesendet werdenmiddleware stoppt den ganzen Prozess! Es gibt keine Ausnahmen oder Meldungen! Beispielsweise wurden die ersten 100 von 500 Nachrichten verarbeitet, und die verbleibenden Nachrichten verbleiben als ausstehende Nachrichten in der Warteschlange.

Diese Warnung wird manchmal während des Vorgangs protokolliert:

[WARN ] TemporaryQueueReplyManager(Camel (camel-1) thread #11 - TemporaryQueueReplyManager[Q.MyQ]):91 - Reply received for unknown correlationID [c551c7aa061f501c]. The message will be ignored: ActiveMQMapMessage {commandId = 2161, responseRequired = true, messageId = ID:xxxxxxx, originalDestination = null, originalTransactionId = null, producerId = ID:xxxxxxx, destination = temp-queue://ID:localhost.localdomain-40961-1389890357282-3:1:1, transactionId = null, expiration = 0, timestamp = 1389890272360, arrival = 0, brokerInTime = 1389890272360, brokerOutTime = 1389890272360, correlationId = c551c7aa061f501c, replyTo = temp-queue://ID:localhost.localdomain-40961-1389890357282-3:1:1, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = org.apache.activemq.util.ByteSequence@19e19da, dataStructure = null, redeliveryCounter = 0, size = 0, properties = {breadcrumbId=ID:xxxxxxxxxxxxxx, Title=300, CamelJmsDeliveryMode=1}, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false} ActiveMQMapMessage{ theTable = {} }

Dies ist dasmiddlewares Code:

private static class MyRouteBuilder extends RouteBuilder {
    @Override
    public void configure() throws Exception {
        from("activemq:queue:Q.Middleware?concurrentConsumers=1&maxConcurrentConsumers=10")
        .threads(1, 100)
            .process(new Processor() {
                public void process(Exchange exchange) {
                    //some code
                }
            })
        .inOut("activemq2:queue:Q.3RD")
        ;
    }
}

und das3rdComponent:

private static class MyRouteBuilder extends RouteBuilder {
    @Override
    public void configure() {
        from("activemq:queue:Q.3RD")
        .threads(1, 100)
        .process(new Processor() {
            public void process(Exchange exchange) {
                //some code
            }
        })
        ;
    }
}

Antworten auf die Frage(4)

Ihre Antwort auf die Frage