Ein und dieselbe Nachricht mehrmals von Kafka lesen
Ich benutzeSpring Kafka API um Kafka Consumer mit manueller Offsetverwaltung zu implementieren:
@KafkaListener(topics = "some_topic")
public void onMessage(@Payload Message message, Acknowledgment acknowledgment) {
if (someCondition) {
acknowledgment.acknowledge();
}
}
Hier möchte ich, dass der Verbraucher den Offset nur festschreibt, wennsomeCondition
hält. Ansonsten sollte der Verbraucher einige Zeit schlafen und @ lesdie gleiche Nachricht nochmal
Kafka Konfiguration:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfig());
factory.getContainerProperties().setAckMode(MANUAL);
return factory;
}
private Map<String, Object> consumerConfig() {
Map<String, Object> props = new HashMap<>();
...
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
...
return props;
}
Mit der aktuellen Konfiguration, wennsomeCondition == false
, Consumer schreibt den Offset nicht fest, liest aber trotzdem die nächsten Nachrichten. Gibt es eine Möglichkeit, den Verbraucher dazu zu bringen, eine Nachricht erneut zu lesen, wenn der Kafkaacknowledgement
wurde nicht durchgeführt?