Spring @KafkaListener ejecuta y sondea registros después de cierto intervalo

Queríamos consumir los registros después de un cierto intervalo (por ejemplo, cada 5 minutos). Las propiedades de consumo son estándar:

@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setConcurrency(1);
    factory.setBatchListener(true);
    factory.getContainerProperties().setPollTimeout(300000);
    factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.BATCH);
    return factory;
}

Aunque cuando cambio la propiedadsetPollTimeout no sondea después del intervalo definido (5 minutos), sondea continuamente después de 30 segundos, aquí están mis registros:

2018-01-23 18:07:26.875 INFO 60905 --- [        2-0-C-1] c.t.k.s.consumer.FavoriteEventConsumer   : Consumed: san@1516710960000->1516711080000 2

2018-01-23 18:07:56.901 INFO 60905 --- [        2-0-C-1] c.t.k.s.consumer.FavoriteEventConsumer   : Consumed: san@1516710960000->1516711080000 4

Intentábamos construir una aplicación de flujo de kafka con agregaciones en ventanas y planeamos consumir la ventana x después del intervalo y.

Puedo ver eso en la clase:KafkaMessageListenerContainer, setConsumerTaskExecutor Está establecido:

if (containerProperties.getConsumerTaskExecutor() == null) {
        SimpleAsyncTaskExecutor consumerExecutor = new SimpleAsyncTaskExecutor(
                (getBeanName() == null ? "" : getBeanName()) + "-C-");
        containerProperties.setConsumerTaskExecutor(consumerExecutor);
    }

Pero, ¿cómo configuramos cuando este grupo de subprocesos (frecuencia) sondea registros? Cualquier ayuda apreciada.

Respuestas a la pregunta(2)

Su respuesta a la pregunta