Если вы хотите контролировать скорость, с которой потребитель Kafka использует Spring @KafkaListener, пожалуйста, автоматически подключите использование компонента KafkaListenerEndpointRegistry следующим образом и получите доступ к необходимому MessageListenerContainer. после этого вы можете использовать функции pause () и resume () для управления требуемым поведением.

тели использовать записи через определенный промежуток времени (например, каждые 5 минут). Потребительские свойства являются стандартными:

@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setConcurrency(1);
    factory.setBatchListener(true);
    factory.getContainerProperties().setPollTimeout(300000);
    factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.BATCH);
    return factory;
}

Хотя когда я меняю собственностьsetPollTimeout он не опрашивает после определенного интервала (5 минут), он непрерывно опрашивается через 30 секунд, вот мои журналы:

2018-01-23 18:07:26.875 INFO 60905 --- [        2-0-C-1] c.t.k.s.consumer.FavoriteEventConsumer   : Consumed: san@1516710960000->1516711080000 2

2018-01-23 18:07:56.901 INFO 60905 --- [        2-0-C-1] c.t.k.s.consumer.FavoriteEventConsumer   : Consumed: san@1516710960000->1516711080000 4

Мы пытались создать потоковое приложение kafka с оконными агрегатами и планировали использовать окно x после интервала y.

Я вижу это в классе:KafkaMessageListenerContainer, setConsumerTaskExecutor установлено:

if (containerProperties.getConsumerTaskExecutor() == null) {
        SimpleAsyncTaskExecutor consumerExecutor = new SimpleAsyncTaskExecutor(
                (getBeanName() == null ? "" : getBeanName()) + "-C-");
        containerProperties.setConsumerTaskExecutor(consumerExecutor);
    }

Но как мы настроим, когда этот (частотный) пул потоков опрашивает записи. Любая помощь приветствуется.

Ответы на вопрос(2)

Ваш ответ на вопрос