Если вы хотите контролировать скорость, с которой потребитель Kafka использует Spring @KafkaListener, пожалуйста, автоматически подключите использование компонента KafkaListenerEndpointRegistry следующим образом и получите доступ к необходимому MessageListenerContainer. после этого вы можете использовать функции pause () и resume () для управления требуемым поведением.
тели использовать записи через определенный промежуток времени (например, каждые 5 минут). Потребительские свойства являются стандартными:
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(1);
factory.setBatchListener(true);
factory.getContainerProperties().setPollTimeout(300000);
factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.BATCH);
return factory;
}
Хотя когда я меняю собственностьsetPollTimeout
он не опрашивает после определенного интервала (5 минут), он непрерывно опрашивается через 30 секунд, вот мои журналы:
2018-01-23 18:07:26.875 INFO 60905 --- [ 2-0-C-1] c.t.k.s.consumer.FavoriteEventConsumer : Consumed: san@1516710960000->1516711080000 2
2018-01-23 18:07:56.901 INFO 60905 --- [ 2-0-C-1] c.t.k.s.consumer.FavoriteEventConsumer : Consumed: san@1516710960000->1516711080000 4
Мы пытались создать потоковое приложение kafka с оконными агрегатами и планировали использовать окно x после интервала y.
Я вижу это в классе:KafkaMessageListenerContainer
, setConsumerTaskExecutor
установлено:
if (containerProperties.getConsumerTaskExecutor() == null) {
SimpleAsyncTaskExecutor consumerExecutor = new SimpleAsyncTaskExecutor(
(getBeanName() == null ? "" : getBeanName()) + "-C-");
containerProperties.setConsumerTaskExecutor(consumerExecutor);
}
Но как мы настроим, когда этот (частотный) пул потоков опрашивает записи. Любая помощь приветствуется.