Wie funktioniert die Stapelaggregation von Camel 2.11 mit separater Route?

Zunächst gibt es eine ähnliche unbeantwortete Frage Routen zu einem Aggregator verbinden

Wir haben einige Consumer-Routen (ftp, file, smb), die Dateien von entfernten Systemen lesen. Vereinfacht für Tests auf direktem Weg, aber ähnliches Verhalten bei Batch-Verbrauchern:

from("direct:"+routeId).id(routeId)
 .setProperty(AGGREGATION_PROPERTY, constant(routeId))
 .log(String.format("Sending (${body}) to %s", "direct:start1"))
 .to("direct:aggregate");

Nach der Transformation werden alle Ergebnisse einer Umfrage nach Batch auf einer separaten Route zusammengefasst:

from("direct:aggregate")
  .aggregate(property(AGGREGATION_PROPERTY), new BodyInAggregatingStrategy())
  .completionFromBatchConsumer()
  .to("log:result", "mock:result");

All funktioniert gut, wenn jeder Consumer getrennt läuft. Wenn jedoch mehrere Verbraucher gleichzeitig aktiv sind, werden die Umfragen durch Aggregation aufgeteilt. Beispiel: Wenn der Dateikonsument 500 Nachrichten abfragt und eine zweite Route beginnt, 6 Dateien von ftp zu lesen, wird erwartet, dass wir 2 Aggregate 1 mit 500 Nachrichten von der Datei und 1 mit 6 Nachrichten von ftp erhalten.

Testfall

public void testAggregateByProperty() throws Exception {
    MockEndpoint result =  getMockEndpoint("mock:result");

    result.expectedBodiesReceived("A+A+A", "B+B", "A", "Z");

    template.sendBodyAndProperty("direct:A", "A", Exchange.BATCH_SIZE, 3);
    template.sendBodyAndProperty("direct:A", "A", Exchange.BATCH_SIZE, 3);
    template.sendBodyAndProperty("direct:B", "B", Exchange.BATCH_SIZE, 2);
    template.sendBodyAndProperty("direct:A", "A", Exchange.BATCH_SIZE, 3);
    template.sendBodyAndProperty("direct:B", "B", Exchange.BATCH_SIZE, 2);
    template.sendBodyAndProperty("direct:A", "A", Exchange.BATCH_SIZE, 1);
    template.sendBodyAndProperty("direct:Z", "Z", Exchange.BATCH_SIZE, 7);

    assertMockEndpointsSatisfied();
}

Das Ergebnis ist: "A + A", "B", "A", "B", "A" und nicht das erwartete "A + A + A", "B + B", "A", "Z" ". Fragen

Ist unsere Annahme über die Aggregation falsch?Wie können wir das erwartete Verhalten erreichen?Wenn CompletionTimeout eingestellt ist, tritt eine Zeitüberschreitung beim ersten Austausch auf - unabhängig davon, ob noch neue Austausche vorhanden sind.

Antworten auf die Frage(2)

Ihre Antwort auf die Frage