¿Cómo funciona la agregación por lotes Camel 2.11 con una ruta separada?

Primero hay una pregunta similar sin respuestaUnir rutas en un solo agregador

Tenemos algunas rutas de consumo (ftp, file, smb) que leen archivos de sistemas remotos. Simplificado para prueba con ruta directa, pero comportamiento similar con consumidores por lotes:

from("direct:"+routeId).id(routeId)
 .setProperty(AGGREGATION_PROPERTY, constant(routeId))
 .log(String.format("Sending (${body}) to %s", "direct:start1"))
 .to("direct:aggregate");

Después de la transformación, todos los resultados de una encuesta se agregan por lote en una ruta separada:

from("direct:aggregate")
  .aggregate(property(AGGREGATION_PROPERTY), new BodyInAggregatingStrategy())
  .completionFromBatchConsumer()
  .to("log:result", "mock:result");

Todo funciona bien, si cada consumidor se ejecuta por separado. Pero si varios consumidores se ejecutan en paralelo, la agregación dividirá las encuestas. Ejemplo: si el consumidor de archivos sondea 500 mensajes y una segunda ruta comienza a leer 6 archivos de ftp, las expectativas son que obtenemos 2 agregados 1 con 500 mensajes de archivo y 1 con 6 mensajes de ftp.

Caso de prueba:

public void testAggregateByProperty() throws Exception {
    MockEndpoint result =  getMockEndpoint("mock:result");

    result.expectedBodiesReceived("A+A+A", "B+B", "A", "Z");

    template.sendBodyAndProperty("direct:A", "A", Exchange.BATCH_SIZE, 3);
    template.sendBodyAndProperty("direct:A", "A", Exchange.BATCH_SIZE, 3);
    template.sendBodyAndProperty("direct:B", "B", Exchange.BATCH_SIZE, 2);
    template.sendBodyAndProperty("direct:A", "A", Exchange.BATCH_SIZE, 3);
    template.sendBodyAndProperty("direct:B", "B", Exchange.BATCH_SIZE, 2);
    template.sendBodyAndProperty("direct:A", "A", Exchange.BATCH_SIZE, 1);
    template.sendBodyAndProperty("direct:Z", "Z", Exchange.BATCH_SIZE, 7);

    assertMockEndpointsSatisfied();
}

El resultado es: "A + A", "B", "A", "B", "A" y no el esperado "A + A + A", "B + B", "A", "Z" . Preguntas:

¿Es errónea nuestra suposición sobre la agregación?¿Cómo podemos lograr el comportamiento esperado?Si configuramos completeTimeout, parece que el tiempo de espera se producirá desde el primer intercambio, ¿independiente si todavía hay nuevos intercambios?

Respuestas a la pregunta(1)

Su respuesta a la pregunta