Lendo a mesma mensagem várias vezes de Kafka
eu usoAPI Spring Kafka implementar o consumidor Kafka com gerenciamento manual de deslocamento:
@KafkaListener(topics = "some_topic")
public void onMessage(@Payload Message message, Acknowledgment acknowledgment) {
if (someCondition) {
acknowledgment.acknowledge();
}
}
Aqui, quero que o consumidor confirme a compensação apenas sesomeCondition
detém. Caso contrário, o consumidor deve dormir por algum tempo e lera mesma mensagem novamente.
Configuração Kafka:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfig());
factory.getContainerProperties().setAckMode(MANUAL);
return factory;
}
private Map<String, Object> consumerConfig() {
Map<String, Object> props = new HashMap<>();
...
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
...
return props;
}
Com a configuração atual, sesomeCondition == false
, consumidor não confirma o deslocamento, mas ainda lê as próximas mensagens. Existe uma maneira de fazer o consumidor reler uma mensagem se o Kafkaacknowledgement
não foi realizado?