Wie funktioniert die Stapelaggregation von Camel 2.11 mit separater Route?
Zunächst gibt es eine ähnliche unbeantwortete Frage Routen zu einem Aggregator verbinden
Wir haben einige Consumer-Routen (ftp, file, smb), die Dateien von entfernten Systemen lesen. Vereinfacht für Tests auf direktem Weg, aber ähnliches Verhalten bei Batch-Verbrauchern:
from("direct:"+routeId).id(routeId)
.setProperty(AGGREGATION_PROPERTY, constant(routeId))
.log(String.format("Sending (${body}) to %s", "direct:start1"))
.to("direct:aggregate");
Nach der Transformation werden alle Ergebnisse einer Umfrage nach Batch auf einer separaten Route zusammengefasst:
from("direct:aggregate")
.aggregate(property(AGGREGATION_PROPERTY), new BodyInAggregatingStrategy())
.completionFromBatchConsumer()
.to("log:result", "mock:result");
All funktioniert gut, wenn jeder Consumer getrennt läuft. Wenn jedoch mehrere Verbraucher gleichzeitig aktiv sind, werden die Umfragen durch Aggregation aufgeteilt. Beispiel: Wenn der Dateikonsument 500 Nachrichten abfragt und eine zweite Route beginnt, 6 Dateien von ftp zu lesen, wird erwartet, dass wir 2 Aggregate 1 mit 500 Nachrichten von der Datei und 1 mit 6 Nachrichten von ftp erhalten.
Testfall
public void testAggregateByProperty() throws Exception {
MockEndpoint result = getMockEndpoint("mock:result");
result.expectedBodiesReceived("A+A+A", "B+B", "A", "Z");
template.sendBodyAndProperty("direct:A", "A", Exchange.BATCH_SIZE, 3);
template.sendBodyAndProperty("direct:A", "A", Exchange.BATCH_SIZE, 3);
template.sendBodyAndProperty("direct:B", "B", Exchange.BATCH_SIZE, 2);
template.sendBodyAndProperty("direct:A", "A", Exchange.BATCH_SIZE, 3);
template.sendBodyAndProperty("direct:B", "B", Exchange.BATCH_SIZE, 2);
template.sendBodyAndProperty("direct:A", "A", Exchange.BATCH_SIZE, 1);
template.sendBodyAndProperty("direct:Z", "Z", Exchange.BATCH_SIZE, 7);
assertMockEndpointsSatisfied();
}
Das Ergebnis ist: "A + A", "B", "A", "B", "A" und nicht das erwartete "A + A + A", "B + B", "A", "Z" ". Fragen
Ist unsere Annahme über die Aggregation falsch?Wie können wir das erwartete Verhalten erreichen?Wenn CompletionTimeout eingestellt ist, tritt eine Zeitüberschreitung beim ersten Austausch auf - unabhängig davon, ob noch neue Austausche vorhanden sind.