Как работает пакетная агрегация Camel 2.11 с отдельным маршрутом?
Сначала есть похожий вопрос без ответаОбъединение маршрутов в единый агрегатор
У нас есть несколько пользовательских маршрутов (ftp, file, smb) для чтения файлов из удаленных систем. Упрощено для тестирования с прямым маршрутом, но аналогичное поведение с пакетными потребителями:
from("direct:"+routeId).id(routeId)
.setProperty(AGGREGATION_PROPERTY, constant(routeId))
.log(String.format("Sending (${body}) to %s", "direct:start1"))
.to("direct:aggregate");
После преобразования все результаты одного опроса агрегируются партиями по отдельному маршруту:
from("direct:aggregate")
.aggregate(property(AGGREGATION_PROPERTY), new BodyInAggregatingStrategy())
.completionFromBatchConsumer()
.to("log:result", "mock:result");
Все работает нормально, если каждый потребитель работает отдельно. Но если несколько потребителей работают параллельно, агрегация разделит опросы. Пример, если файл-потребитель опрашивает 500 сообщений, и второй маршрут начинает читать 6 файлов из ftp, возможно, мы получим 2 агрегата: 1 с 500 сообщениями из файла и 1 с 6 сообщениями из ftp.
Прецедент:
public void testAggregateByProperty() throws Exception {
MockEndpoint result = getMockEndpoint("mock:result");
result.expectedBodiesReceived("A+A+A", "B+B", "A", "Z");
template.sendBodyAndProperty("direct:A", "A", Exchange.BATCH_SIZE, 3);
template.sendBodyAndProperty("direct:A", "A", Exchange.BATCH_SIZE, 3);
template.sendBodyAndProperty("direct:B", "B", Exchange.BATCH_SIZE, 2);
template.sendBodyAndProperty("direct:A", "A", Exchange.BATCH_SIZE, 3);
template.sendBodyAndProperty("direct:B", "B", Exchange.BATCH_SIZE, 2);
template.sendBodyAndProperty("direct:A", "A", Exchange.BATCH_SIZE, 1);
template.sendBodyAndProperty("direct:Z", "Z", Exchange.BATCH_SIZE, 7);
assertMockEndpointsSatisfied();
}
Результат: «A + A», «B», «A», «B», «A» и не ожидаемые «A + A + A», «B + B», «A», «Z» , Вопросы:
Правильно ли наше предположение об агрегации?Как мы можем достичь ожидаемого поведения?Если мы установим завершение времени, оно показывает, что тайм-аут произойдет с первого обмена - независимо от того, есть ли новые обмены?