Apache Camel: odpowiedź odebrana dla nieznanego correlationID
Tam jestmiddleware
pomiędzy dwoma innymi komponentami oprogramowania. wmiddleware
Jestem routeremApache ActiveMQ
wiadomości wgApache Camel
.
tak to działa:
1stComponent
używamiddleware
wysłać wiadomość do3rdComponent
3rdComponent
odpowiada na wiadomość i wysyła ją z powrotem do1st
(za pomocąmiddleware
).
1stComponent <<=>> Middleware <<=>> 3rdComponent
Problem:
używamConcurrentConsumers
w middleware.
W środku wysyłania wielu wiadomości, sekwencyjnie, naglemiddleware
zatrzymuje cały proces! nie ma wyjątków ani wiadomości! na przykład pierwsze 100 z 500 wiadomości zostało przetworzonych, a pozostałe pozostały w kolejce jako oczekujące wiadomości.
to ostrzeżenie jest czasami rejestrowane w środku procesu:
[WARN ] TemporaryQueueReplyManager(Camel (camel-1) thread #11 - TemporaryQueueReplyManager[Q.MyQ]):91 - Reply received for unknown correlationID [c551c7aa061f501c]. The message will be ignored: ActiveMQMapMessage {commandId = 2161, responseRequired = true, messageId = ID:xxxxxxx, originalDestination = null, originalTransactionId = null, producerId = ID:xxxxxxx, destination = temp-queue://ID:localhost.localdomain-40961-1389890357282-3:1:1, transactionId = null, expiration = 0, timestamp = 1389890272360, arrival = 0, brokerInTime = 1389890272360, brokerOutTime = 1389890272360, correlationId = c551c7aa061f501c, replyTo = temp-queue://ID:localhost.localdomain-40961-1389890357282-3:1:1, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = org.apache.activemq.util.ByteSequence@19e19da, dataStructure = null, redeliveryCounter = 0, size = 0, properties = {breadcrumbId=ID:xxxxxxxxxxxxxx, Title=300, CamelJmsDeliveryMode=1}, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false} ActiveMQMapMessage{ theTable = {} }
to jestmiddlewares
Kod:
private static class MyRouteBuilder extends RouteBuilder {
@Override
public void configure() throws Exception {
from("activemq:queue:Q.Middleware?concurrentConsumers=1&maxConcurrentConsumers=10")
.threads(1, 100)
.process(new Processor() {
public void process(Exchange exchange) {
//some code
}
})
.inOut("activemq2:queue:Q.3RD")
;
}
}
i3rdComponent
:
private static class MyRouteBuilder extends RouteBuilder {
@Override
public void configure() {
from("activemq:queue:Q.3RD")
.threads(1, 100)
.process(new Processor() {
public void process(Exchange exchange) {
//some code
}
})
;
}
}