Como a agregação em lote do Camel 2.11 funciona com rota separada?

Primeiro, há uma pergunta sem resposta semelhanteUnindo rotas ao agregador único

Temos algumas rotas de consumidor (ftp, arquivo, smb) lendo arquivos de sistemas remotos. Simplificado para teste com rota direta, mas comportamento semelhante com consumidores em lote:

from("direct:"+routeId).id(routeId)
 .setProperty(AGGREGATION_PROPERTY, constant(routeId))
 .log(String.format("Sending (${body}) to %s", "direct:start1"))
 .to("direct:aggregate");

Após a transformação, todos os resultados de uma pesquisa são agregados por lote em uma rota separada:

from("direct:aggregate")
  .aggregate(property(AGGREGATION_PROPERTY), new BodyInAggregatingStrategy())
  .completionFromBatchConsumer()
  .to("log:result", "mock:result");

Tudo funciona bem, se todos os consumidores executam separados. Mas se vários consumidores rodarem em paralelo, a agregação dividirá as pesquisas. Exemplo: se o consumidor de arquivo pesquisa 500 mensagens e uma segunda rota começa a ler 6 arquivos do ftp, a expectativa é obter 2 agregados 1 com 500 mensagens do arquivo e 1 com 6 mensagens do ftp.

Caso de teste:

public void testAggregateByProperty() throws Exception {
    MockEndpoint result =  getMockEndpoint("mock:result");

    result.expectedBodiesReceived("A+A+A", "B+B", "A", "Z");

    template.sendBodyAndProperty("direct:A", "A", Exchange.BATCH_SIZE, 3);
    template.sendBodyAndProperty("direct:A", "A", Exchange.BATCH_SIZE, 3);
    template.sendBodyAndProperty("direct:B", "B", Exchange.BATCH_SIZE, 2);
    template.sendBodyAndProperty("direct:A", "A", Exchange.BATCH_SIZE, 3);
    template.sendBodyAndProperty("direct:B", "B", Exchange.BATCH_SIZE, 2);
    template.sendBodyAndProperty("direct:A", "A", Exchange.BATCH_SIZE, 1);
    template.sendBodyAndProperty("direct:Z", "Z", Exchange.BATCH_SIZE, 7);

    assertMockEndpointsSatisfied();
}

O resultado é: "A + A", "B", "A", "B", "A" e não o esperado "A + A + A", "B + B", "A", "Z" . Questões:

Nossa suposição sobre agregação está errada?Como podemos alcançar o comportamento esperado?Se definirmos o completeTimeout, parece que o tempo limite ocorrerá na primeira troca - independente se ainda houver novas trocas?

questionAnswers(1)

yourAnswerToTheQuestion