Это прекрасно работает большую часть времени. Тем не менее, в семантике точно один раз; это не будет работать.

ичок в Kafka и работаю над прототипом для подключения проприетарного потокового сервиса к Kafka.

Я хочу получить ключ от последнего сообщения, отправленного по теме, так как наш внутренний поточный потребитель должен войти в систему с идентификатором последнего сообщения, которое он получил при подключении.

Возможно ли это сделать с помощью KafkaProducer или KafkaConsumer?

Я пытался сделать следующее с помощью Consumer, но при запуске консольного потребителя я вижу, что сообщения воспроизводятся.

    // Poll so we know we're connected
    consumer.poll(100);
    // Get the assigned partitions
    Set<TopicPartition> assignedPartitions = consumer.assignment();
    // Seek to the end of those partitions
    consumer.seekToEnd(assignedPartitions);

    for(TopicPartition partition : assignedPartitions) {
        final long offset = consumer.committed(partition).offset();
        // Seek to the previous message
        consumer.seek(partition,offset - 1);
    }

    // Now get the last message
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        lastKey = record.key();
    }
    consumer.close();

Это ожидаемое поведение или я на неверном пути?

Ответы на вопрос(2)

    // Poll so we know we're connected
    consumer.poll(100);
    // Get the assigned partitions
    Set<TopicPartition> assignedPartitions = consumer.assignment();
    // Seek to the end of those partitions
    consumer.seekToEnd(assignedPartitions);

    for (TopicPartition partition : assignedPartitions) {
        final long offset = consumer.committed(partition).offset();
        // Seek to the previous message
        consumer.seek(partition, offset - 1);
    }

    // Now get the last message
    ConsumerRecords<String, String> records = consumer.poll(100);
    int size = records.count();
    int index = 0;
    for (ConsumerRecord<String, String> record : records) {
        index = index + 1;
        if (index == size) {
            String value = record.value();
            System.out.println("Last Message = " + value);
        }
    }
    consumer.close();
 JR ibkr05 февр. 2019 г., 22:48
Это прекрасно работает большую часть времени. Тем не менее, в семантике точно один раз; это не будет работать.
 praveen kumar03 янв. 2019 г., 11:53
получение nullPointerException при окончательном длинном смещении = kafkaConsumer.committed (partition) .offset ();
Решение Вопроса

final long offset = consumer.committed(partition).offset(), какссылка API относитсяcommitted метод должен получитьthe last committed offset для данного раздела, т.е. последнее смещение, которое ваш потребитель сообщает серверу kafka, что он уже прочитал. Так что, безусловно, вы получитеmessages replayedпотому что вы всегда читаете с определенного смещения. Как я думаю, я должен удалить только первый блок.

 Sumeet Gupta05 дек. 2017 г., 14:17
@ryanpudd, я пытаюсь добиться того же в программе spark, но не могу. Можете ли вы предоставить ссылку на ваш полный код? Спасибо
 ryanpudd08 нояб. 2017 г., 18:55
Конечно, я должен был проверить Consumer.position вместо

Ваш ответ на вопрос