Ein und dieselbe Nachricht mehrmals von Kafka lesen

Ich benutzeSpring Kafka API um Kafka Consumer mit manueller Offsetverwaltung zu implementieren:

@KafkaListener(topics = "some_topic")
public void onMessage(@Payload Message message, Acknowledgment acknowledgment) {
    if (someCondition) {
        acknowledgment.acknowledge();
    }
}

Hier möchte ich, dass der Verbraucher den Offset nur festschreibt, wennsomeCondition hält. Ansonsten sollte der Verbraucher einige Zeit schlafen und @ lesdie gleiche Nachricht nochmal

Kafka Konfiguration:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfig());
    factory.getContainerProperties().setAckMode(MANUAL);
    return factory;
}

private Map<String, Object> consumerConfig() {
    Map<String, Object> props = new HashMap<>();
    ...
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    ...
    return props;
}

Mit der aktuellen Konfiguration, wennsomeCondition == false, Consumer schreibt den Offset nicht fest, liest aber trotzdem die nächsten Nachrichten. Gibt es eine Möglichkeit, den Verbraucher dazu zu bringen, eine Nachricht erneut zu lesen, wenn der Kafkaacknowledgement wurde nicht durchgeführt?

Antworten auf die Frage(4)

Ihre Antwort auf die Frage