Это прекрасно работает большую часть времени. Тем не менее, в семантике точно один раз; это не будет работать.
ичок в Kafka и работаю над прототипом для подключения проприетарного потокового сервиса к Kafka.
Я хочу получить ключ от последнего сообщения, отправленного по теме, так как наш внутренний поточный потребитель должен войти в систему с идентификатором последнего сообщения, которое он получил при подключении.
Возможно ли это сделать с помощью KafkaProducer или KafkaConsumer?
Я пытался сделать следующее с помощью Consumer, но при запуске консольного потребителя я вижу, что сообщения воспроизводятся.
// Poll so we know we're connected
consumer.poll(100);
// Get the assigned partitions
Set<TopicPartition> assignedPartitions = consumer.assignment();
// Seek to the end of those partitions
consumer.seekToEnd(assignedPartitions);
for(TopicPartition partition : assignedPartitions) {
final long offset = consumer.committed(partition).offset();
// Seek to the previous message
consumer.seek(partition,offset - 1);
}
// Now get the last message
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
lastKey = record.key();
}
consumer.close();
Это ожидаемое поведение или я на неверном пути?