Как работает пакетная агрегация Camel 2.11 с отдельным маршрутом?

Сначала есть похожий вопрос без ответаОбъединение маршрутов в единый агрегатор

У нас есть несколько пользовательских маршрутов (ftp, file, smb) для чтения файлов из удаленных систем. Упрощено для тестирования с прямым маршрутом, но аналогичное поведение с пакетными потребителями:

from("direct:"+routeId).id(routeId)
 .setProperty(AGGREGATION_PROPERTY, constant(routeId))
 .log(String.format("Sending (${body}) to %s", "direct:start1"))
 .to("direct:aggregate");

После преобразования все результаты одного опроса агрегируются партиями по отдельному маршруту:

from("direct:aggregate")
  .aggregate(property(AGGREGATION_PROPERTY), new BodyInAggregatingStrategy())
  .completionFromBatchConsumer()
  .to("log:result", "mock:result");

Все работает нормально, если каждый потребитель работает отдельно. Но если несколько потребителей работают параллельно, агрегация разделит опросы. Пример, если файл-потребитель опрашивает 500 сообщений, и второй маршрут начинает читать 6 файлов из ftp, возможно, мы получим 2 агрегата: 1 с 500 сообщениями из файла и 1 с 6 сообщениями из ftp.

Прецедент:

public void testAggregateByProperty() throws Exception {
    MockEndpoint result =  getMockEndpoint("mock:result");

    result.expectedBodiesReceived("A+A+A", "B+B", "A", "Z");

    template.sendBodyAndProperty("direct:A", "A", Exchange.BATCH_SIZE, 3);
    template.sendBodyAndProperty("direct:A", "A", Exchange.BATCH_SIZE, 3);
    template.sendBodyAndProperty("direct:B", "B", Exchange.BATCH_SIZE, 2);
    template.sendBodyAndProperty("direct:A", "A", Exchange.BATCH_SIZE, 3);
    template.sendBodyAndProperty("direct:B", "B", Exchange.BATCH_SIZE, 2);
    template.sendBodyAndProperty("direct:A", "A", Exchange.BATCH_SIZE, 1);
    template.sendBodyAndProperty("direct:Z", "Z", Exchange.BATCH_SIZE, 7);

    assertMockEndpointsSatisfied();
}

Результат: «A + A», «B», «A», «B», «A» и не ожидаемые «A + A + A», «B + B», «A», «Z» , Вопросы:

Правильно ли наше предположение об агрегации?Как мы можем достичь ожидаемого поведения?Если мы установим завершение времени, оно показывает, что тайм-аут произойдет с первого обмена - независимо от того, есть ли новые обмены?

Ответы на вопрос(1)

Ваш ответ на вопрос