Это прекрасно работает большую часть времени. Тем не менее, в семантике точно один раз; это не будет работать.

ичок в Kafka и работаю над прототипом для подключения проприетарного потокового сервиса к Kafka.

Я хочу получить ключ от последнего сообщения, отправленного по теме, так как наш внутренний поточный потребитель должен войти в систему с идентификатором последнего сообщения, которое он получил при подключении.

Возможно ли это сделать с помощью KafkaProducer или KafkaConsumer?

Я пытался сделать следующее с помощью Consumer, но при запуске консольного потребителя я вижу, что сообщения воспроизводятся.

    // Poll so we know we're connected
    consumer.poll(100);
    // Get the assigned partitions
    Set<TopicPartition> assignedPartitions = consumer.assignment();
    // Seek to the end of those partitions
    consumer.seekToEnd(assignedPartitions);

    for(TopicPartition partition : assignedPartitions) {
        final long offset = consumer.committed(partition).offset();
        // Seek to the previous message
        consumer.seek(partition,offset - 1);
    }

    // Now get the last message
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        lastKey = record.key();
    }
    consumer.close();

Это ожидаемое поведение или я на неверном пути?

Ответы на вопрос(2)

Ваш ответ на вопрос